ConfirmWaitingMessageSerevice.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. using Azure.Core;
  2. using Azure;
  3. using EVCB_OCPP.Domain;
  4. using EVCB_OCPP.Domain.Models.MainDb;
  5. using EVCB_OCPP.Packet.Features;
  6. using EVCB_OCPP.WSServer.Message;
  7. using Microsoft.EntityFrameworkCore;
  8. using Microsoft.EntityFrameworkCore.Query.Internal;
  9. using Microsoft.Extensions.Logging;
  10. using Microsoft.Identity.Client;
  11. using Newtonsoft.Json;
  12. using System;
  13. using System.Collections.Generic;
  14. using System.Linq;
  15. using System.Text;
  16. using System.Threading.Tasks;
  17. using EVCB_OCPP.WSServer.Service.DbService;
  18. using System.Collections.Concurrent;
  19. namespace EVCB_OCPP.WSServer.Service
  20. {
  21. public enum ConfirmWaitingMessageRemoveReason
  22. {
  23. HitRetryLimit,
  24. Confirmed,
  25. }
  26. public class ConfirmWaitingMessageSerevice
  27. {
  28. private static readonly List<string> needConfirmActions = new List<string>()
  29. {
  30. "GetConfiguration",
  31. "ChangeConfiguration",
  32. "RemoteStartTransaction",
  33. "RemoteStopTransaction",
  34. "ChangeAvailability",
  35. "ClearCache",
  36. "DataTransfer",
  37. "Reset",
  38. "UnlockConnector",
  39. "TriggerMessage",
  40. "GetDiagnostics",
  41. "UpdateFirmware",
  42. "GetLocalListVersion",
  43. "SendLocalList",
  44. "SetChargingProfile",
  45. "ClearChargingProfile",
  46. "GetCompositeSchedule",
  47. "ReserveNow",
  48. "CancelReservation",
  49. "ExtendedTriggerMessage"
  50. };
  51. public static bool IsNeedConfirm(string action)
  52. {
  53. return needConfirmActions.Contains(action);
  54. }
  55. public ConfirmWaitingMessageSerevice(
  56. IMainDbService mainDbService,
  57. IDbContextFactory<MainDBContext> maindbContextFactory
  58. ,ILogger<ConfirmWaitingMessageSerevice> logger)
  59. {
  60. this.mainDbService = mainDbService;
  61. this.maindbContextFactory = maindbContextFactory;
  62. this.logger = logger;
  63. }
  64. public event EventHandler<ConfirmWaitingMessageRemoveReason> OnMessageRemoved;
  65. private readonly Object _lockConfirmPacketList = new object();
  66. private readonly IMainDbService mainDbService;
  67. private readonly IDbContextFactory<MainDBContext> maindbContextFactory;
  68. private readonly ILogger<ConfirmWaitingMessageSerevice> logger;
  69. private readonly List<NeedConfirmMessage> needConfirmPacketList = new();
  70. private readonly ConcurrentDictionary<string, MessageResultWaitObject> asyncWaitingTasks = new();
  71. internal async Task Add(string chargePointSerialNumber, int table_id, string requestId, string action, string msg_id, string createdBy, string sendMessage)
  72. {
  73. NeedConfirmMessage _needConfirmMsg = new NeedConfirmMessage
  74. {
  75. Id = table_id,
  76. SentAction = action,
  77. SentOn = DateTime.UtcNow,
  78. SentTimes = 4,
  79. ChargePointSerialNumber = chargePointSerialNumber,
  80. RequestId = requestId,
  81. SentUniqueId = msg_id,
  82. CreatedBy = createdBy,
  83. SentMessage = sendMessage
  84. };
  85. if (needConfirmActions.Contains(action))
  86. {
  87. lock (_lockConfirmPacketList)
  88. {
  89. needConfirmPacketList.Add(_needConfirmMsg);
  90. }
  91. }
  92. #region 更新資料表單一欄位
  93. var dateTimeNow = DateTime.UtcNow;
  94. var _UpdatedItem = new ServerMessage() { Id = table_id, UpdatedOn = dateTimeNow };
  95. //db.Configuration.AutoDetectChangesEnabled = false;//自動呼叫DetectChanges()比對所有的entry集合的每一個屬性Properties的新舊值
  96. //db.Configuration.ValidateOnSaveEnabled = false;// 因為Entity有些欄位必填,若不避開會有Validate錯誤
  97. // var _UpdatedItem = db.ServerMessage.Where(x => x.Id == item.Id).FirstOrDefault();
  98. //using (var db = await maindbContextFactory.CreateDbContextAsync())
  99. //{
  100. // db.ServerMessage.Attach(_UpdatedItem);
  101. // _UpdatedItem.UpdatedOn = dateTimeNow;
  102. // db.Entry(_UpdatedItem).Property(x => x.UpdatedOn).IsModified = true;// 可以直接使用這方式強制某欄位要更新,只是查詢集合耗效能而己
  103. // await db.SaveChangesAsync();
  104. // db.ChangeTracker.Clear();
  105. //}
  106. await mainDbService.UpdateServerMessageUpdateTime(table_id);
  107. #endregion
  108. }
  109. internal List<NeedConfirmMessage> GetPendingMessages()
  110. {
  111. List<NeedConfirmMessage> sendMessages = new List<NeedConfirmMessage>();
  112. lock (_lockConfirmPacketList)
  113. {
  114. sendMessages = needConfirmPacketList.Where(x => x.SentTimes > 1 && x.CreatedBy == "Server").ToList();
  115. }
  116. return sendMessages;
  117. }
  118. internal async Task<bool> TryConfirmMessage(MessageResult analysisResult)
  119. {
  120. bool confirmed = false;
  121. //if (needConfirmActions.Contains(analysisResult.Action))
  122. if (!IsNeedConfirm(analysisResult.Action))
  123. {
  124. return confirmed;
  125. }
  126. NeedConfirmMessage foundRequest = null;
  127. lock (_lockConfirmPacketList)
  128. {
  129. foundRequest = needConfirmPacketList.Where(x => x.SentUniqueId == analysisResult.UUID).FirstOrDefault();
  130. }
  131. if (foundRequest != null && foundRequest.Id > 0)
  132. {
  133. RemoveWaitingMsg(foundRequest, ConfirmWaitingMessageRemoveReason.Confirmed, analysisResult);
  134. foundRequest.SentTimes = 0;
  135. foundRequest.SentInterval = 0;
  136. analysisResult.RequestId = foundRequest.RequestId;
  137. try
  138. {
  139. using (var db = await maindbContextFactory.CreateDbContextAsync())
  140. {
  141. var sc = await db.ServerMessage.Where(x => x.Id == foundRequest.Id).FirstOrDefaultAsync();
  142. sc.InMessage = JsonConvert.SerializeObject(analysisResult.Message, Formatting.None);
  143. sc.ReceivedOn = DateTime.UtcNow;
  144. await db.SaveChangesAsync();
  145. // Console.WriteLine(string.Format("Now:{0} ServerMessage Id:{1} ", DateTime.UtcNow.ToString("yyyy/MM/dd HH:mm:ss"), foundRequest.Id));
  146. }
  147. }
  148. catch (Exception ex)
  149. {
  150. logger.LogWarning(string.Format("TryConfirmMessage:{0}", JsonConvert.SerializeObject(analysisResult)));
  151. logger.LogWarning(ex.ToString());
  152. }
  153. confirmed = true;
  154. }
  155. else if (analysisResult.Action == Actions.TriggerMessage.ToString())
  156. {
  157. confirmed = true;
  158. }
  159. else
  160. {
  161. logger.LogError(string.Format("Received no record Action:{0} MessageId:{1} ", analysisResult.Action, analysisResult.UUID));
  162. }
  163. return confirmed;
  164. }
  165. internal void SignalMessageSended(NeedConfirmMessage resendItem)
  166. {
  167. var dateTimeNow = DateTime.UtcNow;
  168. resendItem.SentTimes--;
  169. resendItem.SentOn = dateTimeNow;
  170. }
  171. internal void RemoveExpiredConfirmMessage()
  172. {
  173. var before10Mins = DateTime.UtcNow.AddMinutes(-10);
  174. var removeList = needConfirmPacketList.Where(x => x.SentTimes == 0 || x.SentOn < before10Mins).ToList();
  175. foreach (var item in removeList)
  176. {
  177. RemoveWaitingMsg(item, ConfirmWaitingMessageRemoveReason.HitRetryLimit);
  178. }
  179. }
  180. internal async Task<object> SendAndWaitUntilResultAsync(Func<Task<string>> startSendTaskFunc, CancellationToken token = default)
  181. {
  182. object message;
  183. do
  184. {
  185. var requestId = await startSendTaskFunc();
  186. message = await WaitResultAsync(requestId, token: token);
  187. }
  188. while (message == null && !token.IsCancellationRequested);
  189. return message;
  190. }
  191. internal Task<object> SendAndWaitUntilResultAsync(Func<string, Task> startSendTaskFunc, int maxWaitSec = 180_000, CancellationToken token = default)
  192. {
  193. Func<string, CancellationToken, Task> func = (string result, CancellationToken token) => startSendTaskFunc(result);
  194. return SendAndWaitUntilResultAsync(func, maxWaitSec: maxWaitSec , token: token);
  195. }
  196. internal async Task<object> SendAndWaitUntilResultAsync(Func<string, CancellationToken, Task> startSendTaskFunc, int maxWaitSec = 180_000, CancellationToken token = default)
  197. {
  198. object message;
  199. do
  200. {
  201. message = await SendAndWaitResultAsync(startSendTaskFunc, maxWaitSec: maxWaitSec, token: token);
  202. }
  203. while (message == null && !token.IsCancellationRequested);
  204. return message;
  205. }
  206. internal Task<object> SendAndWaitResultAsync(Func<string, Task> startSendTaskFunc, int maxWaitSec = 1_000, CancellationToken token = default)
  207. {
  208. Func<string, CancellationToken, Task> func = (string result, CancellationToken token) => startSendTaskFunc(result);
  209. return SendAndWaitResultAsync(func, maxWaitSec: maxWaitSec, token: token);
  210. }
  211. internal async Task<object> SendAndWaitResultAsync(Func<string, CancellationToken, Task> startSendTaskFunc, int maxWaitSec = 1_000, CancellationToken token = default)
  212. {
  213. object message;
  214. var SerialNo = Guid.NewGuid().ToString();
  215. var waitObject = CreateAndAddWaitObject(SerialNo);
  216. await startSendTaskFunc(SerialNo, token);
  217. message = await WaitResultAsync(SerialNo, waitObject: waitObject, maxWaitSec: maxWaitSec, token: token);
  218. return message;
  219. }
  220. internal async Task<object> WaitResultAsync(string serialNo, MessageResultWaitObject waitObject = null, int maxWaitSec = 180_000, CancellationToken token = default)
  221. {
  222. if (waitObject == null)
  223. {
  224. waitObject = new MessageResultWaitObject();
  225. asyncWaitingTasks.TryAdd(serialNo, waitObject);
  226. }
  227. var task = waitObject.Lock.WaitAsync(token);
  228. var completedTask = await Task.WhenAny(task, Task.Delay(maxWaitSec, token));
  229. if (completedTask != task)
  230. {
  231. logger.LogWarning("wait {msg} time out", serialNo);
  232. }
  233. asyncWaitingTasks.Remove(serialNo, out var _);
  234. return waitObject.Result?.Message;
  235. }
  236. private MessageResultWaitObject CreateAndAddWaitObject(string serialNo)
  237. {
  238. var waiObj = new MessageResultWaitObject();
  239. asyncWaitingTasks.TryAdd(serialNo, waiObj);
  240. return waiObj;
  241. }
  242. private void RemoveWaitingMsg(
  243. NeedConfirmMessage msg,
  244. ConfirmWaitingMessageRemoveReason reason,
  245. MessageResult response = null)
  246. {
  247. lock (_lockConfirmPacketList)
  248. {
  249. needConfirmPacketList.Remove(msg);
  250. }
  251. if (asyncWaitingTasks.Keys.Contains(msg.RequestId))
  252. {
  253. asyncWaitingTasks[msg.RequestId].Result = response;
  254. asyncWaitingTasks[msg.RequestId].Lock.Release();
  255. }
  256. OnMessageRemoved?.Invoke(this, reason);
  257. }
  258. }
  259. internal class MessageResultWaitObject
  260. {
  261. public MessageResult Result { get; set; } = null;
  262. public SemaphoreSlim Lock { get; set; } = new SemaphoreSlim(0);
  263. }
  264. }