|
- using Dapper;
- using EVCB_OCPP.Domain;
- using EVCB_OCPP.WSServer.Service;
- using Microsoft.Data.SqlClient;
- using Microsoft.EntityFrameworkCore;
- using Microsoft.Extensions.Configuration;
- using Microsoft.Extensions.Logging;
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Data;
- using System.Diagnostics;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- namespace EVCB_OCPP.WSServer.Helper;
/// <summary>
/// Collects meter-value records submitted through <see cref="HandleAsync"/> into a shared queue
/// and writes them to the database in bundles using a fixed pool of background workers.
/// </summary>
/// <remarks>
/// Records whose daily table is already known to exist are inserted with Dapper inside one
/// transaction per table; records whose table is not found are written one by one through the
/// <c>uspInsertMeterValueRecord</c> stored procedure.
/// </remarks>
public class MeterValueGroupSingleHandler
{
    /// <summary>Number of background workers draining the queue.</summary>
    private const int WorkerCount = 20;

    /// <summary>Maximum number of table names kept in the "table exists" cache.</summary>
    private const int MaxCachedTableNames = 30;

    // Shared by every worker of every instance; Queue<T> is not thread-safe,
    // so every access must happen under _existTablesLock.
    private static readonly Queue<string> _existTables = new();
    private static readonly object _existTablesLock = new();

    private readonly IDbContextFactory<MeterValueDBContext> meterValueDbContextFactory;
    private readonly SqlConnectionFactory<MeterValueDBContext> connectionFactory;
    private readonly IConfiguration configuration;
    private readonly ILogger logger;
    private readonly BlockingCollection<(InsertMeterValueParam param, SemaphoreSlim waitLock)> blockQueue = new();
    private readonly Task[] _handleTasks;

    /// <summary>
    /// Creates the handler and immediately starts the background workers.
    /// </summary>
    /// <param name="meterValueDbContextFactory">Factory for EF Core contexts (table check and stored procedure).</param>
    /// <param name="connectionFactory">Factory for the SQL connections used by the Dapper bulk insert.</param>
    /// <param name="configuration">Application configuration (stored, currently not read by this class).</param>
    /// <param name="logger">Logger for timing warnings and errors.</param>
    public MeterValueGroupSingleHandler(
        IDbContextFactory<MeterValueDBContext> meterValueDbContextFactory,
        SqlConnectionFactory<MeterValueDBContext> connectionFactory,
        IConfiguration configuration,
        ILogger<MeterValueGroupSingleHandler> logger)
    {
        this.meterValueDbContextFactory = meterValueDbContextFactory;
        this.connectionFactory = connectionFactory;
        this.configuration = configuration;
        this.logger = logger;

        _handleTasks = new Task[WorkerCount];
        for (int cnt = 0; cnt < WorkerCount; cnt++)
        {
            _handleTasks[cnt] = StartHandleTask();
        }
    }

    /// <summary>
    /// Queues one meter-value record for insertion.
    /// </summary>
    /// <param name="param">The record to insert.</param>
    /// <returns>
    /// A task that completes once the bundle containing <paramref name="param"/> has been processed.
    /// A failed insert is logged by the worker and is not propagated through this task.
    /// </returns>
    public Task HandleAsync(InsertMeterValueParam param)
    {
        SemaphoreSlim reqLock = new(0);
        blockQueue.Add((param, reqLock));
        return reqLock.WaitAsync();
    }

