QueueHandler.cs 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Text;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using static System.Runtime.InteropServices.JavaScript.JSType;
  9. namespace EVCB_OCPP.WSServer.Helper;
  10. public class QueueHandler<T>
  11. {
  12. public QueueHandler(Func<T,Task> handler = null,int maxConcurrency = 1)
  13. {
  14. _queue = new ();
  15. this._handler = handler;
  16. this._semaphore = new (maxConcurrency);
  17. this._handlerPair = new();
  18. }
  19. public void Enqueue(T data)
  20. {
  21. _queue.Enqueue(data);
  22. _ = TrtStartHandler();
  23. }
  24. public void Enqueue(T data, Func<T, Task> handler)
  25. {
  26. _queue.Enqueue(data);
  27. _handlerPair.Add(data, handler);
  28. _ = TrtStartHandler();
  29. }
  30. private readonly ConcurrentQueue<T> _queue;
  31. private readonly Func<T, Task> _handler;
  32. private readonly SemaphoreSlim _semaphore;
  33. private readonly Dictionary<T, Func<T, Task>> _handlerPair;
  34. private async Task TrtStartHandler()
  35. {
  36. if(_semaphore.Wait(0))
  37. {
  38. return;
  39. }
  40. if(!_queue.TryDequeue(out var data))
  41. {
  42. return;
  43. }
  44. var handler = GetHandler(data);
  45. await handler(data);
  46. _semaphore.Release();
  47. _ = TrtStartHandler();
  48. }
  49. private Func<T, Task> GetHandler(T data)
  50. {
  51. if (_handlerPair.TryGetValue(data, out Func<T, Task> value))
  52. {
  53. return value;
  54. }
  55. if (_handler is not null)
  56. {
  57. return _handler;
  58. }
  59. return (data) => Task.CompletedTask;
  60. }
  61. }