MainDbService.cs 16 KB


  1. using EVCB_OCPP.Domain;
  2. using EVCB_OCPP.Domain.Models.Database;
  3. using EVCB_OCPP.Packet.Messages.Core;
  4. using EVCB_OCPP.WSServer.Helper;
  5. using Microsoft.Data.SqlClient;
  6. using Microsoft.EntityFrameworkCore;
  7. using Microsoft.Extensions.Caching.Memory;
  8. using Microsoft.Extensions.Configuration;
  9. using Microsoft.Extensions.DependencyInjection;
  10. using Microsoft.Extensions.Logging;
  11. using MongoDB.Driver.Core.Connections;
  12. using Newtonsoft.Json;
  13. using OCPPPackage.Profiles;
  14. using System;
  15. using System.Collections.Generic;
  16. using System.Data;
  17. using System.Linq;
  18. using System.Text;
  19. using System.Threading;
  20. using System.Threading.Tasks;
  21. namespace EVCB_OCPP.WSServer.Service;
  /// <summary>
  /// Operations on the main database that are exposed to the rest of the server.
  /// </summary>
  public interface IMainDbService
  {
      // ----- machine lookups -----

      /// <summary>Returns the machine id plus the owning customer's id and name for a charge box.</summary>
      Task<MachineAndCustomerInfo> GetMachineIdAndCustomerInfo(string ChargeBoxId);

      /// <summary>Returns the stored setting of the configuration entry <paramref name="configName"/> for a charge box.</summary>
      Task<string> GetMachineConfiguration(string ChargeBoxId, string configName);

      /// <summary>Returns the stored authorization-key configuration of a charge box.</summary>
      Task<string> GetMachineAuthorizationKey(string ChargeBoxId);

      /// <summary>Returns the stored heartbeat-interval configuration of a charge box.</summary>
      Task<string> GetMachineHeartbeatInterval(string ChargeBoxId);

      /// <summary>Returns the stored security-profile configuration of a charge box.</summary>
      Task<string> GetMachineSecurityProfile(string ChargeBoxId);

      // ----- machine / connector writes -----

      /// <summary>Copies the basic hardware information in <paramref name="machine"/> onto the stored machine.</summary>
      Task UpdateMachineBasicInfo(string ChargeBoxId, Machine machine);

      /// <summary>Returns the status record of one connector of a charge box.</summary>
      ValueTask<ConnectorStatus> GetConnectorStatus(string ChargeBoxId, int ConnectorId);

      /// <summary>Updates the connector status row identified by <paramref name="Id"/>.</summary>
      Task UpdateConnectorStatus(string Id, ConnectorStatus connectorStatus);

      // ----- inserts -----

      /// <summary>Stores an OCMF record.</summary>
      Task AddOCMF(OCMF oCMF);

      /// <summary>Stores a server message.</summary>
      Task AddServerMessage(ServerMessage message);

      /// <summary>Builds a server message from the individual values and stores it.</summary>
      Task AddServerMessage(string ChargeBoxId, string OutAction, object OutRequest, string CreatedBy = "", DateTime? CreatedOn = null, string SerialNo = "", string InMessage = "");
  }
  36. public class MainDbService : IMainDbService
  37. {
  // Injected collaborators.
  private readonly IDbContextFactory<MainDBContext> contextFactory;
  private readonly IMemoryCache memoryCache;
  private readonly ILoggerFactory loggerFactory;

  // Connection string named "MainDBContext", read from configuration in the constructor.
  private string connectionString;

  // Limiters sized from the "MainDbStartupLimit" / "MainDbOpLimit" configuration values.
  private readonly QueueSemaphore startupSemaphore;
  private readonly SemaphoreSlim opSemaphore;

  // Grouped handlers; each one is created exactly once by its Init* method.
  private GroupSingleHandler<StatusNotificationParam> statusNotificationHandler;
  private GroupSingleHandler<UpdateMachineBasicInfoParam> updateMachineBasicInfoHandler;
  private GroupSingleHandler<ServerMessage> addServerMessageHandler;

  /// <summary>
  /// Stores the injected services, reads the limits and the connection string
  /// from configuration, and creates the three grouped handlers.
  /// </summary>
  public MainDbService(
      IDbContextFactory<MainDBContext> contextFactory,
      IMemoryCache memoryCache,
      IConfiguration configuration,
      ILoggerFactory loggerFactory)
  {
      this.contextFactory = contextFactory;
      this.memoryCache = memoryCache;
      this.loggerFactory = loggerFactory;

      int maxStartupOperations = GetStartupLimit(configuration);
      connectionString = configuration.GetConnectionString("MainDBContext");
      startupSemaphore = new QueueSemaphore(maxStartupOperations);

      int maxOperations = GetOpLimit(configuration);
      opSemaphore = new SemaphoreSlim(maxOperations);

      InitUpdateConnectorStatusHandler();
      InitUpdateMachineBasicInfoHandler();
      InitAddServerMessageHandler();
  }
  /// <summary>
  /// Looks up the non-deleted machine registered under <paramref name="ChargeBoxId"/>
  /// and returns its id together with the id and name of its customer.
  /// </summary>
  /// <returns>
  /// The machine/customer information, or an entry with an empty machine id,
  /// <see cref="Guid.Empty"/> and the name "Unknown" when no such machine exists.
  /// </returns>
  public async Task<MachineAndCustomerInfo> GetMachineIdAndCustomerInfo(string ChargeBoxId)
  {
      // Wait for a startup token before touching the database.
      using var startupToken = await startupSemaphore.GetToken();
      using var context = contextFactory.CreateDbContext();

      var machineQuery = context.Machine
          .Where(m => m.ChargeBoxId == ChargeBoxId && m.IsDelete == false)
          .Select(m => new { m.CustomerId, m.Id })
          .AsNoTracking();
      var foundMachine = await machineQuery.FirstOrDefaultAsync();

      if (foundMachine != null)
      {
          var nameOfCustomer = await context.Customer
              .Where(c => c.Id == foundMachine.CustomerId)
              .Select(c => c.Name)
              .FirstOrDefaultAsync();
          return new MachineAndCustomerInfo(foundMachine.Id, foundMachine.CustomerId, nameOfCustomer);
      }

      return new MachineAndCustomerInfo(string.Empty, Guid.Empty, "Unknown");
  }
  /// <summary>
  /// Reads the setting of the configuration entry named <paramref name="configName"/>
  /// for the charge box <paramref name="ChargeBoxId"/>.
  /// </summary>
  /// <returns>The first matching setting, or the default value when none matches.</returns>
  public async Task<string> GetMachineConfiguration(string ChargeBoxId, string configName)
  {
      // Wait for a startup token before touching the database.
      using var startupToken = await startupSemaphore.GetToken();
      using var context = contextFactory.CreateDbContext();

      var settingQuery = context.MachineConfigurations
          .Where(c => c.ChargeBoxId == ChargeBoxId && c.ConfigureName == configName)
          .Select(c => c.ConfigureSetting);

      var setting = await settingQuery.FirstOrDefaultAsync();
      return setting;
  }
  /// <summary>
  /// Reads the <c>StandardConfiguration.SecurityProfile</c> configuration entry of a charge box.
  /// </summary>
  public async Task<string> GetMachineSecurityProfile(string ChargeBoxId)
      => await GetMachineConfiguration(ChargeBoxId, StandardConfiguration.SecurityProfile);
  /// <summary>
  /// Reads the <c>StandardConfiguration.AuthorizationKey</c> configuration entry of a charge box.
  /// </summary>
  public async Task<string> GetMachineAuthorizationKey(string ChargeBoxId)
      => await GetMachineConfiguration(ChargeBoxId, StandardConfiguration.AuthorizationKey);
  /// <summary>
  /// Reads the <c>StandardConfiguration.HeartbeatInterval</c> configuration entry of a charge box.
  /// </summary>
  public async Task<string> GetMachineHeartbeatInterval(string ChargeBoxId)
      => await GetMachineConfiguration(ChargeBoxId, StandardConfiguration.HeartbeatInterval);
  /// <summary>
  /// Hands the basic-information update for <paramref name="ChargeBoxId"/> to the
  /// update-machine handler and waits for the handler to complete it.
  /// The database write itself is done by the handler's callback.
  /// </summary>
  public async Task UpdateMachineBasicInfo(string ChargeBoxId, Machine machine)
  {
      var request = new UpdateMachineBasicInfoParam(ChargeBoxId, machine);
      await updateMachineBasicInfoHandler.HandleAsync(request);
  }
  /// <summary>
  /// Inserts <paramref name="oCMF"/> into the OCMF set and saves the change.
  /// </summary>
  public async Task AddOCMF(OCMF oCMF)
  {
      using (var context = contextFactory.CreateDbContext())
      {
          context.OCMF.Add(oCMF);
          await context.SaveChangesAsync();
      }
  }
  /// <summary>
  /// Returns the status of connector <paramref name="ConnectorId"/> of charge box
  /// <paramref name="ChargeBoxId"/>, served from the memory cache when an entry exists
  /// and otherwise loaded (untracked) from the database.
  /// </summary>
  /// <returns>The connector status, or <c>null</c> when the database has no such row.</returns>
  public async ValueTask<ConnectorStatus> GetConnectorStatus(string ChargeBoxId, int ConnectorId)
  {
      var key = BuildConnectorStatusCacheKey(ChargeBoxId, ConnectorId);
      if (memoryCache.TryGetValue<ConnectorStatus>(key, out var status) && status is not null)
      {
          return status;
      }

      using var db = contextFactory.CreateDbContext();
      var statusFromDb = await db.ConnectorStatus
          .Where(x => x.ChargeBoxId == ChargeBoxId && x.ConnectorId == ConnectorId)
          .AsNoTracking()
          .FirstOrDefaultAsync();

      // Do not cache a miss: the entry never expires, so a cached null would hide
      // a connector row that is created later by code that bypasses this cache.
      if (statusFromDb is not null)
      {
          memoryCache.Set(key, statusFromDb);
      }
      return statusFromDb;
  }

  /// <summary>
  /// Passes the new status to the status-notification handler, waits for it to
  /// complete, and then stores <paramref name="Status"/> in the memory cache.
  /// </summary>
  /// <param name="Id">Id of the connector status row to update.</param>
  /// <param name="Status">The new status values; also used as the cached entry.</param>
  public async Task UpdateConnectorStatus(string Id, ConnectorStatus Status)
  {
      await statusNotificationHandler.HandleAsync(new StatusNotificationParam(Id, Status));

      // Set replaces any existing entry, so no separate lookup/removal is needed.
      var key = BuildConnectorStatusCacheKey(Status.ChargeBoxId, Status.ConnectorId);
      memoryCache.Set(key, Status);
  }

  /// <summary>
  /// Builds the cache key for a connector status. The separator keeps distinct
  /// (charge box, connector) pairs apart; plain concatenation maps e.g.
  /// ("CP1", 12) and ("CP11", 2) to the same key.
  /// </summary>
  // NOTE(review): assumes no other class reads these entries from the shared
  // IMemoryCache using the old "{ChargeBoxId}{ConnectorId}" key - confirm.
  private static string BuildConnectorStatusCacheKey<TConnectorId>(string chargeBoxId, TConnectorId connectorId)
      => $"ConnectorStatus:{chargeBoxId}:{connectorId}";
  /// <summary>
  /// Builds a <c>ServerMessage</c> from the given values and forwards it to
  /// the single-message overload.
  /// </summary>
  /// <param name="ChargeBoxId">Charge box the message belongs to.</param>
  /// <param name="OutAction">Action name stored on the message.</param>
  /// <param name="OutRequest">Request object; serialized to JSON without null members, or stored as "" when null.</param>
  /// <param name="CreatedBy">Creator name; "Server" is used when null or empty.</param>
  /// <param name="CreatedOn">Creation time; the current UTC time is used when null.</param>
  /// <param name="SerialNo">Serial number; a new GUID string is used when null or empty.</param>
  /// <param name="InMessage">Value stored as the message's InMessage.</param>
  public Task AddServerMessage(string ChargeBoxId, string OutAction, object OutRequest, string CreatedBy, DateTime? CreatedOn = null, string SerialNo = "", string InMessage = "")
  {
      string author = string.IsNullOrEmpty(CreatedBy) ? "Server" : CreatedBy;
      string serial = string.IsNullOrEmpty(SerialNo) ? Guid.NewGuid().ToString() : SerialNo;
      DateTime createdAt = CreatedOn.HasValue ? CreatedOn.Value : DateTime.UtcNow;

      string serializedRequest = "";
      if (OutRequest != null)
      {
          var jsonSettings = new JsonSerializerSettings
          {
              NullValueHandling = NullValueHandling.Ignore,
              Formatting = Formatting.None
          };
          serializedRequest = JsonConvert.SerializeObject(OutRequest, jsonSettings);
      }

      var serverMessage = new ServerMessage
      {
          ChargeBoxId = ChargeBoxId,
          CreatedBy = author,
          CreatedOn = createdAt,
          OutAction = OutAction,
          OutRequest = serializedRequest,
          SerialNo = serial,
          InMessage = InMessage
      };
      return AddServerMessage(serverMessage);
  }
  /// <summary>
  /// Passes <paramref name="message"/> to the add-server-message handler and
  /// returns the handler's task.
  /// </summary>
  public Task AddServerMessage(ServerMessage message)
      => addServerMessageHandler.HandleAsync(message);
  /// <summary>
  /// Creates the handler for machine basic-info updates (10 workers).
  /// Throws when the handler has already been created.
  /// </summary>
  private void InitUpdateMachineBasicInfoHandler()
  {
      if (updateMachineBasicInfoHandler is null)
      {
          var handlerLogger = loggerFactory.CreateLogger("UpdateMachineBasicInfoHandler");
          updateMachineBasicInfoHandler = new GroupSingleHandler<UpdateMachineBasicInfoParam>(
              handleFunc: BundelUpdateMachineBasicInfo,
              logger: handlerLogger,
              workerCnt: 10);
          return;
      }

      throw new Exception($"{nameof(InitUpdateMachineBasicInfoHandler)} should only called once");
  }
  /// <summary>
  /// Applies a group of machine basic-info updates inside one transaction.
  /// Only the first request per charge box id in <paramref name="pams"/> is applied.
  /// </summary>
  /// <param name="pams">The grouped update requests.</param>
  private async Task BundelUpdateMachineBasicInfo(IEnumerable<UpdateMachineBasicInfoParam> pams)
  {
      using var db = await contextFactory.CreateDbContextAsync();
      using var trans = await db.Database.BeginTransactionAsync();

      foreach (var pam in pams.DistinctBy(x => x.ChargeBoxId))
      {
          var _machine = await db.Machine.FirstOrDefaultAsync(x => x.ChargeBoxId == pam.ChargeBoxId);
          if (_machine is null)
          {
              // An unknown charge box must not fail the whole group: skip it so the
              // remaining updates are still saved.
              loggerFactory.CreateLogger("UpdateMachineBasicInfoHandler")
                  .LogWarning("Machine with ChargeBoxId {ChargeBoxId} not found, basic info update skipped", pam.ChargeBoxId);
              continue;
          }

          _machine.ChargeBoxSerialNumber = pam.machine.ChargeBoxSerialNumber;
          _machine.ChargePointSerialNumber = pam.machine.ChargePointSerialNumber;
          _machine.ChargePointModel = pam.machine.ChargePointModel;
          _machine.ChargePointVendor = pam.machine.ChargePointVendor;
          _machine.FW_CurrentVersion = pam.machine.FW_CurrentVersion;
          // NOTE(review): Iccid is overwritten with the current UTC time rather than
          // pam.machine.Iccid; kept as in the original - confirm this is intended.
          _machine.Iccid = DateTime.UtcNow.ToString("yy-MM-dd HH:mm");
          _machine.Imsi = pam.machine.Imsi;
          _machine.MeterSerialNumber = pam.machine.MeterSerialNumber;
          _machine.MeterType = pam.machine.MeterType;
      }

      // Use the async APIs so the worker is not blocked on database I/O.
      await db.SaveChangesAsync();
      await trans.CommitAsync();
  }
  /// <summary>
  /// Creates the handler for connector status updates (10 workers).
  /// Throws when the handler has already been created.
  /// </summary>
  private void InitUpdateConnectorStatusHandler()
  {
      if (statusNotificationHandler is null)
      {
          var handlerLogger = loggerFactory.CreateLogger("StatusNotificationHandler");
          statusNotificationHandler = new GroupSingleHandler<StatusNotificationParam>(
              handleFunc: BundleUpdateConnectorStatus,
              logger: handlerLogger,
              workerCnt: 10);
          return;
      }

      throw new Exception($"{nameof(InitUpdateConnectorStatusHandler)} should only called once");
  }
  /// <summary>
  /// Writes a group of connector status updates inside one transaction.
  /// When several updates target the same row id, only the one with the latest
  /// <c>CreatedOn</c> is written.
  /// </summary>
  /// <param name="statusNotifications">The grouped status updates.</param>
  private async Task BundleUpdateConnectorStatus(IEnumerable<StatusNotificationParam> statusNotifications)
  {
      using var db = await contextFactory.CreateDbContextAsync();
      using var trans = await db.Database.BeginTransactionAsync();

      // One update per row id (attaching the same key twice would throw), and it must
      // be the newest one. OrderBy(...).DistinctBy(...) kept the oldest instead.
      // OrderBy is stable, so ties resolve to the last item in the incoming order.
      var latestPerRow = statusNotifications
          .GroupBy(x => x.Id)
          .Select(g => g.OrderBy(x => x.Status.CreatedOn).Last());

      foreach (var param in latestPerRow)
      {
          // Attach a stub with only the key so the row is updated without loading it.
          ConnectorStatus status = new() { Id = param.Id };
          db.ConnectorStatus.Attach(status);

          status.CreatedOn = param.Status.CreatedOn;
          status.Status = param.Status.Status;
          status.ChargePointErrorCodeId = param.Status.ChargePointErrorCodeId;
          status.ErrorInfo = param.Status.ErrorInfo;
          status.VendorId = param.Status.VendorId;
          status.VendorErrorCode = param.Status.VendorErrorCode;

          // Mark every column explicitly so it is written even when the new value
          // equals the stub's default.
          db.Entry(status).Property(x => x.CreatedOn).IsModified = true;
          db.Entry(status).Property(x => x.Status).IsModified = true;
          db.Entry(status).Property(x => x.ChargePointErrorCodeId).IsModified = true;
          db.Entry(status).Property(x => x.ErrorInfo).IsModified = true;
          db.Entry(status).Property(x => x.VendorId).IsModified = true;
          db.Entry(status).Property(x => x.VendorErrorCode).IsModified = true;
      }

      // Use the async APIs so the worker is not blocked on database I/O.
      await db.SaveChangesAsync();
      await trans.CommitAsync();
      db.ChangeTracker.Clear();
  }
  /// <summary>
  /// Creates the handler that stores server messages through
  /// <c>BulkInsertServerMessage</c>. Throws when the handler has already been created.
  /// </summary>
  private void InitAddServerMessageHandler()
  {
      if (addServerMessageHandler is null)
      {
          var handlerLogger = loggerFactory.CreateLogger("AddServerMessageHandler");
          addServerMessageHandler = new GroupSingleHandler<ServerMessage>(
              handleFunc: BulkInsertServerMessage,
              logger: handlerLogger);
          return;
      }

      throw new Exception($"{nameof(InitAddServerMessageHandler)} should only called once");
  }
  /// <summary>
  /// Inserts a group of server messages through the EF context inside one transaction.
  /// </summary>
  /// <param name="messages">The messages to insert.</param>
  private async Task BundleAddServerMessage(IEnumerable<ServerMessage> messages)
  {
      using var db = await contextFactory.CreateDbContextAsync();
      using var trans = await db.Database.BeginTransactionAsync();

      db.ServerMessage.AddRange(messages);

      // Use the async APIs so the caller's thread is not blocked on database I/O.
      await db.SaveChangesAsync();
      await trans.CommitAsync();
      db.ChangeTracker.Clear();
  }
  /// <summary>
  /// Inserts a group of server messages into the "ServerMessage" table with
  /// <see cref="SqlBulkCopy"/>, using the connection string read in the constructor.
  /// </summary>
  /// <param name="messages">The messages to insert; nothing is sent when it is empty.</param>
  private async Task BulkInsertServerMessage(IEnumerable<ServerMessage> messages)
  {
      // Source and destination column names are identical.
      string[] columnNames =
      {
          "ChargeBoxId", "SerialNo", "OutAction", "OutRequest", "InMessage",
          "CreatedOn", "CreatedBy", "UpdatedOn", "ReceivedOn"
      };

      // NOTE(review): columns are created without a type, so DataTable stores every
      // value (including the date values) as string - confirm this is acceptable.
      var table = new DataTable();
      foreach (var columnName in columnNames)
      {
          table.Columns.Add(columnName);
      }

      foreach (var param in messages)
      {
          var row = table.NewRow();
          row["ChargeBoxId"] = param.ChargeBoxId;
          row["SerialNo"] = param.SerialNo;
          row["OutAction"] = param.OutAction;
          row["OutRequest"] = param.OutRequest;
          row["InMessage"] = param.InMessage;
          row["CreatedOn"] = param.CreatedOn;
          row["CreatedBy"] = param.CreatedBy;
          row["UpdatedOn"] = param.UpdatedOn;
          row["ReceivedOn"] = param.ReceivedOn;
          table.Rows.Add(row);
      }

      if (table.Rows.Count == 0)
      {
          return;
      }

      using SqlConnection sqlConnection = new SqlConnection(connectionString);
      await sqlConnection.OpenAsync();

      using SqlBulkCopy sqlBulkCopy = new SqlBulkCopy(sqlConnection);
      // Row count of the table; avoids enumerating 'messages' a second time.
      sqlBulkCopy.BatchSize = table.Rows.Count;
      sqlBulkCopy.DestinationTableName = "ServerMessage";
      foreach (var columnName in columnNames)
      {
          sqlBulkCopy.ColumnMappings.Add(columnName, columnName);
      }

      // Must be awaited here: returning the task un-awaited would dispose the
      // connection and the bulk copy while the write is still running.
      await sqlBulkCopy.WriteToServerAsync(table);
  }
  /// <summary>
  /// Reads the "MainDbStartupLimit" configuration value.
  /// </summary>
  /// <returns>
  /// The configured integer, or 5 when the value is missing or is not a valid integer.
  /// </returns>
  private int GetStartupLimit(IConfiguration configuration)
  {
      const int defaultLimit = 5;
      var limitConfig = configuration["MainDbStartupLimit"];

      // Parse into a separate variable: int.TryParse sets its out argument to 0 on
      // failure, which would otherwise replace the default with 0.
      return int.TryParse(limitConfig, out var parsedLimit) ? parsedLimit : defaultLimit;
  }
  /// <summary>
  /// Reads the "MainDbOpLimit" configuration value.
  /// </summary>
  /// <returns>
  /// The configured integer, or 500 when the value is missing or is not a valid integer.
  /// </returns>
  private int GetOpLimit(IConfiguration configuration)
  {
      const int defaultLimit = 500;
      var limitConfig = configuration["MainDbOpLimit"];

      // Parse into a separate variable: int.TryParse sets its out argument to 0 on
      // failure, which would otherwise replace the default with 0.
      return int.TryParse(limitConfig, out var parsedLimit) ? parsedLimit : defaultLimit;
  }
  354. }
  /// <summary>Result of a machine lookup: the machine id and its customer.</summary>
  /// <param name="MachineId">Id of the machine; empty when no machine was found.</param>
  /// <param name="CustomerId">Id of the machine's customer.</param>
  /// <param name="CustomerName">Name of the machine's customer.</param>
  public record MachineAndCustomerInfo(
      string MachineId,
      Guid CustomerId,
      string CustomerName);
  /// <summary>One connector status update passed to the status-notification handler.</summary>
  /// <param name="Id">Id of the connector status row to update.</param>
  /// <param name="Status">The status values to write.</param>
  public record StatusNotificationParam(
      string Id,
      ConnectorStatus Status);
  /// <summary>One machine basic-info update passed to the update-machine handler.</summary>
  /// <param name="ChargeBoxId">Charge box id of the machine to update.</param>
  /// <param name="machine">Source of the values copied onto the stored machine.</param>
  public record UpdateMachineBasicInfoParam(
      string ChargeBoxId,
      Machine machine);