GroupSingleHandler.cs 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. using Microsoft.Extensions.Logging;
  2. using System;
  3. using System.Collections.Concurrent;
  4. using System.Collections.Generic;
  5. using System.Linq;
  6. using System.Text;
  7. using System.Threading.Tasks;
  8. namespace EVCB_OCPP.WSServer.Helper;
  9. public class GroupSingleHandler<T>
  10. {
  11. private readonly Func<IEnumerable<T>, Task> handleFunc;
  12. private readonly ILogger logger;
  13. private readonly ConcurrentQueue<(T param, SemaphoreSlim waitLock)> waitList = new();
  14. //private readonly Dictionary<T, SemaphoreSlim> reqLockPaisrs = new();
  15. private readonly SemaphoreSlim singleWorkLock = new SemaphoreSlim(1);
  16. private Task singleHandleTask;
  17. public GroupSingleHandler(Func<IEnumerable<T>, Task> handleFunc, ILogger logger)
  18. {
  19. this.handleFunc = handleFunc;
  20. this.logger = logger;
  21. }
  22. public Task HandleAsync(T param)
  23. {
  24. SemaphoreSlim reqLock = new(0);
  25. waitList.Enqueue((param, reqLock));
  26. TryStartHandler();
  27. return reqLock.WaitAsync();
  28. }
  29. private void TryStartHandler()
  30. {
  31. if (!singleWorkLock.Wait(0))
  32. {
  33. return;
  34. }
  35. if (waitList.Count == 0)
  36. {
  37. singleWorkLock.Release();
  38. return;
  39. }
  40. singleHandleTask = StartHandleTask();
  41. }
  42. private async Task StartHandleTask()
  43. {
  44. var handleList = new List<(T param, SemaphoreSlim waitLock)>();
  45. while (waitList.TryDequeue(out var handle))
  46. {
  47. handleList.Add(handle);
  48. }
  49. int cnt = 0;
  50. do
  51. {
  52. cnt++;
  53. try
  54. {
  55. var task = handleFunc(handleList.Select(x => x.param));
  56. await task;
  57. break;
  58. }
  59. catch (Exception e)
  60. {
  61. logger.LogError(e, "Trying Cnt {0}", cnt);
  62. }
  63. }
  64. while (true);
  65. foreach (var handled in handleList)
  66. {
  67. handled.waitLock.Release();
  68. }
  69. singleWorkLock.Release();
  70. TryStartHandler();
  71. }
  72. }