1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- using Microsoft.Extensions.Logging;
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- namespace EVCB_OCPP.WSServer.Helper;
/// <summary>
/// Coalesces concurrent requests into batches and processes the batches one at a time.
/// Each caller enqueues a parameter through <see cref="HandleAsync"/>; a single worker
/// drains everything queued so far, passes it to the handler delegate in one call, and
/// then completes the pending task of every caller in that batch.
/// </summary>
/// <remarks>
/// When the handler delegate throws, the error is logged and the same batch is retried
/// immediately, without any retry limit or delay, until a call completes without throwing.
/// </remarks>
/// <typeparam name="T">Type of the per-request parameter.</typeparam>
public class GroupSingleHandler<T>
{
    private readonly Func<IEnumerable<T>, Task> handleFunc;
    private readonly ILogger logger;

    // Pending requests, each paired with the semaphore its caller is awaiting.
    private readonly ConcurrentQueue<(T param, SemaphoreSlim waitLock)> waitList = new();

    // Taken while a worker is running, or while a caller is deciding whether to start one.
    private readonly SemaphoreSlim singleWorkLock = new SemaphoreSlim(1);

    // Most recently started worker task; only assigned, never read by this class.
    private Task singleHandleTask;

    /// <summary>
    /// Creates a handler that forwards batched parameters to <paramref name="handleFunc"/>.
    /// </summary>
    /// <param name="handleFunc">Delegate invoked once per batch with all queued parameters.</param>
    /// <param name="logger">Logger used to report handler failures; may be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handleFunc"/> is null.</exception>
    public GroupSingleHandler(Func<IEnumerable<T>, Task> handleFunc, ILogger logger)
    {
        // A null delegate would otherwise throw inside the retry loop and spin forever.
        this.handleFunc = handleFunc ?? throw new ArgumentNullException(nameof(handleFunc));
        this.logger = logger;
    }

    /// <summary>
    /// Queues <paramref name="param"/> for the next batch and returns a task that completes
    /// once the batch containing it has been handled.
    /// </summary>
    /// <param name="param">The value to hand to the batch handler.</param>
    /// <returns>A task that completes after the handler has finished the caller's batch.</returns>
    public Task HandleAsync(T param)
    {
        SemaphoreSlim reqLock = new(0);
        waitList.Enqueue((param, reqLock));
        TryStartHandler();
        return reqLock.WaitAsync();
    }

    /// <summary>
    /// Starts a worker when work is queued and no worker is currently running.
    /// </summary>
    private void TryStartHandler()
    {
        // The queue is re-checked after every release of the lock. Without that, a
        // producer that enqueued and failed to take the lock while this thread was
        // holding it over an (at that moment) empty queue would leave its item
        // stranded until some later HandleAsync call happened to arrive.
        while (!waitList.IsEmpty)
        {
            if (!singleWorkLock.Wait(0))
            {
                // Whoever holds the lock re-checks the queue after releasing it.
                return;
            }

            if (!waitList.IsEmpty)
            {
                // Lock ownership passes to the worker, which releases it when done.
                singleHandleTask = StartHandleTask();
                return;
            }

            // Another worker drained the queue in the meantime; give the lock back
            // and look again in case something was enqueued while we held it.
            singleWorkLock.Release();
        }
    }

    /// <summary>
    /// Drains the queue, runs the handler on the drained batch (retrying on failure),
    /// releases every waiter in the batch, then frees the worker lock and checks for
    /// more work.
    /// </summary>
    private async Task StartHandleTask()
    {
        var handleList = new List<(T param, SemaphoreSlim waitLock)>();
        while (waitList.TryDequeue(out var handle))
        {
            handleList.Add(handle);
        }

        // Materialise the batch once so every retry receives the same stable collection
        // instead of a freshly built lazy projection.
        T[] batch = handleList.Select(x => x.param).ToArray();

        int cnt = 0;
        while (true)
        {
            cnt++;
            try
            {
                await handleFunc(batch);
                break;
            }
            catch (Exception e)
            {
                // Null-conditional: a missing logger must not throw out of the worker
                // while it still holds the lock and the batch's waiters.
                logger?.LogError(e, "Trying Cnt {0}", cnt);
            }
        }

        foreach (var handled in handleList)
        {
            handled.waitLock.Release();
        }

        singleWorkLock.Release();

        // Pick up anything that was enqueued while this batch was being processed.
        TryStartHandler();
    }
}
|