using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.ExceptionServices;

namespace EVCB_OCPP.DBAPI.Helper;

/// <summary>
/// Collects individual requests into batches and passes each batch to a single
/// handler delegate. Callers await <see cref="HandleAsync"/> and are resumed once
/// the handler reports their item as completed, or once the retry budget is spent.
/// </summary>
/// <typeparam name="TI">Request payload type; the handler reports completion by payload.</typeparam>
/// <typeparam name="TO">Result type returned to the caller of a completed request.</typeparam>
public class GroupHandler<TI, TO> where TI : class
{
    /// <summary>
    /// Creates a handler group.
    /// </summary>
    /// <param name="handleFunc">Delegate invoked with each batch; it marks finished items through
    /// <see cref="BundleHandlerData{TI, TO}.AddCompletedData"/>.</param>
    /// <param name="logger">Optional logger; may be null.</param>
    /// <param name="workerCnt">Maximum number of batches processed at the same time (at least 1).</param>
    /// <param name="maxRetry">Maximum number of times the handler is invoked for one batch.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handleFunc"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="workerCnt"/> is less than 1.</exception>
    public GroupHandler(
        Func<BundleHandlerData<TI, TO>, Task> handleFunc,
        ILogger logger,
        int workerCnt = 1,
        int maxRetry = 10)
    {
        if (workerCnt < 1)
        {
            // A semaphore with zero slots would never admit a worker and every caller would wait forever.
            throw new ArgumentOutOfRangeException(nameof(workerCnt), workerCnt, "At least one worker is required.");
        }

        this.handleFunc = handleFunc ?? throw new ArgumentNullException(nameof(handleFunc));
        this.logger = logger;
        this.maxRetry = maxRetry;
        this.workersLock = new SemaphoreSlim(workerCnt);
    }

    private readonly Func<BundleHandlerData<TI, TO>, Task> handleFunc;
    private readonly ILogger logger;
    private readonly int maxRetry;

    // Requests waiting to be picked up by the next batch.
    private readonly ConcurrentQueue<WaitParam<TI, TO>> waitList = new();

    // One slot per concurrently running batch worker.
    private readonly SemaphoreSlim workersLock;

    /// <summary>
    /// Queues <paramref name="param"/> for batched handling and waits for its outcome.
    /// </summary>
    /// <param name="param">The item to hand to the batch handler.</param>
    /// <param name="token">Cancels the wait only; the item itself stays queued.</param>
    /// <returns>
    /// The result the handler registered for the item. If the handler never completed the item
    /// and never threw, the default value of <typeparamref name="TO"/> is returned.
    /// </returns>
    /// <exception cref="OperationCanceledException"><paramref name="token"/> was cancelled while waiting.</exception>
    public async Task<TO> HandleAsync(TI param, CancellationToken token = default)
    {
        var waitData = new WaitParam<TI, TO>
        {
            Data = param,
            Waiter = new SemaphoreSlim(0),
            Exception = null
        };

        waitList.Enqueue(waitData);
        TryStartHandler();

        await waitData.Waiter.WaitAsync(token);

        if (waitData.Exception is not null)
        {
            // Rethrow without discarding the stack trace recorded where the handler failed.
            ExceptionDispatchInfo.Capture(waitData.Exception).Throw();
        }

        return waitData.Result;
    }

    /// <summary>
    /// Starts a batch worker if there is queued work and a free worker slot.
    /// </summary>
    private void TryStartHandler()
    {
        while (!waitList.IsEmpty)
        {
            if (!workersLock.Wait(0))
            {
                // Every slot is taken. Whoever holds a slot looks at the queue again
                // after releasing it, so the queued work will still be picked up.
                return;
            }

            if (!waitList.IsEmpty)
            {
                _ = StartHandleTask();
                return;
            }

            // The queue was drained between the check and the acquire. Give the slot back and
            // look again: an item enqueued in the meantime may have failed to get the slot.
            workersLock.Release();
        }
    }

