123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- using System.Collections.Concurrent;
- using System.Diagnostics;
- namespace EVCB_OCPP.DBAPI.Helper;
/// <summary>
/// Collects concurrent <see cref="HandleAsync"/> calls into batches and hands each batch to a
/// single bundle handler, retrying the items the handler did not mark as completed.
/// </summary>
/// <typeparam name="TI">Input item type. Items are matched to their results by reference.</typeparam>
/// <typeparam name="TO">Result type produced for each input item.</typeparam>
public class GroupHandler<TI, TO> where TI : class
{
    /// <summary>
    /// Creates a new group handler.
    /// </summary>
    /// <param name="handleFunc">
    /// Processes one batch. It reports finished items through
    /// <c>BundleHandlerData.AddCompletedData</c>, passing the same input instance it received.
    /// </param>
    /// <param name="logger">Optional logger; may be null.</param>
    /// <param name="workerCnt">Maximum number of batches processed at the same time (at least 1).</param>
    /// <param name="maxRetry">Maximum number of handler invocations per batch.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handleFunc"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="workerCnt"/> is less than 1.</exception>
    public GroupHandler(
        Func<BundleHandlerData<TI, TO>, Task> handleFunc,
        ILogger logger, int workerCnt = 1, int maxRetry = 10)
    {
        // With zero worker slots no batch could ever start and every caller would wait forever.
        if (workerCnt < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(workerCnt), workerCnt, "At least one worker is required.");
        }

