GroupSingleHandler.cs 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. using Microsoft.Extensions.Logging;
  2. using System;
  3. using System.Collections.Concurrent;
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. namespace EVCB_OCPP.WSServer.Helper;
  10. public class GroupSingleHandler<T>
  11. {
  12. public GroupSingleHandler(Func<IEnumerable<T>, Task> handleFunc, ILogger logger, int workerCnt = 1)
  13. {
  14. this.handleFunc = handleFunc;
  15. this.logger = logger;
  16. //singleWorkLock = new (_WorkerCnt);
  17. //singleHandleTask = StartHandleTask();
  18. _handleTasks = new Task[workerCnt];
  19. for (int cnt = 0; cnt < workerCnt; cnt++)
  20. {
  21. _handleTasks[cnt] = StartHandleTask();
  22. }
  23. }
  24. //private int _WorkerCnt = 1;
  25. //public int WorkerCnt
  26. //{
  27. // get => _WorkerCnt;
  28. // set
  29. // {
  30. // if (IsStarted)
  31. // {
  32. // throw new Exception($"{nameof(WorkerCnt)} must not be changed afted {nameof(HandleAsync)} is called");
  33. // }
  34. // _WorkerCnt = value;
  35. // singleWorkLock = new (_WorkerCnt);
  36. // }
  37. //}
  38. private readonly Func<IEnumerable<T>, Task> handleFunc;
  39. private readonly ILogger logger;
  40. private readonly BlockingCollection<(T param, SemaphoreSlim waitLock)> blockQueue = new();
  41. //private SemaphoreSlim singleWorkLock;// = new SemaphoreSlim(1);
  42. private bool IsStarted = false;
  43. private Task singleHandleTask;
  44. private Task[] _handleTasks;
  45. public Task HandleAsync(T param)
  46. {
  47. IsStarted = true;
  48. SemaphoreSlim reqLock = new(0);
  49. blockQueue.Add((param, reqLock));
  50. //TryStartHandler();
  51. return reqLock.WaitAsync();
  52. }
  53. //private void TryStartHandler()
  54. //{
  55. // if (!singleWorkLock.Wait(0))
  56. // {
  57. // return;
  58. // }
  59. // if (waitList.Count == 0)
  60. // {
  61. // singleWorkLock.Release();
  62. // return;
  63. // }
  64. // singleHandleTask = StartHandleTask();
  65. //}
  66. private Task StartHandleTask()
  67. {
  68. return Task.Run(async () => {
  69. while (true)
  70. {
  71. var handleList = new List<(T param, SemaphoreSlim waitLock)>();
  72. try
  73. {
  74. var startData = blockQueue.Take();
  75. handleList.Add(startData);
  76. }
  77. catch (InvalidOperationException e)
  78. {
  79. logger.LogError(e, "blockQueue.Take Error");
  80. break;
  81. }
  82. var watch = Stopwatch.StartNew();
  83. long t0, t1, t2, t3;
  84. while (blockQueue.TryTake(out var data))
  85. {
  86. handleList.Add(data);
  87. }
  88. t0 = watch.ElapsedMilliseconds;
  89. var task = handleFunc(handleList.Select(x => x.param).ToList());
  90. t1 = watch.ElapsedMilliseconds;
  91. await task.ConfigureAwait(false);
  92. t2 = watch.ElapsedMilliseconds;
  93. foreach (var handled in handleList)
  94. {
  95. handled.waitLock.Release();
  96. }
  97. watch.Stop();
  98. t3 = watch.ElapsedMilliseconds;
  99. if (t3 > 1000)
  100. {
  101. logger.LogWarning("StartHandleTask {0}/{1}/{2}/{3}", t0, t1, t2, t3);
  102. }
  103. }
  104. });
  105. }
  106. //private async Task StartHandleTask()
  107. //{
  108. // var handleList = new List<(T param, SemaphoreSlim waitLock)>();
  109. // while (waitList.TryDequeue(out var handle))
  110. // {
  111. // handleList.Add(handle);
  112. // }
  113. // int cnt = 0;
  114. // do
  115. // {
  116. // cnt++;
  117. // try
  118. // {
  119. // var task = handleFunc(handleList.Select(x => x.param));
  120. // await task;
  121. // break;
  122. // }
  123. // catch (Exception e)
  124. // {
  125. // logger.LogError(e, "Trying Cnt {0}", cnt);
  126. // }
  127. // }
  128. // while (true);
  129. // foreach (var handled in handleList)
  130. // {
  131. // handled.waitLock.Release();
  132. // }
  133. // singleWorkLock.Release();
  134. // TryStartHandler();
  135. //}
  136. }