using Microsoft.Extensions.Logging;
using Microsoft.IdentityModel.Tokens;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.ExceptionServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace EVCB_OCPP.WSServer.Helper;

/// <summary>
/// Collects items submitted through <see cref="HandleAsync"/> into a queue and hands
/// everything queued at that moment to a single batch callback. Each caller's task
/// completes once the callback has marked its item as completed, or faults with the
/// last exception thrown by the callback once the retry budget is used up.
/// </summary>
/// <typeparam name="T">Type of the queued item. Items are matched against
/// <see cref="BundleHandlerData{T}.CompletedDatas"/> using the default equality comparer.</typeparam>
public class GroupHandler<T>
{
    /// <summary>
    /// Creates a handler.
    /// </summary>
    /// <param name="handleFunc">Batch callback; it reports success per item through
    /// <see cref="BundleHandlerData{T}.AddCompletedData"/>.</param>
    /// <param name="logger">Logger used for failed attempts and slow batches.</param>
    /// <param name="workerCnt">Maximum number of batches processed concurrently.</param>
    /// <param name="maxRetry">Maximum number of callback invocations per batch.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handleFunc"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="workerCnt"/> is less than 1.</exception>
    public GroupHandler(
        Func<BundleHandlerData<T>, Task> handleFunc,
        ILogger logger,
        int workerCnt = 1,
        int maxRetry = 10)
    {
        // With zero permits no worker could ever start and every caller would wait forever.
        if (workerCnt < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(workerCnt), workerCnt, "At least one worker is required.");
        }

        this.handleFunc = handleFunc ?? throw new ArgumentNullException(nameof(handleFunc));
        this.logger = logger;
        this.maxRetry = maxRetry;
        workersLock = new SemaphoreSlim(workerCnt);
    }

    private readonly Func<BundleHandlerData<T>, Task> handleFunc;
    private readonly ILogger logger;
    private readonly int maxRetry;
    private readonly ConcurrentQueue<WaitParam<T>> waitList = new();

    // One permit per concurrently running batch worker.
    private readonly SemaphoreSlim workersLock;

    /// <summary>
    /// Queues <paramref name="param"/> and waits until a batch worker has released it.
    /// </summary>
    /// <param name="param">Item to hand to the batch callback.</param>
    /// <returns>A task that completes when the item has been released by a worker.</returns>
    /// <remarks>
    /// Rethrows the exception stored on the item by the worker, if any.
    /// </remarks>
    public async Task HandleAsync(T param)
    {
        var waitData = new WaitParam<T>()
        {
            Data = param,
            Waiter = new SemaphoreSlim(0),
            Exception = null
        };

        waitList.Enqueue(waitData);
        TryStartHandler();
        await waitData.Waiter.WaitAsync();

        if (waitData.Exception is not null)
        {
            // Rethrow without resetting the stack trace recorded where the callback failed.
            ExceptionDispatchInfo.Capture(waitData.Exception).Throw();
        }
    }

    /// <summary>
    /// Starts a batch worker when there is queued work and a free worker permit.
    /// </summary>
    private void TryStartHandler()
    {
        while (!waitList.IsEmpty)
        {
            if (!workersLock.Wait(0))
            {
                // Every permit is taken. Whoever holds one looks at the queue again after
                // releasing it (a worker via its final TryStartHandler call, a caller via
                // the re-check below), so the queued item is not forgotten.
                return;
            }

            if (waitList.IsEmpty)
            {
                // Another worker drained the queue first. An item may have been enqueued
                // while this permit was held, and that caller's Wait(0) would have failed,
                // so loop and look at the queue again after releasing.
                workersLock.Release();
                continue;
            }

            // Fire and forget: the worker releases the permit itself when it is done.
            _ = StartHandleTask();
            return;
        }
    }

    /// <summary>
    /// Drains the queue, invokes the batch callback up to <c>maxRetry</c> times and
    /// releases every dequeued caller. Must be entered while holding one worker permit.
    /// </summary>
    private async Task StartHandleTask()
    {
        var timer = Stopwatch.StartNew();
        List<long> times = new();
        var pending = new List<WaitParam<T>>();
        Exception lastException = null;

        try
        {
            while (waitList.TryDequeue(out var handle))
            {
                pending.Add(handle);
            }
            times.Add(timer.ElapsedMilliseconds);

            var datas = pending.Select(x => x.Data).ToList();

            // Nothing is dequeued when another worker emptied the queue first; in that
            // case the callback is not invoked at all.
            for (int attempt = 1; attempt <= maxRetry && pending.Count > 0; attempt++)
            {
                var bundle = new BundleHandlerData<T>(datas);
                try
                {
                    await handleFunc(bundle);
                }
                catch (Exception e)
                {
                    lastException = e;
                }

                // The callback owns the bundle and may have replaced the list.
                var completedDatas = bundle.CompletedDatas ?? new List<T>();
                var completed = new HashSet<T>(completedDatas);

                // Split first, release afterwards, so that a failure while comparing items
                // cannot leave a caller both released and still listed as pending.
                var finished = new List<WaitParam<T>>();
                var remaining = new List<WaitParam<T>>();
                foreach (var request in pending)
                {
                    if (completed.Contains(request.Data))
                    {
                        finished.Add(request);
                    }
                    else
                    {
                        remaining.Add(request);
                    }
                }
                pending = remaining;

                foreach (var request in finished)
                {
                    request.Waiter.Release();
                }

                // Only items not yet completed are offered to the next attempt.
                datas = datas.Except(completedDatas).ToList();
                if (pending.Count == 0 || datas.Count == 0)
                {
                    break;
                }

                logger.LogError(
                    lastException,
                    "GroupHandler attempt {Attempt}/{MaxRetry} left {Remaining} item(s) unhandled: {Message}",
                    attempt,
                    maxRetry,
                    datas.Count,
                    lastException?.Message);
                times.Add(timer.ElapsedMilliseconds);
            }
        }
        catch (Exception e)
        {
            // Failure outside the callback itself (bookkeeping, logging). Surface it to the
            // callers that are still waiting instead of leaving them blocked.
            lastException = e;
        }
        finally
        {
            timer.Stop();

            // NOTE(review): when the callback never threw but left items unmarked,
            // lastException is null and those callers complete without an error.
            foreach (var request in pending)
            {
                request.Exception = lastException;
                request.Waiter.Release();
            }

            workersLock.Release();
        }

        try
        {
            if (timer.ElapsedMilliseconds > 1000)
            {
                logger.LogWarning("StartHandleTask {Times}", string.Join("/", times));
            }
        }
        finally
        {
            // Pick up anything enqueued while this worker held its permit.
            TryStartHandler();
        }
    }
}

