using Dapper;
using EVCB_OCPP.Domain;
using EVCB_OCPP.Domain.ConnectionFactory;
using EVCB_OCPP.WSServer.Dto;
using EVCB_OCPP.WSServer.Helper;
using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Internal;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using System.Data;
using System.Data.Common;
using System.Diagnostics;

namespace EVCB_OCPP.WSServer.Service.DbService;

// NOTE(review): several generic type arguments look like they were stripped from this copy of
// the file (e.g. the unbalanced '>' in the return type of GetTransactionSOC, and the bare
// IDbContextFactory / Queue / IEnumerable / List / ValueTask usages). They are reproduced
// exactly as found rather than guessed; restore them from the authoritative source.

/// <summary>
/// Writes meter value records into per-day tables named <c>ConnectorMeterValueRecordyyMMdd</c>
/// and reads per-transaction min/max values back from them.
/// </summary>
public class MeterValueDbService
{
    private readonly IDbContextFactory meterValueDbContextFactory;
    private readonly ISqlConnectionFactory sqlConnectionFactory;
    private readonly ILoggerFactory loggerFactory;
    private readonly MeterValueGroupSingleHandler meterValueGroupSingleHandler;
    private readonly QueueSemaphore insertSemaphore;
    private readonly ILogger logger;

    // Names of daily tables already confirmed to exist; capped at 30 entries (see GetTableExist).
    private readonly Queue _existTables = new();
    private GroupHandler insertMeterValueHandler;

    /// <summary>
    /// Stores the injected dependencies, creates the grouped insert handler and sizes the
    /// insert semaphore from the <c>MeterValueDbInsertLimit</c> configuration value.
    /// </summary>
    public MeterValueDbService(
        IDbContextFactory meterValueDbContextFactory,
        ISqlConnectionFactory sqlConnectionFactory,
        ILogger logger,
        ILoggerFactory loggerFactory,
        IConfiguration configuration)
    {
        this.meterValueDbContextFactory = meterValueDbContextFactory;
        this.sqlConnectionFactory = sqlConnectionFactory;
        this.loggerFactory = loggerFactory;
        this.logger = logger;

        InitInsertMeterValueHandler();

        var insertLimit = GetInsertLimit(configuration);
        insertSemaphore = new QueueSemaphore(insertLimit);
    }

    /// <summary>
    /// Packs the individual column values into an <see cref="InsertMeterValueParam"/> and inserts it.
    /// </summary>
    public Task InsertAsync(string chargeBoxId, byte connectorId, decimal value, DateTime createdOn
        , int contextId, int formatId, int measurandId, int phaseId
        , int locationId, int unitId, int transactionId)
    {
        var param = new InsertMeterValueParam(chargeBoxId, connectorId, value, createdOn,
            contextId, formatId, measurandId, phaseId, locationId, unitId, transactionId);
        return InsertAsync(param);
    }

    /// <summary>Inserts a single meter value record through the Dapper path.</summary>
    public Task InsertAsync(InsertMeterValueParam param)
    {
        return InsertWithDapper(param);
    }

    /// <summary>Inserts a batch of meter value records through the Dapper bundle path.</summary>
    public Task InsertBundleAsync(IEnumerable param)
    {
        return BundleInsertWithDapper(param);
    }

