|
@@ -3,9 +3,14 @@ using EVCB_OCPP.Packet.Messages.SubTypes;
|
|
|
using EVCB_OCPP.WSServer.Helper;
|
|
|
using Microsoft.Data.SqlClient;
|
|
|
using Microsoft.EntityFrameworkCore;
|
|
|
+using Microsoft.Extensions.Configuration;
|
|
|
using Microsoft.Extensions.Logging;
|
|
|
using System;
|
|
|
+using System.Collections.Concurrent;
|
|
|
using System.Collections.Generic;
|
|
|
+using System.Configuration;
|
|
|
+using System.Data;
|
|
|
+using System.Diagnostics;
|
|
|
using System.Linq;
|
|
|
using System.Text;
|
|
|
using System.Threading.Tasks;
|
|
@@ -16,25 +21,197 @@ public class MeterValueDbService
|
|
|
{
|
|
|
private readonly IDbContextFactory<MeterValueDBContext> meterValueDbContextFactory;
|
|
|
private readonly ILoggerFactory loggerFactory;
|
|
|
-
|
|
|
-
|
|
|
- public MeterValueDbService(IDbContextFactory<MeterValueDBContext> meterValueDbContextFactory, ILoggerFactory loggerFactory)
|
|
|
+ private readonly QueueSemaphore insertSemaphore;
|
|
|
+ private readonly string meterValueConnectionString;
|
|
|
+ private readonly ILogger logger;
|
|
|
+ private GroupSingleHandler<InsertMeterValueParam> insertMeterValueHandler;
|
|
|
+ private Queue<string> _existTables = new();
|
|
|
+
|
|
|
+ public MeterValueDbService(
|
|
|
+ IDbContextFactory<MeterValueDBContext> meterValueDbContextFactory,
|
|
|
+ ILogger<MeterValueDbService> logger,
|
|
|
+ ILoggerFactory loggerFactory,
|
|
|
+ IConfiguration configuration)
|
|
|
{
|
|
|
this.meterValueDbContextFactory = meterValueDbContextFactory;
|
|
|
this.loggerFactory = loggerFactory;
|
|
|
-
|
|
|
+ this.meterValueConnectionString = configuration.GetConnectionString("MeterValueDBContext");
|
|
|
+ this.logger = logger;
|
|
|
+
|
|
|
+ InitInsertMeterValueHandler();
|
|
|
+
|
|
|
+ var insertLimit = GetInsertLimit(configuration);
|
|
|
+ insertSemaphore = new QueueSemaphore(insertLimit);
|
|
|
+
|
|
|
}
|
|
|
|
|
|
- public async Task InsertAsync(string chargeBoxId, byte connectorId, decimal value, DateTime createdOn
|
|
|
+ public Task InsertAsync(string chargeBoxId, byte connectorId, decimal value, DateTime createdOn
|
|
|
, int contextId, int formatId, int measurandId, int phaseId
|
|
|
, int locationId, int unitId, int transactionId)
|
|
|
{
|
|
|
- using var db = await meterValueDbContextFactory.CreateDbContextAsync();
|
|
|
+
|
|
|
+
|
|
|
|
|
|
- string sp = "[dbo].[uspInsertMeterValueRecord] @ChargeBoxId, @ConnectorId,@Value,@CreatedOn,@ContextId,@FormatId,@MeasurandId,@PhaseId,@LocationId,@UnitId,@TransactionId";
|
|
|
+
|
|
|
|
|
|
var param = new InsertMeterValueParam(chargeBoxId, connectorId, value, createdOn, contextId, formatId, measurandId, phaseId, locationId, unitId, transactionId);
|
|
|
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ return insertMeterValueHandler.HandleAsync(param);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void InitInsertMeterValueHandler()
|
|
|
+ {
|
|
|
+ if (insertMeterValueHandler is not null)
|
|
|
+ {
|
|
|
+ throw new Exception($"{nameof(InitInsertMeterValueHandler)} should only called once");
|
|
|
+ }
|
|
|
+
|
|
|
+ insertMeterValueHandler = new GroupSingleHandler<InsertMeterValueParam>(
|
|
|
+ BulkInsertWithCache,
|
|
|
+
|
|
|
+ logger
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task BundleInsertWithStoredProcedure(IEnumerable<InsertMeterValueParam> parms)
|
|
|
+ {
|
|
|
+ foreach (var param in parms)
|
|
|
+ {
|
|
|
+ await InsertWithStoredProcedure(param);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task BulkInsertWithCache(IEnumerable<InsertMeterValueParam> parms)
|
|
|
+ {
|
|
|
+ var watcher = Stopwatch.StartNew();
|
|
|
+ long t0 = 0, t1 = 0, t2 = 0, t3 = 0, t4 = 0;
|
|
|
+
|
|
|
+ var parmsList = parms.ToList();
|
|
|
+ foreach (var param in parms)
|
|
|
+ {
|
|
|
+ if (!await GetTableExist(param.createdOn))
|
|
|
+ {
|
|
|
+ await InsertWithStoredProcedure(param);
|
|
|
+ parmsList.Remove(param);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ var gruopParams = parmsList.GroupBy(x => GetTableName(x.createdOn));
|
|
|
+
|
|
|
+ t0 = watcher.ElapsedMilliseconds;
|
|
|
+ foreach (var group in gruopParams)
|
|
|
+ {
|
|
|
+ var table = new DataTable();
|
|
|
+ table.Columns.Add("ChargeBoxId");
|
|
|
+ table.Columns.Add("ConnectorId");
|
|
|
+ table.Columns.Add("Value");
|
|
|
+ table.Columns.Add("CreatedOn");
|
|
|
+ table.Columns.Add("ContextId");
|
|
|
+ table.Columns.Add("FormatId");
|
|
|
+ table.Columns.Add("MeasurandId");
|
|
|
+ table.Columns.Add("PhaseId");
|
|
|
+ table.Columns.Add("LocationId");
|
|
|
+ table.Columns.Add("UnitId");
|
|
|
+ table.Columns.Add("TransactionId");
|
|
|
+
|
|
|
+ foreach (var param in group)
|
|
|
+ {
|
|
|
+ var row = table.NewRow();
|
|
|
+ row["ChargeBoxId"] = param.chargeBoxId;
|
|
|
+ row["ConnectorId"] = param.connectorId;
|
|
|
+ row["Value"] = param.value;
|
|
|
+ row["CreatedOn"] = param.createdOn;
|
|
|
+ row["ContextId"] = param.contextId;
|
|
|
+ row["FormatId"] = param.formatId;
|
|
|
+ row["MeasurandId"] = param.measurandId;
|
|
|
+ row["PhaseId"] = param.phaseId;
|
|
|
+ row["LocationId"] = param.locationId;
|
|
|
+ row["UnitId"] = param.unitId;
|
|
|
+ row["TransactionId"] = param.transactionId;
|
|
|
+
|
|
|
+ table.Rows.Add(row);
|
|
|
+ }
|
|
|
+ t1 = watcher.ElapsedMilliseconds;
|
|
|
+ using SqlConnection sqlConnection = new SqlConnection(meterValueConnectionString);
|
|
|
+ sqlConnection.Open();
|
|
|
+ using SqlBulkCopy sqlBulkCopy = new SqlBulkCopy(sqlConnection);
|
|
|
+ t2 = watcher.ElapsedMilliseconds;
|
|
|
+ sqlBulkCopy.BatchSize = group.Count();
|
|
|
+ sqlBulkCopy.DestinationTableName = group.Key;
|
|
|
+
|
|
|
+ sqlBulkCopy.ColumnMappings.Add("ChargeBoxId", "ChargeBoxId");
|
|
|
+ sqlBulkCopy.ColumnMappings.Add("ConnectorId", "ConnectorId");
|
|
|
+ sqlBulkCopy.ColumnMappings.Add("Value", "Value");
|
|
|
+ sqlBulkCopy.ColumnMappings.Add("CreatedOn", "CreatedOn");
|
|
|
+ sqlBulkCopy.ColumnMappings.Add("ContextId", "ContextId");
|
|
|
+ sqlBulkCopy.ColumnMappings.Add("FormatId", "FormatId");
|
|
|
+ sqlBulkCopy.ColumnMappings.Add("MeasurandId", "MeasurandId");
|
|
|
+ sqlBulkCopy.ColumnMappings.Add("PhaseId", "PhaseId");
|
|
|
+ sqlBulkCopy.ColumnMappings.Add("LocationId", "LocationId");
|
|
|
+ sqlBulkCopy.ColumnMappings.Add("UnitId", "UnitId");
|
|
|
+ sqlBulkCopy.ColumnMappings.Add("TransactionId", "TransactionId");
|
|
|
+ t3 = watcher.ElapsedMilliseconds;
|
|
|
+ sqlBulkCopy.WriteToServer(table);
|
|
|
+ }
|
|
|
+ watcher.Stop();
|
|
|
+ t4 = watcher.ElapsedMilliseconds;
|
|
|
+
|
|
|
+ if (t4 > 500)
|
|
|
+ {
|
|
|
+ logger.LogWarning("BulkInsertWithCache Slow {0}/{1}/{2}/{3}/{4}",t0,t1,t2,t3,t4);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private async ValueTask<bool> GetTableExist(DateTime tableDateTime)
|
|
|
+ {
|
|
|
+ var tableName = GetTableName(tableDateTime);
|
|
|
+ if (_existTables.Contains(tableName))
|
|
|
+ {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ FormattableString checkTableSql = $"SELECT Count(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = {tableName}";
|
|
|
+
|
|
|
+ using var db = await meterValueDbContextFactory.CreateDbContextAsync();
|
|
|
+ var resultList = db.Database.SqlQuery<int>(checkTableSql)?.ToList();
|
|
|
+
|
|
|
+ if (resultList is not null && resultList.Count > 0 && resultList[0] > 0)
|
|
|
+ {
|
|
|
+ _existTables.Enqueue(tableName);
|
|
|
+ if (_existTables.Count > 30)
|
|
|
+ {
|
|
|
+ _existTables.TryDequeue(out _);
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task InsertWithStoredProcedure(InsertMeterValueParam param)
|
|
|
+ {
|
|
|
+ using var db = await meterValueDbContextFactory.CreateDbContextAsync();
|
|
|
+
|
|
|
+ string sp = "[dbo].[uspInsertMeterValueRecord] @ChargeBoxId," +
|
|
|
+"@ConnectorId,@Value,@CreatedOn,@ContextId,@FormatId,@MeasurandId,@PhaseId,@LocationId,@UnitId,@TransactionId";
|
|
|
+
|
|
|
List<SqlParameter> parameter = new List<SqlParameter>();
|
|
|
parameter.AddInsertMeterValueRecordSqlParameters(
|
|
|
chargeBoxId: param.chargeBoxId
|
|
@@ -49,48 +226,22 @@ public class MeterValueDbService
|
|
|
, unitId: param.unitId
|
|
|
, transactionId: param.transactionId);
|
|
|
|
|
|
- await db.Database.ExecuteSqlRawAsync(sp, parameter.ToArray());
|
|
|
-
|
|
|
+ db.Database.ExecuteSqlRaw(sp, parameter.ToArray());
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
+ private static string GetTableName(DateTime dateTime)
|
|
|
+ => $"ConnectorMeterValueRecord{dateTime:yyMMdd}";
|
|
|
+
|
|
|
+ private int GetInsertLimit(IConfiguration configuration)
|
|
|
+ {
|
|
|
+ var limitConfig = configuration["MeterValueDbInsertLimit"];
|
|
|
+ int limit = 10;
|
|
|
+ if (limitConfig != default)
|
|
|
+ {
|
|
|
+ int.TryParse(limitConfig, out limit);
|
|
|
+ }
|
|
|
+ return limit;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public record InsertMeterValueParam(string chargeBoxId, byte connectorId, decimal value, DateTime createdOn
|