using Dapper; using EVCB_OCPP.Domain; using EVCB_OCPP.Domain.ConnectionFactory; using EVCB_OCPP.WSServer.Service.DbService; using Microsoft.Data.SqlClient; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Data; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading.Tasks; namespace EVCB_OCPP.WSServer.Helper; public class MeterValueGroupSingleHandler { public MeterValueGroupSingleHandler( IDbContextFactory meterValueDbContextFactory, ISqlConnectionFactory connectionFactory, IConfiguration configuration, ILogger logger) { this.meterValueDbContextFactory = meterValueDbContextFactory; this.connectionFactory = connectionFactory; this.configuration = configuration; this.logger = logger; //singleWorkLock = new (_WorkerCnt); //singleHandleTask = StartHandleTask(); //this.meterValueConnectionString = configuration.GetConnectionString("MeterValueDBContext"); var workerCnt = 20; _handleTasks = new Task[workerCnt]; for (int cnt = 0; cnt < workerCnt; cnt++) { _handleTasks[cnt] = StartHandleTask(); } } private readonly IDbContextFactory meterValueDbContextFactory; private readonly ISqlConnectionFactory connectionFactory; private readonly IConfiguration configuration; //private readonly Func, Task> handleFunc; private readonly ILogger logger; //private readonly string meterValueConnectionString; private readonly BlockingCollection<(InsertMeterValueParam param, SemaphoreSlim waitLock)> blockQueue = new(); private static Queue _existTables = new(); //private SemaphoreSlim singleWorkLock;// = new SemaphoreSlim(1); private bool IsStarted = false; private Task singleHandleTask; private Task[] _handleTasks; public Task HandleAsync(InsertMeterValueParam param) { IsStarted = true; SemaphoreSlim reqLock = new(0); blockQueue.Add((param, reqLock)); //TryStartHandler(); return reqLock.WaitAsync(); } //private void TryStartHandler() //{ // if (!singleWorkLock.Wait(0)) // { // return; // } // if (waitList.Count == 0) // { // singleWorkLock.Release(); // return; // } // singleHandleTask = StartHandleTask(); //} private Task StartHandleTask() { return Task.Run(async () => { while (true) { var handleList = new List<(InsertMeterValueParam param, SemaphoreSlim waitLock)>(); try { var startData = blockQueue.Take(); handleList.Add(startData); } catch (InvalidOperationException e) { logger.LogError(e, "blockQueue.Take Error"); break; } var watch = Stopwatch.StartNew(); long t0, t1, t2, t3; while (blockQueue.TryTake(out var data)) { handleList.Add(data); } t0 = watch.ElapsedMilliseconds; var parms = handleList.Select(x => x.param).ToList(); t1 = watch.ElapsedMilliseconds; await BundleInsertWithDapper(parms).ConfigureAwait(false); t2 = watch.ElapsedMilliseconds; foreach (var handled in handleList) { handled.waitLock.Release(); } watch.Stop(); t3 = watch.ElapsedMilliseconds; if (t3 > 1000) { logger.LogWarning("MeterValue StartHandleTask {0}/{1}/{2}/{3}", t0, t1, t2, t3); } } }); } private async Task BundleInsertWithDapper(IEnumerable parms) { var watch = Stopwatch.StartNew(); long t0, t1, t2, t3; var parmsList = parms.ToList(); t0 = watch.ElapsedMilliseconds; foreach (var param in parms) { if (!await GetTableExist(param.createdOn)) { await InsertWithStoredProcedure(param); parmsList.Remove(param); } t1 = watch.ElapsedMilliseconds; watch.Stop(); if (t1 > 500) { logger.LogWarning("MeterValue InsertWithStoredProcedure {0}/{1}", t0, t1); } } t1 = watch.ElapsedMilliseconds; //logger.LogInformation("MeterValue bundle insert cnt {0}", parmsList.Count); var gruopParams = parmsList.GroupBy(x => GetTableName(x.createdOn)); t2 = watch.ElapsedMilliseconds; foreach (var group in gruopParams) { using SqlConnection sqlConnection = await connectionFactory.CreateAsync(); using var tans = sqlConnection.BeginTransaction(); var tableName = group.Key; string command = $""" INSERT INTO {tableName} (ConnectorId, Value, CreatedOn, ContextId, FormatId, MeasurandId, PhaseId, LocationId, UnitId, ChargeBoxId, TransactionId) VALUES (@ConnectorId, @Value, @CreatedOn, @ContextId, @FormatId, @MeasurandId, @PhaseId, @LocationId, @UnitId, @ChargeBoxId, @TransactionId); """; foreach (var param in group) { var parameters = new DynamicParameters(); parameters.Add("ConnectorId", param.connectorId, DbType.Int16); parameters.Add("Value", param.value, DbType.Decimal, precision: 18, scale: 8); parameters.Add("CreatedOn", param.createdOn, DbType.DateTime); parameters.Add("ContextId", param.contextId, DbType.Int32); parameters.Add("FormatId", param.formatId, DbType.Int32); parameters.Add("MeasurandId", param.measurandId, DbType.Int32); parameters.Add("PhaseId", param.phaseId, DbType.Int32); parameters.Add("LocationId", param.locationId, DbType.Int32); parameters.Add("UnitId", param.unitId, DbType.Int32); parameters.Add("ChargeBoxId", param.chargeBoxId, DbType.String, size: 50); parameters.Add("TransactionId", param.transactionId, DbType.Int32); await sqlConnection.ExecuteAsync(command, parameters, tans); } await tans.CommitAsync(); } watch.Stop(); t3 = watch.ElapsedMilliseconds; if (t3 > 400) { logger.LogWarning("MeterValue Dapper {0}/{1}/{2}/{3}", t0, t1, t2, t3); } } private async ValueTask GetTableExist(DateTime tableDateTime) { var tableName = GetTableName(tableDateTime); if (_existTables.Contains(tableName)) { return true; } FormattableString checkTableSql = $"SELECT Count(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = {tableName}"; using var db = await meterValueDbContextFactory.CreateDbContextAsync(); var resultList = await db.Database.SqlQuery(checkTableSql)?.ToListAsync(); if (resultList is not null && resultList.Count > 0 && resultList[0] > 0) { _existTables.Enqueue(tableName); if (_existTables.Count > 30) { _existTables.TryDequeue(out _); } return true; } return false; } private async Task InsertWithStoredProcedure(InsertMeterValueParam param) { using var db = await meterValueDbContextFactory.CreateDbContextAsync(); string sp = "[dbo].[uspInsertMeterValueRecord] @ChargeBoxId," + "@ConnectorId,@Value,@CreatedOn,@ContextId,@FormatId,@MeasurandId,@PhaseId,@LocationId,@UnitId,@TransactionId"; List parameter = new List(); parameter.AddInsertMeterValueRecordSqlParameters( chargeBoxId: param.chargeBoxId , connectorId: (byte)param.connectorId , value: param.value , createdOn: param.createdOn , contextId: param.contextId , formatId: param.formatId , measurandId: param.measurandId , phaseId: param.phaseId , locationId: param.locationId , unitId: param.unitId , transactionId: param.transactionId); db.Database.ExecuteSqlRaw(sp, parameter.ToArray()); } private static string GetTableName(DateTime dateTime) => $"ConnectorMeterValueRecord{dateTime:yyMMdd}"; }