    /// <summary>
    /// Queries the daily table for <paramref name="queryDate"/> for the minimum and maximum
    /// <c>Value</c> of the given transaction, restricted to <c>MeasurandId = 20</c> and non-zero values.
    /// </summary>
    /// <param name="TxId">Transaction id to filter on.</param>
    /// <param name="queryDate">Date whose daily table is queried (time part is ignored).</param>
    /// <returns>
    /// A list holding the maximum followed by the minimum when a row is found; an empty list
    /// when no row matches or when the query fails.
    /// </returns>
    public async Task> GetTransactionSOC(int TxId, DateTime queryDate)
    {
        List SOCCollection = new List();
        try
        {
            using var conn = await sqlConnectionFactory.CreateAsync();
            var parameters = new DynamicParameters();
            parameters.Add("@TransactionId", TxId, DbType.Int32, ParameterDirection.Input);
            string strSql = $"""
                SELECT [TransactionId],Min(Value) as MinSoC,Max(Value) as MaxSoC
                FROM [dbo].ConnectorMeterValueRecord{queryDate.Date.ToString("yyMMdd")}
                WHERE TransactionId=@TransactionId and MeasurandId=20 and Value!= 0 group by [TransactionId]
                """;
            var result = await conn.QueryFirstOrDefaultAsync(strSql, parameters);
            if (result != null)
            {
                SOCCollection.Add(result.MaxSoC);
                SOCCollection.Add(result.MinSoC);
            }
        }
        catch (Exception ex)
        {
            // Still best-effort (callers get an empty list), but the failure is no longer silent.
            logger.LogWarning(ex, "GetTransactionSOC failed for transaction {TransactionId} on {QueryDate}", TxId, queryDate);
        }
        return SOCCollection;
    }

    /// <summary>Creates the grouped insert handler; throws if it has already been created.</summary>
    private void InitInsertMeterValueHandler()
    {
        if (insertMeterValueHandler is not null)
        {
            throw new Exception($"{nameof(InitInsertMeterValueHandler)} should only called once");
        }

        insertMeterValueHandler = new GroupHandler(
            BundleInsertWithDapper,
            logger,
            workerCnt: 1);
    }

    /// <summary>Inserts every record, one at a time, through the stored procedure.</summary>
    private async Task BundleInsertWithStoredProcedure(IEnumerable parms)
    {
        foreach (var param in parms)
        {
            await InsertWithStoredProcedure(param);
        }
    }

    /// <summary>
    /// Inserts one record by executing <c>uspInsertMeterValueRecord</c> through EF Core while
    /// holding a token from the insert semaphore.
    /// </summary>
    private async Task InsertWithEF(InsertMeterValueParam param)
    {
        using var token = await insertSemaphore.GetToken();
        using var db = await meterValueDbContextFactory.CreateDbContextAsync();

        string sp = "[dbo].[uspInsertMeterValueRecord] @ChargeBoxId, @ConnectorId,@Value,@CreatedOn,@ContextId,@FormatId,@MeasurandId,@PhaseId,@LocationId,@UnitId,@TransactionId";

        List parameter = new List();
        parameter.AddInsertMeterValueRecordSqlParameters(
            chargeBoxId: param.chargeBoxId
            , connectorId: param.connectorId
            , value: param.value
            , createdOn: param.createdOn
            , contextId: param.contextId
            , formatId: param.formatId
            , measurandId: param.measurandId
            , phaseId: param.phaseId
            , locationId: param.locationId
            , unitId: param.unitId
            , transactionId: param.transactionId);

        await db.Database.ExecuteSqlRawAsync(sp, parameter.ToArray());
    }

    /// <summary>
    /// Inserts one record: through the stored procedure when the daily table is not known to
    /// exist, otherwise with a direct INSERT. Logs a warning when the call is slow
    /// (over 500 ms on the stored-procedure path, over 700 ms on the direct path).
    /// </summary>
    private async Task InsertWithDapper(InsertMeterValueParam param)
    {
        var watch = Stopwatch.StartNew();
        long t0, t1;

        if (!await GetTableExist(param.createdOn))
        {
            t0 = watch.ElapsedMilliseconds;
            await InsertWithStoredProcedure(param);
            watch.Stop();
            t1 = watch.ElapsedMilliseconds;
            if (t1 > 500)
            {
                logger.LogWarning("MeterValue InsertWithStoredProcedure {0}/{1}", t0, t1);
            }
            return;
        }

        t0 = watch.ElapsedMilliseconds;
        var tableName = GetTableName(param.createdOn);
        await InsertWithNoCheckDapper(tableName, param);
        watch.Stop();
        t1 = watch.ElapsedMilliseconds;
        if (t1 > 700)
        {
            logger.LogWarning("MeterValue Dapper {0}/{1}", t0, t1);
        }
    }