    /// <summary>
    /// Starts one worker that repeatedly takes everything currently queued, inserts it as a bundle
    /// and then releases the callers waiting on those records.
    /// </summary>
    /// <returns>The worker task; it ends only when the queue reports that adding is completed.</returns>
    private Task StartHandleTask()
    {
        return Task.Run(async () =>
        {
            while (true)
            {
                var handleList = new List<(InsertMeterValueParam param, SemaphoreSlim waitLock)>();
                try
                {
                    // Blocks until at least one record is available.
                    handleList.Add(blockQueue.Take());
                }
                catch (InvalidOperationException e)
                {
                    logger.LogError(e, "blockQueue.Take Error");
                    break;
                }

                var watch = Stopwatch.StartNew();
                long t0, t1, t2, t3;

                // Drain whatever else is already queued so it is written in the same bundle.
                while (blockQueue.TryTake(out var data))
                {
                    handleList.Add(data);
                }
                t0 = watch.ElapsedMilliseconds;

                var parms = handleList.Select(x => x.param).ToList();
                t1 = watch.ElapsedMilliseconds;

                try
                {
                    await BundleInsertWithDapper(parms).ConfigureAwait(false);
                }
                catch (Exception e)
                {
                    // Keep the worker alive: a dead worker would never release its waiters
                    // and would permanently shrink the pool.
                    logger.LogError(e, "MeterValue bundle insert failed for {Count} record(s)", parms.Count);
                }
                finally
                {
                    t2 = watch.ElapsedMilliseconds;

                    // Always wake the callers, otherwise HandleAsync would never complete.
                    foreach (var handled in handleList)
                    {
                        handled.waitLock.Release();
                    }
                }

                watch.Stop();
                t3 = watch.ElapsedMilliseconds;
                if (t3 > 1000)
                {
                    logger.LogWarning("MeterValue StartHandleTask {0}/{1}/{2}/{3}", t0, t1, t2, t3);
                }
            }
        });
    }

    /// <summary>
    /// Inserts a bundle of records: stored procedure for records whose daily table is not found,
    /// Dapper inserts (one connection and transaction per table) for all the others.
    /// </summary>
    /// <param name="parms">The records to insert.</param>
    private async Task BundleInsertWithDapper(IEnumerable<InsertMeterValueParam> parms)
    {
        var watch = Stopwatch.StartNew();
        long t0, t1, t2, t3;

        // Materialize once so the input is enumerated a single time.
        var pending = parms.ToList();
        var bulkParams = new List<InsertMeterValueParam>(pending.Count);
        t0 = watch.ElapsedMilliseconds;

        foreach (var param in pending)
        {
            if (await GetTableExist(param.createdOn))
            {
                bulkParams.Add(param);
                continue;
            }

            // Table not found: fall back to the stored procedure for this record.
            // NOTE(review): presumably the procedure creates the daily table - confirm.
            await InsertWithStoredProcedure(param);
        }

        // The stopwatch keeps running here so that t2/t3 below cover the bulk insert as well.
        t1 = watch.ElapsedMilliseconds;
        if (t1 > 500)
        {
            logger.LogWarning("MeterValue InsertWithStoredProcedure {0}/{1}", t0, t1);
        }

        var groupedParams = bulkParams.GroupBy(x => GetTableName(x.createdOn));
        t2 = watch.ElapsedMilliseconds;

        foreach (var group in groupedParams)
        {
            using SqlConnection sqlConnection = await connectionFactory.CreateAsync();
            using var trans = sqlConnection.BeginTransaction();

            // The table name comes from GetTableName (fixed prefix + formatted date),
            // never from caller-supplied text, so interpolating it is safe.
            var tableName = group.Key;
            string command = $"""
                INSERT INTO {tableName} (ConnectorId, Value, CreatedOn, ContextId, FormatId, MeasurandId, PhaseId, LocationId, UnitId, ChargeBoxId, TransactionId)
                VALUES (@ConnectorId, @Value, @CreatedOn, @ContextId, @FormatId, @MeasurandId, @PhaseId, @LocationId, @UnitId, @ChargeBoxId, @TransactionId);
                """;

            foreach (var param in group)
            {
                var parameters = new DynamicParameters();
                parameters.Add("ConnectorId", param.connectorId, DbType.Int16);
                parameters.Add("Value", param.value, DbType.Decimal, precision: 18, scale: 8);
                parameters.Add("CreatedOn", param.createdOn, DbType.DateTime);
                parameters.Add("ContextId", param.contextId, DbType.Int32);
                parameters.Add("FormatId", param.formatId, DbType.Int32);
                parameters.Add("MeasurandId", param.measurandId, DbType.Int32);
                parameters.Add("PhaseId", param.phaseId, DbType.Int32);
                parameters.Add("LocationId", param.locationId, DbType.Int32);
                parameters.Add("UnitId", param.unitId, DbType.Int32);
                parameters.Add("ChargeBoxId", param.chargeBoxId, DbType.String, size: 50);
                parameters.Add("TransactionId", param.transactionId, DbType.Int32);
                await sqlConnection.ExecuteAsync(command, parameters, trans);
            }

            await trans.CommitAsync();
        }

        watch.Stop();
        t3 = watch.ElapsedMilliseconds;
        if (t3 > 400)
        {
            logger.LogWarning("MeterValue Dapper {0}/{1}/{2}/{3}", t0, t1, t2, t3);
        }
    }

