using Microsoft.Extensions.Logging; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading.Tasks; namespace EVCB_OCPP.WSServer.Helper; public class GroupSingleHandler { public GroupSingleHandler(Func, Task> handleFunc, ILogger logger, int workerCnt = 1) { this.handleFunc = handleFunc; this.logger = logger; WorkerCnt = workerCnt; //singleWorkLock = new(_WorkerCnt); } private int _WorkerCnt = 1; public int WorkerCnt { get => _WorkerCnt; set { if (IsStarted) { throw new Exception($"{nameof(WorkerCnt)} must not be changed afted {nameof(HandleAsync)} is called"); } _WorkerCnt = value; singleWorkLock = new(_WorkerCnt); } } private readonly Func, Task> handleFunc; private readonly ILogger logger; private readonly ConcurrentQueue<(T param, SemaphoreSlim waitLock)> waitList = new(); private SemaphoreSlim singleWorkLock;// = new SemaphoreSlim(1); private bool IsStarted = false; private Task singleHandleTask; public Task HandleAsync(T param) { IsStarted = true; SemaphoreSlim reqLock = new(0); waitList.Enqueue((param, reqLock)); TryStartHandler(); return reqLock.WaitAsync(); } private void TryStartHandler() { if (!singleWorkLock.Wait(0)) { return; } if (waitList.Count == 0) { singleWorkLock.Release(); return; } singleHandleTask = StartHandleTask(); } private async Task StartHandleTask() { var timer = Stopwatch.StartNew(); long t0 = 0, t1 = 0, t2 = 0; var handleList = new List<(T param, SemaphoreSlim waitLock)>(); while (waitList.TryDequeue(out var handle)) { handleList.Add(handle); } t0 = timer.ElapsedMilliseconds; int cnt = 0; do { cnt++; try { var task = handleFunc(handleList.Select(x => x.param)); await task; t1 = timer.ElapsedMilliseconds; break; } catch (Exception e) { logger.LogError(e, "Trying Cnt {0}", cnt); logger.LogError(e.Message); } } while (true); foreach (var handled in handleList) { handled.waitLock.Release(); } singleWorkLock.Release(); timer.Stop(); t2= timer.ElapsedMilliseconds; if (t2 >1000) { logger.LogWarning("StartHandleTask {0}/{1}/{2}", t0, t1, t2); } TryStartHandler(); } }