    /// <summary>Wraps the records in a <c>BundleHandlerData</c> and runs the bundle insert.</summary>
    private Task BundleInsertWithDapper(IEnumerable param)
    {
        return BundleInsertWithDapper(new BundleHandlerData(param.ToList()));
    }

    /// <summary>
    /// Inserts a bundle: records whose daily table is not found go through the stored procedure,
    /// the rest are grouped by table name and inserted one by one over a single shared connection.
    /// Each inserted record is reported via <c>AddCompletedData</c>. Logs a warning with the
    /// phase timings when the whole call exceeds 500 ms.
    /// </summary>
    private async Task BundleInsertWithDapper(BundleHandlerData bundleHandlerData)
    {
        var watch = Stopwatch.StartNew();
        long t0, t1, t2, t3, t4;

        // Work on a copy so records handled by the stored procedure can be removed while iterating Datas.
        var parmsList = bundleHandlerData.Datas.ToList();
        foreach (var param in bundleHandlerData.Datas)
        {
            if (!await GetTableExist(param.createdOn))
            {
                await InsertWithStoredProcedure(param);
                parmsList.Remove(param);
                bundleHandlerData.AddCompletedData(param);
            }
        }
        t0 = watch.ElapsedMilliseconds;

        var gruopParams = parmsList.GroupBy(x => GetTableName(x.createdOn));
        t1 = watch.ElapsedMilliseconds;

        using SqlConnection sqlConnection = await sqlConnectionFactory.CreateAsync();
        t2 = watch.ElapsedMilliseconds;

        foreach (var group in gruopParams)
        {
            var tableName = group.Key;
            foreach (var param in group)
            {
                await InsertWithNoCheckDapper(tableName, param, sqlConnection);
                bundleHandlerData.AddCompletedData(param);
            }
        }
        t3 = watch.ElapsedMilliseconds;

        watch.Stop();
        t4 = watch.ElapsedMilliseconds;
        if (t4 > 500)
        {
            logger.LogWarning("MeterValue Dapper {0}/{1}/{2}/{3}/{4}/{5}", t0, t1, t2, t3, t4, bundleHandlerData.CompletedDatas.Count());
        }
    }

    /// <summary>
    /// Executes a parameterized INSERT into <paramref name="tableName"/> without checking that
    /// the table exists.
    /// </summary>
    /// <param name="tableName">Destination table; interpolated into the SQL text, so it must come from <see cref="GetTableName"/>.</param>
    /// <param name="data">Record to insert.</param>
    /// <param name="conn">Connection to reuse; when null a connection is created and disposed here.</param>
    /// <param name="trans">Optional transaction passed through to Dapper.</param>
    private async Task InsertWithNoCheckDapper(string tableName, InsertMeterValueParam data, SqlConnection conn = null, DbTransaction trans = null)
    {
        string command = $"""
            INSERT INTO {tableName} (ConnectorId, Value, CreatedOn, ContextId, FormatId, MeasurandId, PhaseId, LocationId, UnitId, ChargeBoxId, TransactionId)
            VALUES (@ConnectorId, @Value, @CreatedOn, @ContextId, @FormatId, @MeasurandId, @PhaseId, @LocationId, @UnitId, @ChargeBoxId, @TransactionId);
            """;

        bool isLocalConnection = conn is null;
        SqlConnection connection = isLocalConnection ? await sqlConnectionFactory.CreateAsync() : conn;
        try
        {
            var parameters = new DynamicParameters();
            parameters.Add("ConnectorId", data.connectorId, DbType.Int16);
            parameters.Add("Value", data.value, DbType.Decimal, precision: 18, scale: 8);
            parameters.Add("CreatedOn", data.createdOn, DbType.DateTime);
            parameters.Add("ContextId", data.contextId, DbType.Int32);
            parameters.Add("FormatId", data.formatId, DbType.Int32);
            parameters.Add("MeasurandId", data.measurandId, DbType.Int32);
            parameters.Add("PhaseId", data.phaseId, DbType.Int32);
            parameters.Add("LocationId", data.locationId, DbType.Int32);
            parameters.Add("UnitId", data.unitId, DbType.Int32);
            parameters.Add("ChargeBoxId", data.chargeBoxId, DbType.String, size: 50);
            parameters.Add("TransactionId", data.transactionId, DbType.Int32);

            await connection.ExecuteAsync(command, parameters, trans);
        }
        finally
        {
            // Dispose only a connection created here, and do so even when the INSERT throws.
            if (isLocalConnection)
            {
                connection.Dispose();
            }
        }
    }