    /// <summary>
    /// Checks whether the daily table for <paramref name="tableDateTime"/> exists, consulting a small
    /// in-memory cache of known table names before querying <c>INFORMATION_SCHEMA.TABLES</c>.
    /// </summary>
    /// <param name="tableDateTime">Date that selects the daily table.</param>
    /// <returns><c>true</c> when the table is cached or found in the database; otherwise <c>false</c>.</returns>
    private async ValueTask<bool> GetTableExist(DateTime tableDateTime)
    {
        var tableName = GetTableName(tableDateTime);

        lock (_existTablesLock)
        {
            if (_existTables.Contains(tableName))
            {
                return true;
            }
        }

        // Interpolated values of a FormattableString are sent as SQL parameters by EF Core.
        FormattableString checkTableSql = $"SELECT Count(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = {tableName}";
        await using var db = await meterValueDbContextFactory.CreateDbContextAsync();
        var resultList = await db.Database.SqlQuery<int>(checkTableSql).ToListAsync();

        if (resultList is not { Count: > 0 } || resultList[0] <= 0)
        {
            return false;
        }

        lock (_existTablesLock)
        {
            // Another worker may have cached the same name while the query was running.
            if (!_existTables.Contains(tableName))
            {
                _existTables.Enqueue(tableName);
                if (_existTables.Count > MaxCachedTableNames)
                {
                    _existTables.TryDequeue(out _);
                }
            }
        }

        return true;
    }

    /// <summary>
    /// Inserts a single record through the <c>[dbo].[uspInsertMeterValueRecord]</c> stored procedure.
    /// </summary>
    /// <param name="param">The record to insert.</param>
    private async Task InsertWithStoredProcedure(InsertMeterValueParam param)
    {
        await using var db = await meterValueDbContextFactory.CreateDbContextAsync();

        string sp = "[dbo].[uspInsertMeterValueRecord] @ChargeBoxId," +
            "@ConnectorId,@Value,@CreatedOn,@ContextId,@FormatId,@MeasurandId,@PhaseId,@LocationId,@UnitId,@TransactionId";

        List<SqlParameter> parameter = new List<SqlParameter>();
        parameter.AddInsertMeterValueRecordSqlParameters(
            chargeBoxId: param.chargeBoxId
            , connectorId: (byte)param.connectorId
            , value: param.value
            , createdOn: param.createdOn
            , contextId: param.contextId
            , formatId: param.formatId
            , measurandId: param.measurandId
            , phaseId: param.phaseId
            , locationId: param.locationId
            , unitId: param.unitId
            , transactionId: param.transactionId);

        // Async variant so the worker does not block a thread-pool thread on the round trip.
        await db.Database.ExecuteSqlRawAsync(sp, parameter.ToArray());
    }

    /// <summary>
    /// Builds the daily table name: <c>ConnectorMeterValueRecord</c> followed by the date as <c>yyMMdd</c>.
    /// </summary>
    private static string GetTableName(DateTime dateTime)
        => $"ConnectorMeterValueRecord{dateTime:yyMMdd}";
}
|