using Microsoft.Extensions.Logging; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace EVCB_OCPP.WSServer.Helper; public class GroupSingleHandler { public GroupSingleHandler(Func, Task> handleFunc, ILogger logger) { this.handleFunc = handleFunc; this.logger = logger; singleWorkLock = new (_WorkerCnt); } private int _WorkerCnt = 1; public int WorkerCnt { get => _WorkerCnt; set { if (IsStarted) { throw new Exception($"{nameof(WorkerCnt)} must not be changed afted {nameof(HandleAsync)} is called"); } _WorkerCnt = value; singleWorkLock = new (_WorkerCnt); } } private readonly Func, Task> handleFunc; private readonly ILogger logger; private readonly ConcurrentQueue<(T param, SemaphoreSlim waitLock)> waitList = new(); private SemaphoreSlim singleWorkLock;// = new SemaphoreSlim(1); private bool IsStarted = false; private Task singleHandleTask; public Task HandleAsync(T param) { IsStarted = true; SemaphoreSlim reqLock = new(0); waitList.Enqueue((param, reqLock)); TryStartHandler(); return reqLock.WaitAsync(); } private void TryStartHandler() { if (!singleWorkLock.Wait(0)) { return; } if (waitList.Count == 0) { singleWorkLock.Release(); return; } singleHandleTask = StartHandleTask(); } private async Task StartHandleTask() { var handleList = new List<(T param, SemaphoreSlim waitLock)>(); while (waitList.TryDequeue(out var handle)) { handleList.Add(handle); } int cnt = 0; do { cnt++; try { var task = handleFunc(handleList.Select(x => x.param)); await task; break; } catch (Exception e) { logger.LogError(e, "Trying Cnt {0}", cnt); } } while (true); foreach (var handled in handleList) { handled.waitLock.Release(); } singleWorkLock.Release(); TryStartHandler(); } }