    /// <summary>
    /// Bulk variant: records whose daily table is not found go through the stored procedure,
    /// the rest are grouped by table name and written with <see cref="SqlBulkCopy"/>, one batch
    /// per table. Logs a warning with phase timings when the call exceeds 500 ms.
    /// </summary>
    private async Task BulkInsertWithBulkCopy(IEnumerable parms)
    {
        var watcher = Stopwatch.StartNew();
        long t0 = 0, t1 = 0, t2 = 0, t3 = 0, t4 = 0;

        // Materialise once so a one-shot enumerable is not walked several times.
        var allParams = parms.ToList();
        var parmsList = allParams.ToList();
        foreach (var param in allParams)
        {
            if (!await GetTableExist(param.createdOn))
            {
                await InsertWithStoredProcedure(param);
                parmsList.Remove(param);
            }
        }

        var gruopParams = parmsList.GroupBy(x => GetTableName(x.createdOn));
        t0 = watcher.ElapsedMilliseconds;

        foreach (var group in gruopParams)
        {
            var table = new DataTable();
            table.Columns.Add("ChargeBoxId");
            table.Columns.Add("ConnectorId");
            table.Columns.Add("Value");
            table.Columns.Add("CreatedOn");
            table.Columns.Add("ContextId");
            table.Columns.Add("FormatId");
            table.Columns.Add("MeasurandId");
            table.Columns.Add("PhaseId");
            table.Columns.Add("LocationId");
            table.Columns.Add("UnitId");
            table.Columns.Add("TransactionId");

            foreach (var param in group)
            {
                var row = table.NewRow();
                row["ChargeBoxId"] = param.chargeBoxId;
                row["ConnectorId"] = param.connectorId;
                row["Value"] = param.value;
                row["CreatedOn"] = param.createdOn;
                row["ContextId"] = param.contextId;
                row["FormatId"] = param.formatId;
                row["MeasurandId"] = param.measurandId;
                row["PhaseId"] = param.phaseId;
                row["LocationId"] = param.locationId;
                row["UnitId"] = param.unitId;
                row["TransactionId"] = param.transactionId;
                table.Rows.Add(row);
            }
            t1 = watcher.ElapsedMilliseconds;

            using SqlConnection sqlConnection = await sqlConnectionFactory.CreateAsync();
            using SqlBulkCopy sqlBulkCopy = new SqlBulkCopy(sqlConnection);
            t2 = watcher.ElapsedMilliseconds;

            sqlBulkCopy.BatchSize = group.Count();
            sqlBulkCopy.DestinationTableName = group.Key;
            sqlBulkCopy.ColumnMappings.Add("ChargeBoxId", "ChargeBoxId");
            sqlBulkCopy.ColumnMappings.Add("ConnectorId", "ConnectorId");
            sqlBulkCopy.ColumnMappings.Add("Value", "Value");
            sqlBulkCopy.ColumnMappings.Add("CreatedOn", "CreatedOn");
            sqlBulkCopy.ColumnMappings.Add("ContextId", "ContextId");
            sqlBulkCopy.ColumnMappings.Add("FormatId", "FormatId");
            sqlBulkCopy.ColumnMappings.Add("MeasurandId", "MeasurandId");
            sqlBulkCopy.ColumnMappings.Add("PhaseId", "PhaseId");
            sqlBulkCopy.ColumnMappings.Add("LocationId", "LocationId");
            sqlBulkCopy.ColumnMappings.Add("UnitId", "UnitId");
            sqlBulkCopy.ColumnMappings.Add("TransactionId", "TransactionId");
            t3 = watcher.ElapsedMilliseconds;

            await sqlBulkCopy.WriteToServerAsync(table);
        }

        watcher.Stop();
        t4 = watcher.ElapsedMilliseconds;
        if (t4 > 500)
        {
            logger.LogWarning("MeterValue BulkInsertWithBulkCopy Slow {0}/{1}/{2}/{3}/{4}/{5}", t0, t1, t2, t3, t4, allParams.Count);
        }
    }

