123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305 |
- using Azure.Core;
- using Azure;
- using EVCB_OCPP.Domain;
- using EVCB_OCPP.Domain.Models.MainDb;
- using EVCB_OCPP.Packet.Features;
- using EVCB_OCPP.WSServer.Message;
- using Microsoft.EntityFrameworkCore;
- using Microsoft.EntityFrameworkCore.Query.Internal;
- using Microsoft.Extensions.Logging;
- using Microsoft.Identity.Client;
- using Newtonsoft.Json;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- using EVCB_OCPP.WSServer.Service.DbService;
- using System.Collections.Concurrent;
- namespace EVCB_OCPP.WSServer.Service
- {
- public enum ConfirmWaitingMessageRemoveReason
- {
- HitRetryLimit,
- Confirmed,
- }
- public class ConfirmWaitingMessageSerevice
- {
- private static readonly List<string> needConfirmActions = new List<string>()
- {
- "GetConfiguration",
- "ChangeConfiguration",
- "RemoteStartTransaction",
- "RemoteStopTransaction",
- "ChangeAvailability",
- "ClearCache",
- "DataTransfer",
- "Reset",
- "UnlockConnector",
- "TriggerMessage",
- "GetDiagnostics",
- "UpdateFirmware",
- "GetLocalListVersion",
- "SendLocalList",
- "SetChargingProfile",
- "ClearChargingProfile",
- "GetCompositeSchedule",
- "ReserveNow",
- "CancelReservation",
- "ExtendedTriggerMessage"
- };
- public static bool IsNeedConfirm(string action)
- {
- return needConfirmActions.Contains(action);
- }
- public ConfirmWaitingMessageSerevice(
- IMainDbService mainDbService,
- IDbContextFactory<MainDBContext> maindbContextFactory
- ,ILogger<ConfirmWaitingMessageSerevice> logger)
- {
- this.mainDbService = mainDbService;
- this.maindbContextFactory = maindbContextFactory;
- this.logger = logger;
- }
- public event EventHandler<ConfirmWaitingMessageRemoveReason> OnMessageRemoved;
- private readonly Object _lockConfirmPacketList = new object();
- private readonly IMainDbService mainDbService;
- private readonly IDbContextFactory<MainDBContext> maindbContextFactory;
- private readonly ILogger<ConfirmWaitingMessageSerevice> logger;
- private readonly List<NeedConfirmMessage> needConfirmPacketList = new();
- private readonly ConcurrentDictionary<string, MessageResultWaitObject> asyncWaitingTasks = new();
- internal async Task Add(string chargePointSerialNumber, int table_id, string requestId, string action, string msg_id, string createdBy, string sendMessage)
- {
- NeedConfirmMessage _needConfirmMsg = new NeedConfirmMessage
- {
- Id = table_id,
- SentAction = action,
- SentOn = DateTime.UtcNow,
- SentTimes = 4,
- ChargePointSerialNumber = chargePointSerialNumber,
- RequestId = requestId,
- SentUniqueId = msg_id,
- CreatedBy = createdBy,
- SentMessage = sendMessage
- };
- if (needConfirmActions.Contains(action))
- {
- lock (_lockConfirmPacketList)
- {
- needConfirmPacketList.Add(_needConfirmMsg);
- }
- }
- #region 更新資料表單一欄位
- var dateTimeNow = DateTime.UtcNow;
- var _UpdatedItem = new ServerMessage() { Id = table_id, UpdatedOn = dateTimeNow };
- //db.Configuration.AutoDetectChangesEnabled = false;//自動呼叫DetectChanges()比對所有的entry集合的每一個屬性Properties的新舊值
- //db.Configuration.ValidateOnSaveEnabled = false;// 因為Entity有些欄位必填,若不避開會有Validate錯誤
- // var _UpdatedItem = db.ServerMessage.Where(x => x.Id == item.Id).FirstOrDefault();
- //using (var db = await maindbContextFactory.CreateDbContextAsync())
- //{
- // db.ServerMessage.Attach(_UpdatedItem);
- // _UpdatedItem.UpdatedOn = dateTimeNow;
- // db.Entry(_UpdatedItem).Property(x => x.UpdatedOn).IsModified = true;// 可以直接使用這方式強制某欄位要更新,只是查詢集合耗效能而己
- // await db.SaveChangesAsync();
- // db.ChangeTracker.Clear();
- //}
- await mainDbService.UpdateServerMessageUpdateTime(table_id);
- #endregion
- }
- internal List<NeedConfirmMessage> GetPendingMessages()
- {
- List<NeedConfirmMessage> sendMessages = new List<NeedConfirmMessage>();
- lock (_lockConfirmPacketList)
- {
- sendMessages = needConfirmPacketList.Where(x => x.SentTimes > 1 && x.CreatedBy == "Server").ToList();
- }
- return sendMessages;
- }
- internal async Task<bool> TryConfirmMessage(MessageResult analysisResult)
- {
- bool confirmed = false;
- //if (needConfirmActions.Contains(analysisResult.Action))
- if (!IsNeedConfirm(analysisResult.Action))
- {
- return confirmed;
- }
- NeedConfirmMessage foundRequest = null;
- lock (_lockConfirmPacketList)
- {
- foundRequest = needConfirmPacketList.Where(x => x.SentUniqueId == analysisResult.UUID).FirstOrDefault();
- }
- if (foundRequest != null && foundRequest.Id > 0)
- {
- RemoveWaitingMsg(foundRequest, ConfirmWaitingMessageRemoveReason.Confirmed, analysisResult);
- foundRequest.SentTimes = 0;
- foundRequest.SentInterval = 0;
- analysisResult.RequestId = foundRequest.RequestId;
- try
- {
- using (var db = await maindbContextFactory.CreateDbContextAsync())
- {
- var sc = await db.ServerMessage.Where(x => x.Id == foundRequest.Id).FirstOrDefaultAsync();
- sc.InMessage = JsonConvert.SerializeObject(analysisResult.Message, Formatting.None);
- sc.ReceivedOn = DateTime.UtcNow;
- await db.SaveChangesAsync();
- // Console.WriteLine(string.Format("Now:{0} ServerMessage Id:{1} ", DateTime.UtcNow.ToString("yyyy/MM/dd HH:mm:ss"), foundRequest.Id));
- }
- }
- catch (Exception ex)
- {
- logger.LogWarning(string.Format("TryConfirmMessage:{0}", JsonConvert.SerializeObject(analysisResult)));
- logger.LogWarning(ex.ToString());
- }
- confirmed = true;
- }
- else if (analysisResult.Action == Actions.TriggerMessage.ToString())
- {
- confirmed = true;
- }
- else
- {
- logger.LogError(string.Format("Received no record Action:{0} MessageId:{1} ", analysisResult.Action, analysisResult.UUID));
- }
- return confirmed;
- }
- internal void SignalMessageSended(NeedConfirmMessage resendItem)
- {
- var dateTimeNow = DateTime.UtcNow;
- resendItem.SentTimes--;
- resendItem.SentOn = dateTimeNow;
- }
- internal void RemoveExpiredConfirmMessage()
- {
- var before10Mins = DateTime.UtcNow.AddMinutes(-10);
- var removeList = needConfirmPacketList.Where(x => x.SentTimes == 0 || x.SentOn < before10Mins).ToList();
- foreach (var item in removeList)
- {
- RemoveWaitingMsg(item, ConfirmWaitingMessageRemoveReason.HitRetryLimit);
- }
- }
- internal async Task<object> SendAndWaitUntilResultAsync(Func<Task<string>> startSendTaskFunc, CancellationToken token = default)
- {
- object message;
- do
- {
- var requestId = await startSendTaskFunc();
- message = await WaitResultAsync(requestId, token: token);
- }
- while (message == null && !token.IsCancellationRequested);
- return message;
- }
- internal Task<object> SendAndWaitUntilResultAsync(Func<string, Task> startSendTaskFunc, int maxWaitSec = 180_000, CancellationToken token = default)
- {
- Func<string, CancellationToken, Task> func = (string result, CancellationToken token) => startSendTaskFunc(result);
- return SendAndWaitUntilResultAsync(func, maxWaitSec: maxWaitSec , token: token);
- }
- internal async Task<object> SendAndWaitUntilResultAsync(Func<string, CancellationToken, Task> startSendTaskFunc, int maxWaitSec = 180_000, CancellationToken token = default)
- {
- object message;
- do
- {
- message = await SendAndWaitResultAsync(startSendTaskFunc, maxWaitSec: maxWaitSec, token: token);
- }
- while (message == null && !token.IsCancellationRequested);
- return message;
- }
- internal Task<object> SendAndWaitResultAsync(Func<string, Task> startSendTaskFunc, int maxWaitSec = 1_000, CancellationToken token = default)
- {
- Func<string, CancellationToken, Task> func = (string result, CancellationToken token) => startSendTaskFunc(result);
- return SendAndWaitResultAsync(func, maxWaitSec: maxWaitSec, token: token);
- }
- internal async Task<object> SendAndWaitResultAsync(Func<string, CancellationToken, Task> startSendTaskFunc, int maxWaitSec = 1_000, CancellationToken token = default)
- {
- object message;
- var SerialNo = Guid.NewGuid().ToString();
- var waitObject = CreateAndAddWaitObject(SerialNo);
- await startSendTaskFunc(SerialNo, token);
- message = await WaitResultAsync(SerialNo, waitObject: waitObject, maxWaitSec: maxWaitSec, token: token);
- return message;
- }
- internal async Task<object> WaitResultAsync(string serialNo, MessageResultWaitObject waitObject = null, int maxWaitSec = 180_000, CancellationToken token = default)
- {
- if (waitObject == null)
- {
- waitObject = new MessageResultWaitObject();
- asyncWaitingTasks.TryAdd(serialNo, waitObject);
- }
- var task = waitObject.Lock.WaitAsync(token);
- var completedTask = await Task.WhenAny(task, Task.Delay(maxWaitSec, token));
- if (completedTask != task)
- {
- logger.LogWarning("wait {msg} time out", serialNo);
- }
- asyncWaitingTasks.Remove(serialNo, out var _);
- return waitObject.Result?.Message;
- }
- private MessageResultWaitObject CreateAndAddWaitObject(string serialNo)
- {
- var waiObj = new MessageResultWaitObject();
- asyncWaitingTasks.TryAdd(serialNo, waiObj);
- return waiObj;
- }
- private void RemoveWaitingMsg(
- NeedConfirmMessage msg,
- ConfirmWaitingMessageRemoveReason reason,
- MessageResult response = null)
- {
- lock (_lockConfirmPacketList)
- {
- needConfirmPacketList.Remove(msg);
- }
- if (asyncWaitingTasks.Keys.Contains(msg.RequestId))
- {
- asyncWaitingTasks[msg.RequestId].Result = response;
- asyncWaitingTasks[msg.RequestId].Lock.Release();
- }
- OnMessageRemoved?.Invoke(this, reason);
- }
- }
- internal class MessageResultWaitObject
- {
- public MessageResult Result { get; set; } = null;
- public SemaphoreSlim Lock { get; set; } = new SemaphoreSlim(0);
- }
- }
|