    /// <summary>
    /// Drains the queue into one batch and invokes the handler up to <c>maxRetry</c> times,
    /// releasing each waiter as soon as its item is reported completed. Items still pending
    /// afterwards are released with the last exception observed (which may be null).
    /// The caller must already hold a worker slot; it is released here.
    /// </summary>
    private async Task StartHandleTask()
    {
        var timer = Stopwatch.StartNew();
        var times = new List<long>();
        var pending = new List<WaitParam<TI, TO>>();
        Exception lastException = null;

        try
        {
            while (waitList.TryDequeue(out var queued))
            {
                pending.Add(queued);
            }

            times.Add(timer.ElapsedMilliseconds);

            var comparer = EqualityComparer<TI>.Default;

            // Skip the handler entirely when another worker already drained the queue.
            for (int attempt = 0; attempt < maxRetry && pending.Count > 0; attempt++)
            {
                var bundle = new BundleHandlerData<TI, TO>(pending.Select(x => x.Data).ToList());

                try
                {
                    await handleFunc(bundle);
                }
                catch (Exception handlerError)
                {
                    // Items completed before the failure are still honoured below.
                    lastException = handlerError;
                }

                // Phase 1: classify without side effects, so a failure here leaves 'pending' intact.
                var reported = bundle.CompletedDatas;
                var finished = new List<(WaitParam<TI, TO> Request, TO Result)>();
                var stillPending = new List<WaitParam<TI, TO>>();

                foreach (var request in pending)
                {
                    int index = reported.FindIndex(done => comparer.Equals(done.Key, request.Data));
                    if (index >= 0)
                    {
                        finished.Add((request, reported[index].Value));
                    }
                    else
                    {
                        stillPending.Add(request);
                    }
                }

                // Phase 2: publish results; each request is released exactly once.
                pending = stillPending;
                foreach (var (request, result) in finished)
                {
                    request.Result = result;
                    request.Waiter.Release();
                }

                if (pending.Count == 0)
                {
                    break;
                }

                logger?.LogError(lastException, "{Message}", lastException?.Message);
                times.Add(timer.ElapsedMilliseconds);
            }
        }
        catch (Exception unexpected)
        {
            // Anything failing outside the handler call must still reach the waiters.
            lastException = unexpected;
        }
        finally
        {
            foreach (var request in pending)
            {
                request.Exception = lastException;
                request.Waiter.Release();
            }

            // Always return the slot, otherwise no further batch could ever start.
            workersLock.Release();
        }

        timer.Stop();
        if (timer.ElapsedMilliseconds > 1000)
        {
            logger?.LogWarning("StartHandleTask {Times}", string.Join("/", times));
        }

        // Pick up requests that arrived while this batch was running.
        TryStartHandler();
    }
}

/// <summary>
/// The unit of work passed to a batch handler: the items to process and the
/// list in which the handler records the items it finished.
/// </summary>
/// <typeparam name="TI">Item type.</typeparam>
/// <typeparam name="TO">Result type.</typeparam>
public class BundleHandlerData<TI, TO>
{
    /// <summary>Items the handler is asked to process.</summary>
    public List<TI> Datas { get; set; }

    /// <summary>Item/result pairs recorded through <see cref="AddCompletedData"/>.</summary>
    public List<KeyValuePair<TI, TO>> CompletedDatas { get; set; }

    /// <summary>Creates a bundle for <paramref name="datas"/> with an empty completed list.</summary>
    public BundleHandlerData(List<TI> datas)
    {
        Datas = datas;
        CompletedDatas = new();
    }

    /// <summary>Records <paramref name="competedData"/> as completed with <paramref name="result"/>.</summary>
    public void AddCompletedData(TI competedData, TO result)
    {
        CompletedDatas.Add(new KeyValuePair<TI, TO>(competedData, result));
    }
}

/// <summary>
/// Bookkeeping for one queued request: its payload, its outcome, and the
/// semaphore its caller waits on.
/// </summary>
internal class WaitParam<TI, TO>
{
    /// <summary>The queued item.</summary>
    public TI Data { get; init; }

    /// <summary>Result assigned by the worker when the item completes.</summary>
    public TO Result { get; set; }

    /// <summary>Released by the worker once the outcome has been stored.</summary>
    public SemaphoreSlim Waiter { get; init; }

    /// <summary>Failure to rethrow to the caller; null when none was recorded.</summary>
    public Exception Exception { get; set; }
}