    /// <summary>
    /// Reports whether the daily table for <paramref name="tableDateTime"/> exists. Positive
    /// answers are remembered in <c>_existTables</c> (oldest entry dropped once more than 30
    /// are held); negative answers are looked up in INFORMATION_SCHEMA.TABLES again on every call.
    /// </summary>
    private async ValueTask GetTableExist(DateTime tableDateTime)
    {
        var tableName = GetTableName(tableDateTime);
        if (_existTables.Contains(tableName))
        {
            return true;
        }

        // FormattableString: EF Core turns the interpolated value into a SQL parameter.
        FormattableString checkTableSql = $"SELECT Count(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = {tableName}";

        using var db = await meterValueDbContextFactory.CreateDbContextAsync();
        var resultList = db.Database.SqlQuery(checkTableSql)?.ToList();

        if (resultList is not null && resultList.Count > 0 && resultList[0] > 0)
        {
            _existTables.Enqueue(tableName);
            if (_existTables.Count > 30)
            {
                _existTables.TryDequeue(out _);
            }
            return true;
        }

        return false;
    }

    /// <summary>
    /// Inserts one record by executing <c>[dbo].[uspInsertMeterValueRecord]</c> through EF Core.
    /// Used when the daily table was not found.
    /// NOTE(review): presumably the procedure creates the daily table when missing — confirm.
    /// </summary>
    private async Task InsertWithStoredProcedure(InsertMeterValueParam param)
    {
        using var db = await meterValueDbContextFactory.CreateDbContextAsync();

        string sp = "[dbo].[uspInsertMeterValueRecord] @ChargeBoxId," +
            "@ConnectorId,@Value,@CreatedOn,@ContextId,@FormatId,@MeasurandId,@PhaseId,@LocationId,@UnitId,@TransactionId";

        List parameter = new List();
        parameter.AddInsertMeterValueRecordSqlParameters(
            chargeBoxId: param.chargeBoxId
            , connectorId: param.connectorId
            , value: param.value
            , createdOn: param.createdOn
            , contextId: param.contextId
            , formatId: param.formatId
            , measurandId: param.measurandId
            , phaseId: param.phaseId
            , locationId: param.locationId
            , unitId: param.unitId
            , transactionId: param.transactionId);

        await db.Database.ExecuteSqlRawAsync(sp, parameter.ToArray());
    }

    /// <summary>Builds the daily table name: <c>ConnectorMeterValueRecord</c> followed by the date as <c>yyMMdd</c>.</summary>
    private static string GetTableName(DateTime dateTime)
        => $"ConnectorMeterValueRecord{dateTime:yyMMdd}";

    /// <summary>
    /// Reads the <c>MeterValueDbInsertLimit</c> configuration value.
    /// </summary>
    /// <returns>The configured integer, or 10 when the value is missing or is not a valid integer.</returns>
    private int GetInsertLimit(IConfiguration configuration)
    {
        const int defaultLimit = 10;

        var limitConfig = configuration["MeterValueDbInsertLimit"];

        // int.TryParse writes 0 to its out argument on failure, so parse into a separate
        // local instead of letting a malformed value overwrite the default.
        return int.TryParse(limitConfig, out var parsedLimit) ? parsedLimit : defaultLimit;
    }
}

/// <summary>Column values of a single meter value record to insert.</summary>
public record InsertMeterValueParam(string chargeBoxId, byte connectorId, decimal value, DateTime createdOn
    , int contextId, int formatId, int measurandId, int phaseId
    , int locationId, int unitId, int transactionId);