using Dapper; using EVCB_OCPP.Domain; using EVCB_OCPP.Domain.ConnectionFactory; using EVCB_OCPP.WSServer.Helper; using EVCB_OCPP.WSServer.Service.WsService; using log4net; using Microsoft.Data.SqlClient; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using NLog.Fluent; using System; using System.Collections.Generic; using System.Data; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading.Tasks; using static System.Runtime.InteropServices.JavaScript.JSType; namespace EVCB_OCPP.WSServer.Service; public interface IConnectionLogdbService { void WarmUpLog(); void WriteMachineLog(WsClientData clientData, string data, string messageType, string errorMsg = "", bool isSent = false); } public class ConnectionLogdbService : IConnectionLogdbService { public const string LimitConfigKey = "ConnectionLogDbLimit"; public ConnectionLogdbService( IDbContextFactory connectionLogdbContextFactory, ISqlConnectionFactory sqlConnectionFactory, ILogger logger, IConfiguration configuration) { this.connectionLogdbContextFactory = connectionLogdbContextFactory; this.sqlConnectionFactory = sqlConnectionFactory; this.logger = logger; //connectionLogdbConnectionString = configuration.GetConnectionString("MeterValueDBContext"); var opLimit = GetLimit(configuration); this.queueHandler = new(WriteMachineLogEF, opLimit); InitInsertConnectonLogHandler(); } private readonly IDbContextFactory connectionLogdbContextFactory; private readonly ISqlConnectionFactory sqlConnectionFactory; private readonly ILogger logger; private readonly QueueHandler queueHandler; //private readonly string connectionLogdbConnectionString; private readonly Queue _existTables = new(); private GroupHandler insertConnectonLogHandler; public void WarmUpLog() { try { using (var log = connectionLogdbContextFactory.CreateDbContext()) { log.MachineConnectionLogs.ToList(); } } catch (Exception ex) { logger.LogError(ex.Message); logger.LogError(ex.StackTrace); //Console.WriteLine(ex.ToString()); } } public void WriteMachineLog(WsClientData clientData, string data, string messageType, string errorMsg = "", bool isSent = false) { var log = new MachineLog(clientData, data, messageType, errorMsg, isSent); //queueHandler.Enqueue(log); //_ = WriteMachineLogEF(log); insertConnectonLogHandler.HandleAsync(log); //_ = InsertWithDapper(log); } private async Task InsertWithDapper(MachineLog log) { var watch = Stopwatch.StartNew(); long t0, t1, t2, t3; var workTime = DateTime.UtcNow; if (!await GetTableExist(workTime)) { t0 = watch.ElapsedMilliseconds; await WriteMachineLogEF(log); watch.Stop(); t1 = watch.ElapsedMilliseconds; if (t1 > 500) { logger.LogWarning("ConnectionLog InsertWithDapper {0}/{1}", t0, t1); } return; } t0 = watch.ElapsedMilliseconds; var tableName = GetTableName(workTime); string command = $""" INSERT INTO {tableName} (CreatedOn, ChargeBoxId, MessageType, Data, Msg, IsSent, EVSEEndPoint, Session) VALUES (@CreatedOn, @ChargeBoxId, @MessageType, @Data, @Msg, @IsSent, @EVSEEndPoint, @Session); """; var parameters = new DynamicParameters(); parameters.Add("CreatedOn", workTime, DbType.DateTime); parameters.Add("ChargeBoxId", log.clientData.ChargeBoxId == null ? "unknown" : log.clientData.ChargeBoxId.Replace("'", "''"), DbType.String, size:50); ; parameters.Add("MessageType", log.messageType.Replace("'", "''"), DbType.String, size: 50); parameters.Add("Data", log.data.Replace("'", "''"), DbType.String); parameters.Add("Msg", log.errorMsg.Replace("'", "''"), DbType.String, size: 200); parameters.Add("IsSent", log.isSent, DbType.Boolean); parameters.Add("EVSEEndPoint", log.clientData.Endpoint == null ? "123" : log.clientData.Endpoint.ToString(), DbType.String, size: 25); parameters.Add("Session", log.clientData.SessionID == null ? "123" : log.clientData.SessionID, DbType.String, size: 36); t1 = watch.ElapsedMilliseconds; using var sqlConnection = await sqlConnectionFactory.CreateAsync(); t2 = watch.ElapsedMilliseconds; await sqlConnection.ExecuteAsync(command, parameters); watch.Stop(); t3 = watch.ElapsedMilliseconds; if (t3 > 1000) { logger.LogWarning("ConnectionLog Dapper {0}/{1}/{2}/{3}", t0, t1, t2, t3); } } private void InitInsertConnectonLogHandler() { if (insertConnectonLogHandler is not null) { throw new Exception($"{nameof(InitInsertConnectonLogHandler)} should only called once"); } insertConnectonLogHandler = new GroupHandler( //BulkInsertWithBulkCopy, BundleInsertWithDapper, //loggerFactory.CreateLogger("InsertMeterValueHandler") logger, workerCnt: 20 ); } private async Task WriteMachineLogEF(MachineLog log) { var watcher = Stopwatch.StartNew(); try { if (log.clientData == null || string.IsNullOrEmpty(log.data)) return; if (log.clientData.ChargeBoxId == null) { logger.LogCritical(log.clientData.Path.ToString() + "]********************session ChargeBoxId null sessionId=" + log.clientData.SessionID); } string sp = "[dbo].[uspInsertMachineConnectionLog] @CreatedOn," + "@ChargeBoxId,@MessageType,@Data,@Msg,@IsSent,@EVSEEndPoint,@Session"; var dd = DateTime.UtcNow; SqlParameter[] parameter = { new SqlParameter("CreatedOn", SqlDbType.DateTime){ Value = dd }, new SqlParameter("ChargeBoxId", SqlDbType.NVarChar, 50){ Value= log.clientData.ChargeBoxId==null?"unknown":log.clientData.ChargeBoxId.Replace("'","''") }, new SqlParameter("MessageType", SqlDbType.NVarChar , 50){ Value = log.messageType.Replace("'","''")}, new SqlParameter("Data", SqlDbType.NVarChar, -1) { Value = log.data.Replace("'", "''") }, new SqlParameter("Msg", SqlDbType.NVarChar, 200) { Value = log.errorMsg.Replace("'", "''") }, new SqlParameter("IsSent", SqlDbType.Bit) { Value = log.isSent }, new SqlParameter("EVSEEndPoint", SqlDbType.NVarChar, 25) { Value = log.clientData.Endpoint == null ? "123" : log.clientData.Endpoint.ToString() }, new SqlParameter("Session", SqlDbType.NVarChar, 36) { Value = log.clientData.SessionID == null ? "123" : log.clientData.SessionID } }; using (var db = await connectionLogdbContextFactory.CreateDbContextAsync()) { await db.Database.ExecuteSqlRawAsync(sp, parameter); } } catch (Exception ex) { logger.LogError(ex.ToString()); } watcher.Stop(); if (watcher.ElapsedMilliseconds > 1000) { logger.LogWarning("WriteMachineLog too long {0}", watcher.ElapsedMilliseconds); } } private async Task BundleInsertWithDapper(BundleHandlerData bundleHandlerData) { var watch = Stopwatch.StartNew(); long t0, t1, t2, t3, t4; var workTime = DateTime.UtcNow; var parmsList = bundleHandlerData.Datas.ToList(); if (parmsList.Count == 0) { return; } var candidate = parmsList[0]; if (!await GetTableExist(workTime)) { t0 = watch.ElapsedMilliseconds; await WriteMachineLogEF(candidate); watch.Stop(); t1 = watch.ElapsedMilliseconds; if (t1 > 500) { logger.LogWarning("ConnectionLog InsertWithDapper {0}/{1}", t0, t1); } parmsList.Remove(candidate); bundleHandlerData.AddCompletedData(candidate); } t0 = watch.ElapsedMilliseconds; t1 = watch.ElapsedMilliseconds; using SqlConnection sqlConnection = await sqlConnectionFactory.CreateAsync(); //using var trans = await sqlConnection.BeginTransactionAsync(); t2 = watch.ElapsedMilliseconds; var tableName = GetTableName(workTime); string command = $""" INSERT INTO {tableName} (CreatedOn, ChargeBoxId, MessageType, Data, Msg, IsSent, EVSEEndPoint, Session) VALUES (@CreatedOn, @ChargeBoxId, @MessageType, @Data, @Msg, @IsSent, @EVSEEndPoint, @Session); """; foreach (var log in parmsList) { var parameters = new DynamicParameters(); parameters.Add("CreatedOn", workTime, DbType.DateTime); parameters.Add("ChargeBoxId", log.clientData.ChargeBoxId == null ? "unknown" : log.clientData.ChargeBoxId.Replace("'", "''"), DbType.String, size: 50); ; parameters.Add("MessageType", log.messageType.Replace("'", "''"), DbType.String, size: 50); parameters.Add("Data", log.data.Replace("'", "''"), DbType.String); parameters.Add("Msg", log.errorMsg.Replace("'", "''"), DbType.String, size: 200); parameters.Add("IsSent", log.isSent, DbType.Boolean); parameters.Add("EVSEEndPoint", log.clientData.Endpoint == null ? "123" : log.clientData.Endpoint.ToString(), DbType.String, size: 25); parameters.Add("Session", log.clientData.SessionID == null ? "123" : log.clientData.SessionID, DbType.String, size: 36); await sqlConnection.ExecuteAsync(command, parameters //, trans ); bundleHandlerData.AddCompletedData(log); } t3 = watch.ElapsedMilliseconds; //await trans.CommitAsync(); watch.Stop(); t4 = watch.ElapsedMilliseconds; if (t4 > 1000) { logger.LogWarning("MachineLog Bundle Dapper {0}/{1}/{2}/{3}/{4}/{5}", t0, t1, t2, t3, t4, bundleHandlerData.Datas.Count()); } } private async Task BulkInsertWithBulkCopy(IEnumerable parms) { var watcher = Stopwatch.StartNew(); long t0 = 0, t1 = 0, t2 = 0, t3 = 0, t4 = 0; var parmsList = parms.ToList(); if (parmsList.Count == 0) { return; } var workTime = DateTime.UtcNow; if (!await GetTableExist(workTime)) { var candidate = parmsList.First(); await WriteMachineLogEF(candidate); parmsList.Remove(candidate); } t0 = watcher.ElapsedMilliseconds; var table = new DataTable(); table.Columns.Add("CreatedOn"); table.Columns.Add("ChargeBoxId"); table.Columns.Add("MessageType"); table.Columns.Add("Data"); table.Columns.Add("Msg"); table.Columns.Add("IsSent"); table.Columns.Add("EVSEEndPoint"); table.Columns.Add("Session"); foreach (var param in parmsList) { var row = table.NewRow(); row["CreatedOn"] = workTime; row["ChargeBoxId"] = param.clientData.ChargeBoxId == null ? "unknown" : param.clientData.ChargeBoxId.Replace("'", "''"); row["MessageType"] = param.messageType.Replace("'", "''"); row["Data"] = param.data.Replace("'", "''"); row["Msg"] = param.errorMsg.Replace("'", "''"); row["IsSent"] = param.isSent; row["EVSEEndPoint"] = param.clientData.Endpoint == null ? "123" : param.clientData.Endpoint.ToString(); row["Session"] = param.clientData.SessionID == null ? "123" : param.clientData.SessionID; table.Rows.Add(row); } t1 = watcher.ElapsedMilliseconds; using SqlConnection sqlConnection = await sqlConnectionFactory.CreateAsync(); using SqlBulkCopy sqlBulkCopy = new SqlBulkCopy(sqlConnection as SqlConnection); t2 = watcher.ElapsedMilliseconds; sqlBulkCopy.BatchSize = parmsList.Count(); sqlBulkCopy.DestinationTableName = GetTableName(workTime); sqlBulkCopy.ColumnMappings.Add("CreatedOn", "CreatedOn"); sqlBulkCopy.ColumnMappings.Add("ChargeBoxId", "ChargeBoxId"); sqlBulkCopy.ColumnMappings.Add("MessageType", "MessageType"); sqlBulkCopy.ColumnMappings.Add("Data", "Data"); sqlBulkCopy.ColumnMappings.Add("Msg", "Msg"); sqlBulkCopy.ColumnMappings.Add("IsSent", "IsSent"); sqlBulkCopy.ColumnMappings.Add("EVSEEndPoint", "EVSEEndPoint"); sqlBulkCopy.ColumnMappings.Add("Session", "Session"); t3 = watcher.ElapsedMilliseconds; await sqlBulkCopy.WriteToServerAsync(table); watcher.Stop(); t4 = watcher.ElapsedMilliseconds; if (t4 > 500) { logger.LogWarning("ConnectionLog BulkInsertWithBulkCopy Slow {0}/{1}/{2}/{3}/{4}/{5}", t0, t1, t2, t3, t4, parms.Count()); } } private async ValueTask GetTableExist(DateTime tableDateTime) { var tableName = GetTableName(tableDateTime); if (_existTables.Contains(tableName)) { return true; } FormattableString checkTableSql = $"SELECT Count(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = {tableName}"; using var db = await connectionLogdbContextFactory.CreateDbContextAsync(); var resultList = await db.Database.SqlQuery(checkTableSql)?.ToListAsync(); if (resultList is not null && resultList.Count > 0 && resultList[0] > 0) { _existTables.Enqueue(tableName); if (_existTables.Count > 30) { _existTables.TryDequeue(out _); } return true; } return false; } private static string GetTableName(DateTime dateTime) => $"MachineConnectionLog{dateTime:yyMMdd}"; private int GetLimit(IConfiguration configuration) { var limitConfig = configuration[LimitConfigKey]; int limit = 10; if (limitConfig != default) { int.TryParse(limitConfig, out limit); } return limit; } } internal record MachineLog(WsClientData clientData, string data, string messageType, string errorMsg, bool isSent);