using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace EVCB_OCPP.WSServer.Helper;

/// <summary>
/// Collects individual requests into batches and hands each batch to a single
/// handler delegate, so that many concurrent callers share one invocation.
/// </summary>
/// <remarks>
/// The constructor starts <c>workerCnt</c> background workers. Each worker blocks
/// on the shared queue until one item arrives, drains whatever else is already
/// queued, and invokes the handler once for the whole batch. Every caller of
/// <see cref="HandleAsync"/> is released when the batch containing its item has
/// been processed.
/// </remarks>
/// <typeparam name="T">Type of a single queued request.</typeparam>
public class GroupSingleHandler<T>
{
    private readonly Func<IEnumerable<T>, Task> handleFunc;
    private readonly ILogger logger;

    // Each entry pairs the request with a semaphore (initial count 0) that the
    // worker releases once the batch containing the request has been handled.
    private readonly BlockingCollection<(T param, SemaphoreSlim waitLock)> blockQueue = new();

    // Kept only to hold a reference to the running worker tasks.
    private readonly Task[] _handleTasks;

    /// <summary>
    /// Creates the handler and immediately starts the background workers.
    /// </summary>
    /// <param name="handleFunc">Delegate invoked once per batch with all requests of that batch.</param>
    /// <param name="logger">Logger used for slow-batch warnings and worker errors.</param>
    /// <param name="workerCnt">Number of concurrent workers draining the queue; must be at least 1.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handleFunc"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="workerCnt"/> is less than 1.</exception>
    public GroupSingleHandler(Func<IEnumerable<T>, Task> handleFunc, ILogger logger, int workerCnt = 1)
    {
        // Without at least one worker nothing ever dequeues, so every
        // HandleAsync call would wait forever.
        if (workerCnt < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(workerCnt), workerCnt, "At least one worker is required.");
        }

        this.handleFunc = handleFunc ?? throw new ArgumentNullException(nameof(handleFunc));
        this.logger = logger;

        _handleTasks = new Task[workerCnt];
        for (int cnt = 0; cnt < workerCnt; cnt++)
        {
            _handleTasks[cnt] = StartHandleTask();
        }
    }

    /// <summary>
    /// Queues <paramref name="param"/> for batched handling.
    /// </summary>
    /// <param name="param">The request to hand to the batch handler.</param>
    /// <returns>
    /// A task that completes once the batch containing <paramref name="param"/> has
    /// been processed. If the handler delegate fails, the failure is logged by the
    /// worker and the task still completes; it does not fault.
    /// </returns>
    public Task HandleAsync(T param)
    {
        SemaphoreSlim reqLock = new(0);
        blockQueue.Add((param, reqLock));
        return reqLock.WaitAsync();
    }

    /// <summary>
    /// Starts one worker loop that repeatedly takes a batch from the queue, runs
    /// the handler delegate on it and releases the waiting callers.
    /// </summary>
    /// <returns>The task representing the worker loop.</returns>
    private Task StartHandleTask()
    {
        return Task.Run(async () =>
        {
            while (true)
            {
                var handleList = new List<(T param, SemaphoreSlim waitLock)>();

                try
                {
                    // Blocks this worker until at least one request is available.
                    var startData = blockQueue.Take();
                    handleList.Add(startData);
                }
                catch (InvalidOperationException e)
                {
                    // Take throws this once the collection is marked complete for adding.
                    logger.LogError(e, "blockQueue.Take Error");
                    break;
                }

                var watch = Stopwatch.StartNew();
                long t0, t1 = 0, t2 = 0, t3;

                // Drain everything that is already queued into the same batch.
                while (blockQueue.TryTake(out var data))
                {
                    handleList.Add(data);
                }
                t0 = watch.ElapsedMilliseconds;

                try
                {
                    var task = handleFunc(handleList.Select(x => x.param).ToList());
                    t1 = watch.ElapsedMilliseconds;

                    await task.ConfigureAwait(false);
                    t2 = watch.ElapsedMilliseconds;
                }
                catch (Exception e)
                {
                    // A failing batch must not terminate the worker; otherwise all
                    // later requests would wait forever.
                    logger.LogError(e, "handleFunc Error, batch size {0}", handleList.Count);
                }
                finally
                {
                    // Always wake the callers of this batch, even when the handler failed.
                    foreach (var handled in handleList)
                    {
                        handled.waitLock.Release();
                    }
                }

                watch.Stop();
                t3 = watch.ElapsedMilliseconds;

                // Timings: batch collected / handler returned its task / task completed / callers released.
                if (t3 > 1000)
                {
                    logger.LogWarning("StartHandleTask {0}/{1}/{2}/{3}", t0, t1, t2, t3);
                }
            }
        });
    }
}