123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- using Microsoft.Extensions.Logging;
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- namespace EVCB_OCPP.WSServer.Helper;
/// <summary>
/// Collects individually submitted items into batches and passes each batch to a
/// single asynchronous handler. Every caller of <see cref="HandleAsync"/> gets a task
/// that completes once the batch containing its item has been processed.
/// </summary>
/// <typeparam name="T">Type of the item submitted for handling.</typeparam>
public class GroupSingleHandler<T>
{
    private readonly Func<IEnumerable<T>, Task> handleFunc;
    private readonly ILogger logger;

    // Each queued entry pairs the submitted item with the completion source that
    // backs the task returned to the submitting caller.
    private readonly BlockingCollection<(T param, TaskCompletionSource<bool> completion)> blockQueue = new();

    // Kept so the worker tasks stay referenced for the lifetime of this instance.
    private readonly Task[] _handleTasks;

    /// <summary>
    /// Creates the handler and immediately starts <paramref name="workerCnt"/> worker loops.
    /// </summary>
    /// <param name="handleFunc">Callback invoked with every batch of queued items.</param>
    /// <param name="logger">Logger used for slow-batch warnings and errors.</param>
    /// <param name="workerCnt">Number of concurrent worker loops; must be at least 1.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handleFunc"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="workerCnt"/> is less than 1.</exception>
    public GroupSingleHandler(Func<IEnumerable<T>, Task> handleFunc, ILogger logger, int workerCnt = 1)
    {
        // Without at least one worker nothing would ever drain the queue and every
        // HandleAsync call would wait forever.
        if (workerCnt < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(workerCnt), workerCnt, "At least one worker is required.");
        }

        this.handleFunc = handleFunc ?? throw new ArgumentNullException(nameof(handleFunc));
        this.logger = logger;

        _handleTasks = new Task[workerCnt];
        for (int cnt = 0; cnt < workerCnt; cnt++)
        {
            _handleTasks[cnt] = StartHandleTask();
        }
    }

    /// <summary>
    /// Queues <paramref name="param"/> for batched handling.
    /// </summary>
    /// <param name="param">Item to pass to the batch handler.</param>
    /// <returns>
    /// A task that completes when the batch containing <paramref name="param"/> has been
    /// handled, or faults with the handler's exception when that batch failed.
    /// </returns>
    public Task HandleAsync(T param)
    {
        // RunContinuationsAsynchronously keeps caller continuations from running
        // inline on the worker loop when the batch is completed.
        var completion = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        blockQueue.Add((param, completion));
        return completion.Task;
    }

    /// <summary>
    /// Starts one worker loop: block for the first queued item, drain whatever else is
    /// already queued, invoke the handler once for the whole batch, then complete
    /// every waiter of that batch.
    /// </summary>
    private Task StartHandleTask()
    {
        return Task.Run(async () =>
        {
            while (true)
            {
                var handleList = new List<(T param, TaskCompletionSource<bool> completion)>();
                try
                {
                    // Blocks until at least one item is available.
                    handleList.Add(blockQueue.Take());
                }
                catch (InvalidOperationException e)
                {
                    // Take throws this once the collection is marked complete for adding.
                    logger.LogError(e, "blockQueue.Take Error");
                    break;
                }

                var watch = Stopwatch.StartNew();
                long t0, t1 = 0, t2, t3;

                // Everything queued so far joins the same batch.
                while (blockQueue.TryTake(out var data))
                {
                    handleList.Add(data);
                }
                t0 = watch.ElapsedMilliseconds;

                bool succeeded = false;
                try
                {
                    var task = handleFunc(handleList.Select(x => x.param).ToList());
                    t1 = watch.ElapsedMilliseconds;
                    await task.ConfigureAwait(false);
                    succeeded = true;
                }
                catch (Exception e)
                {
                    // A failing handler must neither stop this worker nor leave the
                    // callers of this batch waiting forever: fault their tasks first,
                    // then log.
                    foreach (var failed in handleList)
                    {
                        failed.completion.TrySetException(e);
                    }
                    logger.LogError(e, "GroupSingleHandler failed to handle a batch of {0} item(s)", handleList.Count);
                }
                t2 = watch.ElapsedMilliseconds;

                if (succeeded)
                {
                    foreach (var handled in handleList)
                    {
                        handled.completion.TrySetResult(true);
                    }
                }

                watch.Stop();
                t3 = watch.ElapsedMilliseconds;
                if (t3 > 1000)
                {
                    // Elapsed ms after: draining / handler invocation / handler completion / waiter release.
                    logger.LogWarning("StartHandleTask {0}/{1}/{2}/{3}", t0, t1, t2, t3);
                }
            }
        });
    }
}
|