GroupSingleHandler.cs 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. using Microsoft.Extensions.Logging;
  2. using System;
  3. using System.Collections.Concurrent;
  4. using System.Collections.Generic;
  5. using System.Linq;
  6. using System.Text;
  7. using System.Threading.Tasks;
  8. namespace EVCB_OCPP.WSServer.Helper;
  9. public class GroupSingleHandler<T>
  10. {
  11. public GroupSingleHandler(Func<IEnumerable<T>, Task> handleFunc, ILogger logger)
  12. {
  13. this.handleFunc = handleFunc;
  14. this.logger = logger;
  15. singleWorkLock = new (_WorkerCnt);
  16. }
  17. private int _WorkerCnt = 1;
  18. public int WorkerCnt
  19. {
  20. get => _WorkerCnt;
  21. set
  22. {
  23. if (IsStarted)
  24. {
  25. throw new Exception($"{nameof(WorkerCnt)} must not be changed afted {nameof(HandleAsync)} is called");
  26. }
  27. _WorkerCnt = value;
  28. singleWorkLock = new (_WorkerCnt);
  29. }
  30. }
  31. private readonly Func<IEnumerable<T>, Task> handleFunc;
  32. private readonly ILogger logger;
  33. private readonly ConcurrentQueue<(T param, SemaphoreSlim waitLock)> waitList = new();
  34. private SemaphoreSlim singleWorkLock;// = new SemaphoreSlim(1);
  35. private bool IsStarted = false;
  36. private Task singleHandleTask;
  37. public Task HandleAsync(T param)
  38. {
  39. IsStarted = true;
  40. SemaphoreSlim reqLock = new(0);
  41. waitList.Enqueue((param, reqLock));
  42. TryStartHandler();
  43. return reqLock.WaitAsync();
  44. }
  45. private void TryStartHandler()
  46. {
  47. if (!singleWorkLock.Wait(0))
  48. {
  49. return;
  50. }
  51. if (waitList.Count == 0)
  52. {
  53. singleWorkLock.Release();
  54. return;
  55. }
  56. singleHandleTask = StartHandleTask();
  57. }
  58. private async Task StartHandleTask()
  59. {
  60. var handleList = new List<(T param, SemaphoreSlim waitLock)>();
  61. while (waitList.TryDequeue(out var handle))
  62. {
  63. handleList.Add(handle);
  64. }
  65. int cnt = 0;
  66. do
  67. {
  68. cnt++;
  69. try
  70. {
  71. var task = handleFunc(handleList.Select(x => x.param));
  72. await task;
  73. break;
  74. }
  75. catch (Exception e)
  76. {
  77. logger.LogError(e, "Trying Cnt {0}", cnt);
  78. }
  79. }
  80. while (true);
  81. foreach (var handled in handleList)
  82. {
  83. handled.waitLock.Release();
  84. }
  85. singleWorkLock.Release();
  86. TryStartHandler();
  87. }
  88. }