using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace EVCB_OCPP.WSServer.Helper;

/// <summary>
/// Groups individually submitted parameters into batches and passes each batch to a
/// single handler delegate, running at most one batch at a time.
/// </summary>
/// <typeparam name="T">Type of the parameter submitted by each caller.</typeparam>
public class GroupSingleHandler<T>
{
    private readonly Func<IEnumerable<T>, Task> handleFunc;
    private readonly ILogger logger;

    // Pending requests: the submitted parameter plus the semaphore its caller is awaiting.
    private readonly ConcurrentQueue<(T param, SemaphoreSlim waitLock)> waitList = new();

    // Held while a batch is being processed; guarantees a single active batch.
    private readonly SemaphoreSlim singleWorkLock = new SemaphoreSlim(1);

    // Most recently started batch task. Only assigned here, never awaited by this class.
    private Task singleHandleTask;

    /// <summary>
    /// Creates a handler that forwards batches of parameters to <paramref name="handleFunc"/>.
    /// </summary>
    /// <param name="handleFunc">Delegate invoked with every parameter of one batch.</param>
    /// <param name="logger">Logger that receives the exceptions thrown by <paramref name="handleFunc"/>.</param>
    public GroupSingleHandler(Func<IEnumerable<T>, Task> handleFunc, ILogger logger)
    {
        this.handleFunc = handleFunc;
        this.logger = logger;
    }

    /// <summary>
    /// Queues <paramref name="param"/> for processing in a batch.
    /// </summary>
    /// <param name="param">Value to hand to the batch delegate.</param>
    /// <returns>
    /// A task that completes once the batch containing <paramref name="param"/> has been
    /// handled without the delegate throwing.
    /// </returns>
    public Task HandleAsync(T param)
    {
        // Starts at zero permits; the batch worker releases it when the batch is done.
        SemaphoreSlim reqLock = new(0);
        waitList.Enqueue((param, reqLock));
        TryStartHandler();
        return reqLock.WaitAsync();
    }

    /// <summary>
    /// Starts a batch when work is queued and no batch is currently running.
    /// </summary>
    private void TryStartHandler()
    {
        while (!waitList.IsEmpty)
        {
            if (!singleWorkLock.Wait(0))
            {
                // Whoever holds the lock re-checks the queue after releasing it,
                // so the queued item will be picked up by that holder.
                return;
            }

            if (!waitList.IsEmpty)
            {
                // The lock is handed over to the batch task, which releases it when done.
                singleHandleTask = StartHandleTask();
                return;
            }

            // The queue was drained by another holder before the lock was acquired.
            // Release and loop: an item enqueued while the lock was held here would
            // otherwise stay queued, because its own start attempt saw the lock taken.
            singleWorkLock.Release();
        }
    }

    /// <summary>
    /// Drains the queue into one batch, invokes the delegate until it succeeds,
    /// then completes every waiting caller of that batch and looks for more work.
    /// </summary>
    private async Task StartHandleTask()
    {
        var handleList = new List<(T param, SemaphoreSlim waitLock)>();

        while (waitList.TryDequeue(out var handle))
        {
            handleList.Add(handle);
        }

        // Retries without limit and without delay: the loop only exits when the delegate
        // completes without throwing. Every failed attempt is logged with its attempt number.
        int cnt = 0;
        do
        {
            cnt++;
            try
            {
                var task = handleFunc(handleList.Select(x => x.param));
                await task;
                break;
            }
            catch (Exception e)
            {
                logger.LogError(e, "Trying Cnt {0}", cnt);
            }
        }
        while (true);

        foreach (var handled in handleList)
        {
            handled.waitLock.Release();
        }

        singleWorkLock.Release();

        // Pick up anything that was enqueued while this batch was running.
        TryStartHandler();
    }
}