/// <summary>
/// Batch passed to the callback of <see cref="GroupHandler{T}"/>: the items to process
/// and the list in which the callback records the items it completed.
/// </summary>
/// <typeparam name="T">Type of the batched item.</typeparam>
public class BundleHandlerData<T>
{
    /// <summary>Items offered to the callback in this attempt.</summary>
    public List<T> Datas { get; set; }

    /// <summary>Items the callback reported as completed.</summary>
    public List<T> CompletedDatas { get; set; }

    /// <summary>Creates a bundle over <paramref name="datas"/> with an empty completed list.</summary>
    /// <param name="datas">Items to process.</param>
    public BundleHandlerData(List<T> datas)
    {
        Datas = datas;
        CompletedDatas = new();
    }

    /// <summary>Records <paramref name="competedData"/> as completed.</summary>
    /// <param name="competedData">Item that was processed successfully.</param>
    public void AddCompletedData(T competedData)
    {
        CompletedDatas.Add(competedData);
    }
}

/// <summary>
/// Queue entry pairing a submitted item with the semaphore its caller awaits and the
/// exception, if any, to rethrow to that caller.
/// </summary>
/// <typeparam name="T">Type of the queued item.</typeparam>
internal class WaitParam<T>
{
    /// <summary>The submitted item.</summary>
    public T Data { get; init; }

    /// <summary>Released by the worker once the item is finished or given up on.</summary>
    public SemaphoreSlim Waiter { get; init; }

    /// <summary>Set by the worker before releasing <see cref="Waiter"/> when the item failed.</summary>
    public Exception Exception { get; set; }
}