// GroupHandler.cs
  using System;
  using System.Collections.Concurrent;
  using System.Collections.Generic;
  using System.Diagnostics;
  using System.Linq;
  using System.Runtime.ExceptionServices;
  using System.Text;
  using System.Threading;
  using System.Threading.Tasks;
  using Microsoft.Extensions.Logging;
  using Microsoft.IdentityModel.Tokens;
  10. namespace EVCB_OCPP.WSServer.Helper;
  /// <summary>
  /// Collects items submitted through <see cref="HandleAsync"/> into batches and hands each batch to a
  /// single handler delegate, retrying the items the handler did not report as completed.
  /// </summary>
  /// <typeparam name="T">
  /// Type of the queued items. Completed items are matched back to their callers with the default
  /// equality comparer, so two queued items that compare equal are treated as the same item.
  /// </typeparam>
  public class GroupHandler<T>
  {
      private readonly Func<BundleHandlerData<T>, Task> handleFunc;
      private readonly ILogger logger;
      private readonly int maxRetry;
      private readonly ConcurrentQueue<WaitParam<T>> waitList = new();
      private readonly SemaphoreSlim workersLock;

      /// <summary>
      /// Creates a handler that batches queued items and passes them to <paramref name="handleFunc"/>.
      /// </summary>
      /// <param name="handleFunc">
      /// Delegate that processes a batch. It reports success per item by adding the item to
      /// <see cref="BundleHandlerData{T}.CompletedDatas"/>.
      /// </param>
      /// <param name="logger">Receives failed-attempt errors and slow-batch warnings.</param>
      /// <param name="workerCnt">Maximum number of batches processed concurrently; must be at least 1.</param>
      /// <param name="maxRetry">Maximum number of times a batch is passed to the handler.</param>
      /// <exception cref="ArgumentNullException"><paramref name="handleFunc"/> is null.</exception>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="workerCnt"/> is less than 1.</exception>
      public GroupHandler(
          Func<BundleHandlerData<T>, Task> handleFunc,
          ILogger logger, int workerCnt = 1, int maxRetry = 10)
      {
          if (workerCnt < 1)
          {
              // A zero-count semaphore can never be acquired, so every HandleAsync call would wait forever.
              throw new ArgumentOutOfRangeException(nameof(workerCnt), workerCnt, "At least one worker is required.");
          }

          this.handleFunc = handleFunc ?? throw new ArgumentNullException(nameof(handleFunc));
          this.logger = logger;
          this.maxRetry = maxRetry;
          workersLock = new SemaphoreSlim(workerCnt);
      }

      /// <summary>
      /// Queues <paramref name="param"/> and completes once a batch containing it has been processed.
      /// </summary>
      /// <param name="param">The item to hand to the batch handler.</param>
      /// <returns>A task that completes when the item was reported completed or the retries ran out.</returns>
      /// <remarks>
      /// When the item is still uncompleted after the last attempt, the last exception recorded for the
      /// batch is rethrown here. If no exception was recorded, the task completes without error.
      /// </remarks>
      public async Task HandleAsync(T param)
      {
          var waitData = new WaitParam<T>() { Data = param, Waiter = new SemaphoreSlim(0), Exception = null };
          waitList.Enqueue(waitData);
          TryStartHandler();
          await waitData.Waiter.WaitAsync();
          if (waitData.Exception is not null)
          {
              // Rethrow without overwriting the stack trace captured where the handler failed.
              ExceptionDispatchInfo.Capture(waitData.Exception).Throw();
          }
      }

      /// <summary>
      /// Starts a worker when there is queued work and a free worker slot.
      /// </summary>
      private void TryStartHandler()
      {
          // The queue is re-checked after every Release: a producer may have enqueued an item and failed
          // to get the slot while this thread was holding it, and nobody else would pick that item up.
          while (!waitList.IsEmpty && workersLock.Wait(0))
          {
              if (!waitList.IsEmpty)
              {
                  // The started worker owns the slot from here on and releases it when it is done.
                  _ = StartHandleTask();
                  return;
              }

              workersLock.Release();
          }
      }

      /// <summary>
      /// Drains the queue into one batch, runs the handler with retries, wakes the callers and frees
      /// the worker slot. Must only be called while holding a slot of <c>workersLock</c>.
      /// </summary>
      private async Task StartHandleTask()
      {
          var timer = Stopwatch.StartNew();
          var times = new List<long>();
          var pending = new List<WaitParam<T>>();
          Exception lastException = null;

          try
          {
              while (waitList.TryDequeue(out var queued))
              {
                  pending.Add(queued);
              }
              times.Add(timer.ElapsedMilliseconds);

              // An empty batch (another worker drained the queue first) never reaches the handler.
              for (int attempt = 0; attempt < maxRetry && pending.Count > 0; attempt++)
              {
                  var bundle = new BundleHandlerData<T>(pending.Select(x => x.Data).ToList());
                  try
                  {
                      await handleFunc(bundle);
                  }
                  catch (Exception e)
                  {
                      // Items completed before the failure are still honoured below.
                      lastException = e;
                  }

                  // Hash lookup keeps matching linear in the batch size.
                  var completed = new HashSet<T>(bundle.CompletedDatas);
                  var finished = new List<WaitParam<T>>();
                  var remaining = new List<WaitParam<T>>();
                  foreach (var request in pending)
                  {
                      (completed.Contains(request.Data) ? finished : remaining).Add(request);
                  }

                  // Swap the pending list before waking anyone so the cleanup below can never
                  // release a request a second time.
                  pending = remaining;
                  foreach (var request in finished)
                  {
                      request.Waiter.Release();
                  }

                  if (pending.Count == 0)
                  {
                      break;
                  }

                  logger.LogError(
                      lastException,
                      "{Message}",
                      lastException?.Message ?? "Handler returned without completing every item.");
                  times.Add(timer.ElapsedMilliseconds);
              }
          }
          catch (Exception e)
          {
              // Failure outside the handler call itself (for example a null CompletedDatas list).
              lastException = e;
              logger.LogError(e, "{Message}", e.Message);
          }
          finally
          {
              // Always wake the remaining callers and free the worker slot, otherwise callers would
              // wait forever and no further batch could start.
              foreach (var request in pending)
              {
                  request.Exception = lastException;
                  request.Waiter.Release();
              }

              workersLock.Release();
          }

          timer.Stop();
          if (timer.ElapsedMilliseconds > 1000)
          {
              logger.LogWarning($"StartHandleTask {string.Join("/", times)}");
          }

          // Pick up items that were queued while this batch was running.
          TryStartHandler();
      }
  }
  /// <summary>
  /// Carries one batch of items to a handler together with the list of items the handler
  /// has reported as completed.
  /// </summary>
  /// <typeparam name="T">Type of the items in the batch.</typeparam>
  public class BundleHandlerData<T>
  {
      /// <summary>
      /// Creates a bundle for <paramref name="datas"/> with an empty completed list.
      /// </summary>
      /// <param name="datas">The items of this batch; the list is stored as-is, not copied.</param>
      public BundleHandlerData(List<T> datas)
      {
          CompletedDatas = new List<T>();
          Datas = datas;
      }

      /// <summary>The items handed to the handler.</summary>
      public List<T> Datas { get; set; }

      /// <summary>The items reported as completed so far.</summary>
      public List<T> CompletedDatas { get; set; }

      /// <summary>
      /// Appends <paramref name="competedData"/> to <see cref="CompletedDatas"/>.
      /// </summary>
      /// <param name="competedData">The item that was processed successfully.</param>
      public void AddCompletedData(T competedData) => CompletedDatas.Add(competedData);
  }
  /// <summary>
  /// One queued item together with the semaphore its submitter waits on and the exception,
  /// if any, to report back to that submitter.
  /// </summary>
  /// <typeparam name="T">Type of the queued item.</typeparam>
  internal class WaitParam<T>
  {
      /// <summary>Semaphore that is released once processing of <see cref="Data"/> has ended.</summary>
      public SemaphoreSlim Waiter { get; init; }

      /// <summary>The queued item.</summary>
      public T Data { get; init; }

      /// <summary>Exception to surface to the submitter; null when there is none.</summary>
      public Exception Exception { get; set; }
  }