123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- using Microsoft.Extensions.Logging;
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- namespace EVCB_OCPP.WSServer.Helper;
- public class GroupSingleHandler<T>
- {
- public GroupSingleHandler(Func<IEnumerable<T>, Task> handleFunc, ILogger logger, int workerCnt = 1)
- {
- this.handleFunc = handleFunc;
- this.logger = logger;
- WorkerCnt = workerCnt;
- //singleWorkLock = new(_WorkerCnt);
- }
- private int _WorkerCnt = 1;
- public int WorkerCnt
- {
- get => _WorkerCnt;
- set
- {
- if (IsStarted)
- {
- throw new Exception($"{nameof(WorkerCnt)} must not be changed afted {nameof(HandleAsync)} is called");
- }
- _WorkerCnt = value;
- singleWorkLock = new(_WorkerCnt);
- }
- }
- private readonly Func<IEnumerable<T>, Task> handleFunc;
- private readonly ILogger logger;
- private readonly ConcurrentQueue<(T param, SemaphoreSlim waitLock)> waitList = new();
- private SemaphoreSlim singleWorkLock;// = new SemaphoreSlim(1);
- private bool IsStarted = false;
- private Task singleHandleTask;
- public Task HandleAsync(T param)
- {
- IsStarted = true;
- SemaphoreSlim reqLock = new(0);
- waitList.Enqueue((param, reqLock));
- TryStartHandler();
- return reqLock.WaitAsync();
- }
- private void TryStartHandler()
- {
- if (!singleWorkLock.Wait(0))
- {
- return;
- }
- if (waitList.Count == 0)
- {
- singleWorkLock.Release();
- return;
- }
- singleHandleTask = StartHandleTask();
- }
- private async Task StartHandleTask()
- {
- var timer = Stopwatch.StartNew();
- long t0 = 0, t1 = 0, t2 = 0;
- var handleList = new List<(T param, SemaphoreSlim waitLock)>();
- while (waitList.TryDequeue(out var handle))
- {
- handleList.Add(handle);
- }
- t0 = timer.ElapsedMilliseconds;
- int cnt = 0;
- do
- {
- cnt++;
- try
- {
- var task = handleFunc(handleList.Select(x => x.param));
- await task;
- t1 = timer.ElapsedMilliseconds;
- break;
- }
- catch (Exception e)
- {
- logger.LogError(e, "Trying Cnt {0}", cnt);
- logger.LogError(e.Message);
- }
- }
- while (true);
- foreach (var handled in handleList)
- {
- handled.waitLock.Release();
- }
- singleWorkLock.Release();
- timer.Stop();
- t2= timer.ElapsedMilliseconds;
- if (t2 >1000)
- {
- logger.LogWarning("StartHandleTask {0}/{1}/{2}", t0, t1, t2);
- }
- TryStartHandler();
- }
- }
|