123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384 |
- using Dapper;
- using EVCB_OCPP.Domain;
- using EVCB_OCPP.Domain.ConnectionFactory;
- using EVCB_OCPP.WSServer.Helper;
- using EVCB_OCPP.WSServer.Service.WsService;
- using log4net;
- using Microsoft.Data.SqlClient;
- using Microsoft.EntityFrameworkCore;
- using Microsoft.Extensions.Configuration;
- using Microsoft.Extensions.Logging;
- using NLog.Fluent;
- using System;
- using System.Collections.Generic;
- using System.Data;
- using System.Diagnostics;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- using static System.Runtime.InteropServices.JavaScript.JSType;
- namespace EVCB_OCPP.WSServer.Service.DbService;
- public interface IConnectionLogdbService
- {
- void WarmUpLog();
- void WriteMachineLog(WsClientData clientData, string data, string messageType, string errorMsg = "", bool isSent = false);
- }
- public class ConnectionLogdbService : IConnectionLogdbService
- {
- public const string LimitConfigKey = "ConnectionLogDbLimit";
- public ConnectionLogdbService(
- IDbContextFactory<ConnectionLogDBContext> connectionLogdbContextFactory,
- ISqlConnectionFactory<ConnectionLogDBContext> sqlConnectionFactory,
- ILogger<ConnectionLogdbService> logger,
- IConfiguration configuration)
- {
- this.connectionLogdbContextFactory = connectionLogdbContextFactory;
- this.sqlConnectionFactory = sqlConnectionFactory;
- this.logger = logger;
- //connectionLogdbConnectionString = configuration.GetConnectionString("MeterValueDBContext");
- var opLimit = GetLimit(configuration);
- queueHandler = new(WriteMachineLogEF, opLimit);
- InitInsertConnectonLogHandler();
- }
- private readonly IDbContextFactory<ConnectionLogDBContext> connectionLogdbContextFactory;
- private readonly ISqlConnectionFactory<ConnectionLogDBContext> sqlConnectionFactory;
- private readonly ILogger<ConnectionLogdbService> logger;
- private readonly QueueHandler<MachineLog> queueHandler;
- //private readonly string connectionLogdbConnectionString;
- private readonly Queue<string> _existTables = new();
- private GroupHandler<MachineLog> insertConnectonLogHandler;
- public void WarmUpLog()
- {
- try
- {
- using (var log = connectionLogdbContextFactory.CreateDbContext())
- {
- log.MachineConnectionLog.ToList();
- }
- }
- catch (Exception ex)
- {
- logger.LogError(ex.Message);
- logger.LogError(ex.StackTrace);
- //Console.WriteLine(ex.ToString());
- }
- }
- public void WriteMachineLog(WsClientData clientData, string data, string messageType, string errorMsg = "", bool isSent = false)
- {
- var log = new MachineLog(clientData, data, messageType, errorMsg, isSent);
- //queueHandler.Enqueue(log);
- //_ = WriteMachineLogEF(log);
- insertConnectonLogHandler.HandleAsync(log);
- //_ = InsertWithDapper(log);
- }
- private async Task InsertWithDapper(MachineLog log)
- {
- var watch = Stopwatch.StartNew();
- long t0, t1, t2, t3;
- var workTime = DateTime.UtcNow;
- if (!await GetTableExist(workTime))
- {
- t0 = watch.ElapsedMilliseconds;
- await WriteMachineLogEF(log);
- watch.Stop();
- t1 = watch.ElapsedMilliseconds;
- if (t1 > 500)
- {
- logger.LogWarning("ConnectionLog InsertWithDapper {0}/{1}", t0, t1);
- }
- return;
- }
- t0 = watch.ElapsedMilliseconds;
- var tableName = GetTableName(workTime);
- string command = $"""
- INSERT INTO {tableName} (CreatedOn, ChargeBoxId, MessageType, Data, Msg, IsSent, EVSEEndPoint, Session)
- VALUES (@CreatedOn, @ChargeBoxId, @MessageType, @Data, @Msg, @IsSent, @EVSEEndPoint, @Session);
- """;
- var parameters = new DynamicParameters();
- parameters.Add("CreatedOn", workTime, DbType.DateTime);
- parameters.Add("ChargeBoxId", log.clientData.ChargeBoxId == null ? "unknown" : log.clientData.ChargeBoxId.Replace("'", "''"), DbType.String, size: 50); ;
- parameters.Add("MessageType", log.messageType.Replace("'", "''"), DbType.String, size: 50);
- parameters.Add("Data", log.data.Replace("'", "''"), DbType.String);
- parameters.Add("Msg", log.errorMsg.Replace("'", "''"), DbType.String, size: 200);
- parameters.Add("IsSent", log.isSent, DbType.Boolean);
- parameters.Add("EVSEEndPoint", log.clientData.Endpoint == null ? "123" : log.clientData.Endpoint.ToString(), DbType.String, size: 25);
- parameters.Add("Session", log.clientData.SessionID == null ? "123" : log.clientData.SessionID, DbType.String, size: 36);
- t1 = watch.ElapsedMilliseconds;
- using var sqlConnection = await sqlConnectionFactory.CreateAsync();
- t2 = watch.ElapsedMilliseconds;
- await sqlConnection.ExecuteAsync(command, parameters);
- watch.Stop();
- t3 = watch.ElapsedMilliseconds;
- if (t3 > 1000)
- {
- logger.LogWarning("ConnectionLog Dapper {0}/{1}/{2}/{3}", t0, t1, t2, t3);
- }
- }
- private void InitInsertConnectonLogHandler()
- {
- if (insertConnectonLogHandler is not null)
- {
- throw new Exception($"{nameof(InitInsertConnectonLogHandler)} should only called once");
- }
- insertConnectonLogHandler = new GroupHandler<MachineLog>(
- BundleInsertWithDapper,
- logger,
- workerCnt: 20
- );
- }
- private async Task WriteMachineLogEF(MachineLog log)
- {
- var watcher = Stopwatch.StartNew();
- try
- {
- if (log.clientData == null || string.IsNullOrEmpty(log.data)) return;
- if (log.clientData.ChargeBoxId == null)
- {
- logger.LogCritical(log.clientData.Path.ToString() + "]********************session ChargeBoxId null sessionId=" + log.clientData.SessionID);
- }
- string sp = "[dbo].[uspInsertMachineConnectionLog] @CreatedOn," +
- "@ChargeBoxId,@MessageType,@Data,@Msg,@IsSent,@EVSEEndPoint,@Session";
- var dd = DateTime.UtcNow;
- SqlParameter[] parameter =
- {
- new SqlParameter("CreatedOn", SqlDbType.DateTime){ Value = dd },
- new SqlParameter("ChargeBoxId", SqlDbType.NVarChar, 50){ Value= log.clientData.ChargeBoxId==null?"unknown":log.clientData.ChargeBoxId.Replace("'","''") },
- new SqlParameter("MessageType", SqlDbType.NVarChar , 50){ Value = log.messageType.Replace("'","''")},
- new SqlParameter("Data", SqlDbType.NVarChar, -1) { Value = log.data.Replace("'", "''") },
- new SqlParameter("Msg", SqlDbType.NVarChar, 200) { Value = log.errorMsg.Replace("'", "''") },
- new SqlParameter("IsSent", SqlDbType.Bit) { Value = log.isSent },
- new SqlParameter("EVSEEndPoint", SqlDbType.NVarChar, 25) { Value = log.clientData.Endpoint == null ? "123" : log.clientData.Endpoint.ToString() },
- new SqlParameter("Session", SqlDbType.NVarChar, 36) { Value = log.clientData.SessionID == null ? "123" : log.clientData.SessionID }
- };
- using (var db = await connectionLogdbContextFactory.CreateDbContextAsync())
- {
- await db.Database.ExecuteSqlRawAsync(sp, parameter);
- }
- }
- catch (Exception ex)
- {
- logger.LogError(ex.ToString());
- }
- watcher.Stop();
- if (watcher.ElapsedMilliseconds > 1000)
- {
- logger.LogWarning("WriteMachineLog too long {0}", watcher.ElapsedMilliseconds);
- }
- }
- private async Task BundleInsertWithDapper(BundleHandlerData<MachineLog> bundleHandlerData)
- {
- var watch = Stopwatch.StartNew();
- var times = new List<long>();
- var workTime = DateTime.UtcNow;
- var parmsList = bundleHandlerData.Datas.ToList();
- if (parmsList.Count == 0)
- {
- return;
- }
- var candidate = parmsList[0];
- if (!await GetTableExist(workTime))
- {
- times.Add(watch.ElapsedMilliseconds);
- await WriteMachineLogEF(candidate);
- times.Add(watch.ElapsedMilliseconds);
- if (watch.ElapsedMilliseconds > 500)
- {
- logger.LogWarning($"ConnectionLog InsertWithDapper {string.Join("/", times)}");
- }
- parmsList.Remove(candidate);
- bundleHandlerData.AddCompletedData(candidate);
- }
- times.Add(watch.ElapsedMilliseconds);
- using SqlConnection sqlConnection = await sqlConnectionFactory.CreateAsync();
- //using var trans = await sqlConnection.BeginTransactionAsync();
- times.Add(watch.ElapsedMilliseconds);
- var tableName = GetTableName(workTime);
- string withHourIndexCommand = $"""
- INSERT INTO {tableName} (CreatedOn, ChargeBoxId, MessageType, Data, Msg, IsSent, EVSEEndPoint, Session, HourIndex)
- VALUES (@CreatedOn, @ChargeBoxId, @MessageType, @Data, @Msg, @IsSent, @EVSEEndPoint, @Session, @HourIndex);
- """;
- string command = $"""
- INSERT INTO {tableName} (CreatedOn, ChargeBoxId, MessageType, Data, Msg, IsSent, EVSEEndPoint, Session)
- VALUES (@CreatedOn, @ChargeBoxId, @MessageType, @Data, @Msg, @IsSent, @EVSEEndPoint, @Session);
- """;
- foreach (var log in parmsList)
- {
- var parameters = new DynamicParameters();
- parameters.Add("CreatedOn", workTime, DbType.DateTime);
- parameters.Add("ChargeBoxId", log.clientData.ChargeBoxId == null ? "unknown" : log.clientData.ChargeBoxId.Replace("'", "''"), DbType.String, size: 50); ;
- parameters.Add("MessageType", log.messageType.Replace("'", "''"), DbType.String, size: 50);
- parameters.Add("Data", log.data.Replace("'", "''"), DbType.String);
- parameters.Add("Msg", log.errorMsg.Replace("'", "''"), DbType.String, size: 200);
- parameters.Add("IsSent", log.isSent, DbType.Boolean);
- parameters.Add("EVSEEndPoint", log.clientData.Endpoint == null ? "123" : log.clientData.Endpoint.ToString(), DbType.String, size: 25);
- parameters.Add("Session", log.clientData.SessionID == null ? "123" : log.clientData.SessionID, DbType.String, size: 36);
- parameters.Add("HourIndex", workTime.Hour, DbType.Int32);
- try
- {
- await sqlConnection.ExecuteAsync(withHourIndexCommand, parameters);
- }
- catch
- {
- logger.LogInformation("Connection Log insert with HourIndex failed, insert without HourIndex");
- await sqlConnection.ExecuteAsync(command, parameters);
- }
- bundleHandlerData.AddCompletedData(log);
- }
- times.Add(watch.ElapsedMilliseconds);
- //await trans.CommitAsync();
- watch.Stop();
- if (watch.ElapsedMilliseconds > 1000)
- {
- logger.LogWarning($"MachineLog Bundle Dapper {string.Join("/", times)} coint:{bundleHandlerData.Datas.Count()}");
- }
- return;
- }
- private async Task BulkInsertWithBulkCopy(IEnumerable<MachineLog> parms)
- {
- var watcher = Stopwatch.StartNew();
- long t0 = 0, t1 = 0, t2 = 0, t3 = 0, t4 = 0;
- var parmsList = parms.ToList();
- if (parmsList.Count == 0)
- {
- return;
- }
- var workTime = DateTime.UtcNow;
- if (!await GetTableExist(workTime))
- {
- var candidate = parmsList.First();
- await WriteMachineLogEF(candidate);
- parmsList.Remove(candidate);
- }
- t0 = watcher.ElapsedMilliseconds;
- var table = new DataTable();
- table.Columns.Add("CreatedOn");
- table.Columns.Add("ChargeBoxId");
- table.Columns.Add("MessageType");
- table.Columns.Add("Data");
- table.Columns.Add("Msg");
- table.Columns.Add("IsSent");
- table.Columns.Add("EVSEEndPoint");
- table.Columns.Add("Session");
- foreach (var param in parmsList)
- {
- var row = table.NewRow();
- row["CreatedOn"] = workTime;
- row["ChargeBoxId"] = param.clientData.ChargeBoxId == null ? "unknown" : param.clientData.ChargeBoxId.Replace("'", "''");
- row["MessageType"] = param.messageType.Replace("'", "''");
- row["Data"] = param.data.Replace("'", "''");
- row["Msg"] = param.errorMsg.Replace("'", "''");
- row["IsSent"] = param.isSent;
- row["EVSEEndPoint"] = param.clientData.Endpoint == null ? "123" : param.clientData.Endpoint.ToString();
- row["Session"] = param.clientData.SessionID == null ? "123" : param.clientData.SessionID;
- table.Rows.Add(row);
- }
- t1 = watcher.ElapsedMilliseconds;
- using SqlConnection sqlConnection = await sqlConnectionFactory.CreateAsync();
- using SqlBulkCopy sqlBulkCopy = new SqlBulkCopy(sqlConnection);
- t2 = watcher.ElapsedMilliseconds;
- sqlBulkCopy.BatchSize = parmsList.Count();
- sqlBulkCopy.DestinationTableName = GetTableName(workTime);
- sqlBulkCopy.ColumnMappings.Add("CreatedOn", "CreatedOn");
- sqlBulkCopy.ColumnMappings.Add("ChargeBoxId", "ChargeBoxId");
- sqlBulkCopy.ColumnMappings.Add("MessageType", "MessageType");
- sqlBulkCopy.ColumnMappings.Add("Data", "Data");
- sqlBulkCopy.ColumnMappings.Add("Msg", "Msg");
- sqlBulkCopy.ColumnMappings.Add("IsSent", "IsSent");
- sqlBulkCopy.ColumnMappings.Add("EVSEEndPoint", "EVSEEndPoint");
- sqlBulkCopy.ColumnMappings.Add("Session", "Session");
- t3 = watcher.ElapsedMilliseconds;
- await sqlBulkCopy.WriteToServerAsync(table);
- watcher.Stop();
- t4 = watcher.ElapsedMilliseconds;
- if (t4 > 500)
- {
- logger.LogWarning("ConnectionLog BulkInsertWithBulkCopy Slow {0}/{1}/{2}/{3}/{4}/{5}", t0, t1, t2, t3, t4, parms.Count());
- }
- }
- private async ValueTask<bool> GetTableExist(DateTime tableDateTime)
- {
- var tableName = GetTableName(tableDateTime);
- if (_existTables.Contains(tableName))
- {
- return true;
- }
- FormattableString checkTableSql = $"SELECT Count(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = {tableName}";
- using var db = await connectionLogdbContextFactory.CreateDbContextAsync();
- var resultList = await db.Database.SqlQuery<int>(checkTableSql)?.ToListAsync();
- if (resultList is not null && resultList.Count > 0 && resultList[0] > 0)
- {
- _existTables.Enqueue(tableName);
- if (_existTables.Count > 30)
- {
- _existTables.TryDequeue(out _);
- }
- return true;
- }
- return false;
- }
- private static string GetTableName(DateTime dateTime)
- => $"MachineConnectionLog{dateTime:yyMMdd}";
- private int GetLimit(IConfiguration configuration)
- {
- var limitConfig = configuration[LimitConfigKey];
- int limit = 10;
- if (limitConfig != default)
- {
- int.TryParse(limitConfig, out limit);
- }
- return limit;
- }
- }
- internal record MachineLog(WsClientData clientData, string data, string messageType, string errorMsg, bool isSent);
|