        this.handleFunc = handleFunc ?? throw new ArgumentNullException(nameof(handleFunc));
        this.logger = logger;
        this.maxRetry = maxRetry;
        this.workersLock = new SemaphoreSlim(workerCnt);
    }

    private readonly Func<BundleHandlerData<TI, TO>, Task> handleFunc;
    private readonly ILogger logger;
    private readonly int maxRetry;
    private readonly ConcurrentQueue<WaitParam<TI, TO>> waitList = new();
    private readonly SemaphoreSlim workersLock;

    /// <summary>
    /// Queues <paramref name="param"/> for the next batch and waits for its result.
    /// </summary>
    /// <param name="param">The input item.</param>
    /// <param name="token">
    /// Cancels the wait only; the item stays queued and is still passed to the handler.
    /// </param>
    /// <returns>
    /// The result reported by the handler. If the item was never completed and no exception was
    /// recorded for the batch, the default value of <typeparamref name="TO"/> is returned.
    /// </returns>
    /// <exception cref="OperationCanceledException"><paramref name="token"/> was cancelled while waiting.</exception>
    public async Task<TO> HandleAsync(TI param, CancellationToken token = default)
    {
        var waitData = new WaitParam<TI, TO>() { Data = param, Waiter = new SemaphoreSlim(0), Exception = null };
        waitList.Enqueue(waitData);
        TryStartHandler();
        await waitData.Waiter.WaitAsync(token);
        if (waitData.Exception is not null)
        {
            throw waitData.Exception;
        }
        return waitData.Result;
    }

    /// <summary>
    /// Starts a batch worker when there is queued work and a free worker slot.
    /// </summary>
    private void TryStartHandler()
    {
        while (!waitList.IsEmpty)
        {
            if (!workersLock.Wait(0))
            {
                // Every slot is busy; each running worker re-checks the queue when it finishes.
                return;
            }

            if (!waitList.IsEmpty)
            {
                // Fire and forget: the worker reports results through each request's waiter.
                _ = StartHandleTask();
                return;
            }

            // Another worker drained the queue after the first check. Give the slot back and
            // look again, so an item enqueued while this thread held the slot is not stranded.
            workersLock.Release();
        }
    }

    /// <summary>
    /// Drains the queue into one batch, runs the handler up to <c>maxRetry</c> times, and
    /// releases every drained request with either its result or the last recorded exception.
    /// </summary>
    private async Task StartHandleTask()
    {
        var timer = Stopwatch.StartNew();
        List<long> times = new();
        var pending = new List<WaitParam<TI, TO>>();
        Exception lastException = null;
        try
        {
            while (waitList.TryDequeue(out var handle))
            {
                pending.Add(handle);
            }
            times.Add(timer.ElapsedMilliseconds);

            for (int cnt = 0; cnt < maxRetry && pending.Count > 0; cnt++)
            {
                var bundleHandledata = new BundleHandlerData<TI, TO>(pending.Select(x => x.Data).ToList());
                try
                {
                    await handleFunc(bundleHandledata);
                }
                catch (Exception e)
                {
                    // Items completed before the failure are still honoured below.
                    lastException = e;
                }

                // Decide per request, by reference, whether it was completed. Tracking requests
                // (not input values) guarantees that each one is either released here or failed
                // in the finally block, even when two inputs compare equal by value.
                var completed = bundleHandledata.CompletedDatas ?? new List<KeyValuePair<TI, TO>>();
                var finished = new List<WaitParam<TI, TO>>();
                var stillPending = new List<WaitParam<TI, TO>>();
                foreach (var request in pending)
                {
                    int index = completed.FindIndex(x => ReferenceEquals(x.Key, request.Data));
                    if (index < 0)
                    {
                        stillPending.Add(request);
                        continue;
                    }
                    request.Result = completed[index].Value;
                    finished.Add(request);
                }

                // Swap the pending list before releasing, so a request is never signalled twice.
                pending = stillPending;
                foreach (var request in finished)
                {
                    request.Waiter.Release();
                }

                if (pending.Count == 0)
                {
                    break;
                }
                logger?.LogError(lastException, lastException?.Message);
                times.Add(timer.ElapsedMilliseconds);
            }
        }
        catch (Exception e)
        {
            // Unexpected bookkeeping failure: report it to the callers that are still waiting.
            lastException = e;
        }
        finally
        {
            // Always wake the remaining callers and return the worker slot; otherwise callers
            // would wait forever and no further batch could start.
            foreach (var request in pending)
            {
                request.Exception = lastException;
                request.Waiter.Release();
            }
            workersLock.Release();
        }

        timer.Stop();
        if (timer.ElapsedMilliseconds > 1000)
        {
            logger?.LogWarning($"StartHandleTask {string.Join("/", times)}");
        }

        // Pick up anything that was enqueued while this batch was running.
        TryStartHandler();
    }
}
/// <summary>
/// Carries one batch of input items together with the results recorded for them so far.
/// </summary>
/// <typeparam name="TI">Input item type.</typeparam>
/// <typeparam name="TO">Result type.</typeparam>
public class BundleHandlerData<TI, TO>
{
    /// <summary>
    /// Creates a batch over <paramref name="datas"/> with an empty completed list.
    /// </summary>
    /// <param name="datas">The input items; the list instance is stored as-is.</param>
    public BundleHandlerData(List<TI> datas)
    {
        CompletedDatas = new List<KeyValuePair<TI, TO>>();
        Datas = datas;
    }

    /// <summary>The input items of this batch.</summary>
    public List<TI> Datas { get; set; }

    /// <summary>Input/result pairs recorded through <see cref="AddCompletedData"/>, in call order.</summary>
    public List<KeyValuePair<TI, TO>> CompletedDatas { get; set; }

    /// <summary>
    /// Records <paramref name="result"/> as the outcome for <paramref name="competedData"/>.
    /// </summary>
    /// <param name="competedData">The input item that has been processed.</param>
    /// <param name="result">The result produced for that item.</param>
    public void AddCompletedData(TI competedData, TO result)
        => CompletedDatas.Add(KeyValuePair.Create(competedData, result));
}
/// <summary>
/// Per-request state shared between a waiting caller and the batch worker.
/// </summary>
/// <typeparam name="TI">Input item type.</typeparam>
/// <typeparam name="TO">Result type.</typeparam>
internal class WaitParam<TI, TO>
{
    // Fixed when the request is created.

    /// <summary>The input item of this request.</summary>
    public TI Data { get; init; }

    /// <summary>Semaphore the caller waits on until the request has been processed.</summary>
    public SemaphoreSlim Waiter { get; init; }

    // Filled in before the waiter is released.

    /// <summary>The result produced for <see cref="Data"/>.</summary>
    public TO Result { get; set; }

    /// <summary>The failure recorded for this request, or null when none was recorded.</summary>
    public Exception Exception { get; set; }
}
|