GroupSingleHandler.cs 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. using Microsoft.Extensions.Logging;
  2. using System;
  3. using System.Collections.Concurrent;
  4. using System.Collections.Generic;
  5. using System.Linq;
  6. using System.Text;
  7. using System.Threading.Tasks;
  8. namespace EVCB_OCPP.WSServer.Helper;
  9. public class GroupSingleHandler<T>
  10. {
  11. public GroupSingleHandler(Func<IEnumerable<T>, Task> handleFunc, ILogger logger, int workerCnt = 1)
  12. {
  13. this.handleFunc = handleFunc;
  14. this.logger = logger;
  15. //singleWorkLock = new (_WorkerCnt);
  16. singleHandleTask = StartHandleTask();
  17. _handleTasks = new Task[workerCnt];
  18. for (int cnt = 0; cnt < workerCnt; cnt++)
  19. {
  20. _handleTasks[cnt] = StartHandleTask();
  21. }
  22. }
  23. //private int _WorkerCnt = 1;
  24. //public int WorkerCnt
  25. //{
  26. // get => _WorkerCnt;
  27. // set
  28. // {
  29. // if (IsStarted)
  30. // {
  31. // throw new Exception($"{nameof(WorkerCnt)} must not be changed afted {nameof(HandleAsync)} is called");
  32. // }
  33. // _WorkerCnt = value;
  34. // singleWorkLock = new (_WorkerCnt);
  35. // }
  36. //}
  37. private readonly Func<IEnumerable<T>, Task> handleFunc;
  38. private readonly ILogger logger;
  39. private readonly BlockingCollection<(T param, SemaphoreSlim waitLock)> blockQueue = new();
  40. //private SemaphoreSlim singleWorkLock;// = new SemaphoreSlim(1);
  41. private bool IsStarted = false;
  42. private Task singleHandleTask;
  43. private Task[] _handleTasks;
  44. public Task HandleAsync(T param)
  45. {
  46. IsStarted = true;
  47. SemaphoreSlim reqLock = new(0);
  48. blockQueue.Add((param, reqLock));
  49. //TryStartHandler();
  50. return reqLock.WaitAsync();
  51. }
  52. //private void TryStartHandler()
  53. //{
  54. // if (!singleWorkLock.Wait(0))
  55. // {
  56. // return;
  57. // }
  58. // if (waitList.Count == 0)
  59. // {
  60. // singleWorkLock.Release();
  61. // return;
  62. // }
  63. // singleHandleTask = StartHandleTask();
  64. //}
  65. private Task StartHandleTask()
  66. {
  67. return Task.Run(async () => {
  68. while (!blockQueue.IsCompleted)
  69. {
  70. var handleList = new List<(T param, SemaphoreSlim waitLock)>();
  71. try
  72. {
  73. //若blockQueue沒有資料,Take動作會被Block住,有資料時再往下執行
  74. //若
  75. var startData = blockQueue.Take();
  76. handleList.Add(startData);
  77. }
  78. catch (InvalidOperationException e) {
  79. logger.LogError(e, "blockQueue.Take Error");
  80. break;
  81. }
  82. while(blockQueue.TryTake(out var data))
  83. {
  84. handleList.Add(data);
  85. }
  86. var task = handleFunc(handleList.Select(x => x.param));
  87. await task;
  88. foreach (var handled in handleList)
  89. {
  90. handled.waitLock.Release();
  91. }
  92. }
  93. });
  94. }
  95. //private async Task StartHandleTask()
  96. //{
  97. // var handleList = new List<(T param, SemaphoreSlim waitLock)>();
  98. // while (waitList.TryDequeue(out var handle))
  99. // {
  100. // handleList.Add(handle);
  101. // }
  102. // int cnt = 0;
  103. // do
  104. // {
  105. // cnt++;
  106. // try
  107. // {
  108. // var task = handleFunc(handleList.Select(x => x.param));
  109. // await task;
  110. // break;
  111. // }
  112. // catch (Exception e)
  113. // {
  114. // logger.LogError(e, "Trying Cnt {0}", cnt);
  115. // }
  116. // }
  117. // while (true);
  118. // foreach (var handled in handleList)
  119. // {
  120. // handled.waitLock.Release();
  121. // }
  122. // singleWorkLock.Release();
  123. // TryStartHandler();
  124. //}
  125. }