|
@@ -0,0 +1,243 @@
|
|
|
+using Dapper;
|
|
|
+using EVCB_OCPP.Domain;
|
|
|
+using EVCB_OCPP.WSServer.Service;
|
|
|
+using Microsoft.Data.SqlClient;
|
|
|
+using Microsoft.EntityFrameworkCore;
|
|
|
+using Microsoft.Extensions.Configuration;
|
|
|
+using Microsoft.Extensions.Logging;
|
|
|
+using System;
|
|
|
+using System.Collections.Concurrent;
|
|
|
+using System.Collections.Generic;
|
|
|
+using System.Data;
|
|
|
+using System.Diagnostics;
|
|
|
+using System.Linq;
|
|
|
+using System.Text;
|
|
|
+using System.Threading.Tasks;
|
|
|
+
|
|
|
+namespace EVCB_OCPP.WSServer.Helper;
|
|
|
+
|
|
|
+public class MeterValueGroupSingleHandler
|
|
|
+{
|
|
|
+ public MeterValueGroupSingleHandler(
|
|
|
+ IDbContextFactory<MeterValueDBContext> meterValueDbContextFactory,
|
|
|
+ IConfiguration configuration,
|
|
|
+ ILogger<MeterValueGroupSingleHandler> logger)
|
|
|
+ {
|
|
|
+ this.meterValueDbContextFactory = meterValueDbContextFactory;
|
|
|
+ this.configuration = configuration;
|
|
|
+ this.logger = logger;
|
|
|
+
|
|
|
+ //singleWorkLock = new (_WorkerCnt);
|
|
|
+ //singleHandleTask = StartHandleTask();
|
|
|
+ this.meterValueConnectionString = configuration.GetConnectionString("MeterValueDBContext");
|
|
|
+
|
|
|
+ var workerCnt = 20;
|
|
|
+ _handleTasks = new Task[workerCnt];
|
|
|
+ for (int cnt = 0; cnt < workerCnt; cnt++)
|
|
|
+ {
|
|
|
+ _handleTasks[cnt] = StartHandleTask();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private readonly IDbContextFactory<MeterValueDBContext> meterValueDbContextFactory;
|
|
|
+ private readonly IConfiguration configuration;
|
|
|
+
|
|
|
+ //private readonly Func<IEnumerable<T>, Task> handleFunc;
|
|
|
+ private readonly ILogger logger;
|
|
|
+ private readonly string meterValueConnectionString;
|
|
|
+ private readonly BlockingCollection<(InsertMeterValueParam param, SemaphoreSlim waitLock)> blockQueue = new();
|
|
|
+ private static Queue<string> _existTables = new();
|
|
|
+ //private SemaphoreSlim singleWorkLock;// = new SemaphoreSlim(1);
|
|
|
+ private bool IsStarted = false;
|
|
|
+ private Task singleHandleTask;
|
|
|
+ private Task[] _handleTasks;
|
|
|
+
|
|
|
+ public Task HandleAsync(InsertMeterValueParam param)
|
|
|
+ {
|
|
|
+ IsStarted = true;
|
|
|
+
|
|
|
+ SemaphoreSlim reqLock = new(0);
|
|
|
+ blockQueue.Add((param, reqLock));
|
|
|
+ //TryStartHandler();
|
|
|
+ return reqLock.WaitAsync();
|
|
|
+ }
|
|
|
+
|
|
|
+ //private void TryStartHandler()
|
|
|
+ //{
|
|
|
+ // if (!singleWorkLock.Wait(0))
|
|
|
+ // {
|
|
|
+ // return;
|
|
|
+ // }
|
|
|
+
|
|
|
+ // if (waitList.Count == 0)
|
|
|
+ // {
|
|
|
+ // singleWorkLock.Release();
|
|
|
+ // return;
|
|
|
+ // }
|
|
|
+
|
|
|
+ // singleHandleTask = StartHandleTask();
|
|
|
+ //}
|
|
|
+
|
|
|
+ private Task StartHandleTask()
|
|
|
+ {
|
|
|
+ return Task.Run(async () => {
|
|
|
+ while (true)
|
|
|
+ {
|
|
|
+ var handleList = new List<(InsertMeterValueParam param, SemaphoreSlim waitLock)>();
|
|
|
+ try
|
|
|
+ {
|
|
|
+ var startData = blockQueue.Take();
|
|
|
+ handleList.Add(startData);
|
|
|
+ }
|
|
|
+ catch (InvalidOperationException e)
|
|
|
+ {
|
|
|
+ logger.LogError(e, "blockQueue.Take Error");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ var watch = Stopwatch.StartNew();
|
|
|
+ long t0, t1, t2, t3;
|
|
|
+
|
|
|
+ while (blockQueue.TryTake(out var data))
|
|
|
+ {
|
|
|
+ handleList.Add(data);
|
|
|
+ }
|
|
|
+ t0 = watch.ElapsedMilliseconds;
|
|
|
+ var parms = handleList.Select(x => x.param).ToList();
|
|
|
+ t1 = watch.ElapsedMilliseconds;
|
|
|
+ await BundleInsertWithDapper(parms).ConfigureAwait(false);
|
|
|
+ t2 = watch.ElapsedMilliseconds;
|
|
|
+
|
|
|
+ foreach (var handled in handleList)
|
|
|
+ {
|
|
|
+ handled.waitLock.Release();
|
|
|
+ }
|
|
|
+ watch.Stop();
|
|
|
+ t3 = watch.ElapsedMilliseconds;
|
|
|
+ if (t3 > 1000)
|
|
|
+ {
|
|
|
+ logger.LogWarning("MeterValue StartHandleTask {0}/{1}/{2}/{3}", t0, t1, t2, t3);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task BundleInsertWithDapper(IEnumerable<InsertMeterValueParam> parms)
|
|
|
+ {
|
|
|
+ var watch = Stopwatch.StartNew();
|
|
|
+ long t0, t1, t2, t3;
|
|
|
+
|
|
|
+ var parmsList = parms.ToList();
|
|
|
+ t0 = watch.ElapsedMilliseconds;
|
|
|
+ foreach (var param in parms)
|
|
|
+ {
|
|
|
+ if (!await GetTableExist(param.createdOn))
|
|
|
+ {
|
|
|
+ await InsertWithStoredProcedure(param);
|
|
|
+ parmsList.Remove(param);
|
|
|
+ }
|
|
|
+ t1 = watch.ElapsedMilliseconds;
|
|
|
+ watch.Stop();
|
|
|
+ if (t1 > 500)
|
|
|
+ {
|
|
|
+ logger.LogWarning("MeterValue InsertWithStoredProcedure {0}/{1}", t0, t1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ t1 = watch.ElapsedMilliseconds;
|
|
|
+ //logger.LogInformation("MeterValue bundle insert cnt {0}", parmsList.Count);
|
|
|
+ var gruopParams = parmsList.GroupBy(x => GetTableName(x.createdOn));
|
|
|
+
|
|
|
+ t2 = watch.ElapsedMilliseconds;
|
|
|
+ foreach (var group in gruopParams)
|
|
|
+ {
|
|
|
+ using SqlConnection sqlConnection = new SqlConnection(meterValueConnectionString);
|
|
|
+ sqlConnection.Open();
|
|
|
+ using var tans = sqlConnection.BeginTransaction();
|
|
|
+
|
|
|
+ var tableName = group.Key;
|
|
|
+ string command = $"""
|
|
|
+ INSERT INTO {tableName} (ConnectorId, Value, CreatedOn, ContextId, FormatId, MeasurandId, PhaseId, LocationId, UnitId, ChargeBoxId, TransactionId)
|
|
|
+ VALUES (@ConnectorId, @Value, @CreatedOn, @ContextId, @FormatId, @MeasurandId, @PhaseId, @LocationId, @UnitId, @ChargeBoxId, @TransactionId);
|
|
|
+ """;
|
|
|
+ foreach (var param in group)
|
|
|
+ {
|
|
|
+ var parameters = new DynamicParameters();
|
|
|
+ parameters.Add("ConnectorId", param.connectorId, DbType.Int16);
|
|
|
+ parameters.Add("Value", param.value, DbType.Decimal, precision: 18, scale: 8);
|
|
|
+ parameters.Add("CreatedOn", param.createdOn, DbType.DateTime);
|
|
|
+ parameters.Add("ContextId", param.contextId, DbType.Int32);
|
|
|
+ parameters.Add("FormatId", param.formatId, DbType.Int32);
|
|
|
+ parameters.Add("MeasurandId", param.measurandId, DbType.Int32);
|
|
|
+ parameters.Add("PhaseId", param.phaseId, DbType.Int32);
|
|
|
+ parameters.Add("LocationId", param.locationId, DbType.Int32);
|
|
|
+ parameters.Add("UnitId", param.unitId, DbType.Int32);
|
|
|
+ parameters.Add("ChargeBoxId", param.chargeBoxId, DbType.String, size: 50);
|
|
|
+ parameters.Add("TransactionId", param.transactionId, DbType.Int32);
|
|
|
+ await sqlConnection.ExecuteAsync(command, parameters, tans);
|
|
|
+ }
|
|
|
+
|
|
|
+ await tans.CommitAsync();
|
|
|
+ }
|
|
|
+
|
|
|
+ watch.Stop();
|
|
|
+ t3 = watch.ElapsedMilliseconds;
|
|
|
+ if (t3 > 400)
|
|
|
+ {
|
|
|
+ logger.LogWarning("MeterValue Dapper {0}/{1}/{2}/{3}", t0, t1, t2, t3);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private async ValueTask<bool> GetTableExist(DateTime tableDateTime)
|
|
|
+ {
|
|
|
+ var tableName = GetTableName(tableDateTime);
|
|
|
+ if (_existTables.Contains(tableName))
|
|
|
+ {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ FormattableString checkTableSql = $"SELECT Count(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = {tableName}";
|
|
|
+
|
|
|
+ using var db = await meterValueDbContextFactory.CreateDbContextAsync();
|
|
|
+ var resultList = db.Database.SqlQuery<int>(checkTableSql)?.ToList();
|
|
|
+
|
|
|
+ if (resultList is not null && resultList.Count > 0 && resultList[0] > 0)
|
|
|
+ {
|
|
|
+ _existTables.Enqueue(tableName);
|
|
|
+ if (_existTables.Count > 30)
|
|
|
+ {
|
|
|
+ _existTables.TryDequeue(out _);
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task InsertWithStoredProcedure(InsertMeterValueParam param)
|
|
|
+ {
|
|
|
+ using var db = await meterValueDbContextFactory.CreateDbContextAsync();
|
|
|
+
|
|
|
+ string sp = "[dbo].[uspInsertMeterValueRecord] @ChargeBoxId," +
|
|
|
+"@ConnectorId,@Value,@CreatedOn,@ContextId,@FormatId,@MeasurandId,@PhaseId,@LocationId,@UnitId,@TransactionId";
|
|
|
+
|
|
|
+ List<SqlParameter> parameter = new List<SqlParameter>();
|
|
|
+ parameter.AddInsertMeterValueRecordSqlParameters(
|
|
|
+ chargeBoxId: param.chargeBoxId
|
|
|
+ , connectorId: (byte)param.connectorId
|
|
|
+ , value: param.value
|
|
|
+ , createdOn: param.createdOn
|
|
|
+ , contextId: param.contextId
|
|
|
+ , formatId: param.formatId
|
|
|
+ , measurandId: param.measurandId
|
|
|
+ , phaseId: param.phaseId
|
|
|
+ , locationId: param.locationId
|
|
|
+ , unitId: param.unitId
|
|
|
+ , transactionId: param.transactionId);
|
|
|
+
|
|
|
+ db.Database.ExecuteSqlRaw(sp, parameter.ToArray());
|
|
|
+ }
|
|
|
+
|
|
|
+ private static string GetTableName(DateTime dateTime)
|
|
|
+ => $"ConnectorMeterValueRecord{dateTime:yyMMdd}";
|
|
|
+}
|