Robert 1 жил өмнө
parent
commit
49b980ab74

+ 147 - 0
EVCB_OCPP.WSServer/Helper/GroupHandler.cs

@@ -0,0 +1,147 @@
+using Microsoft.Extensions.Logging;
+using Microsoft.IdentityModel.Tokens;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace EVCB_OCPP.WSServer.Helper;
+
+public class GroupHandler<T>
+{
+    public GroupHandler(
+        Func<BundleHandlerData<T>, Task> handleFunc,
+        ILogger logger, int workerCnt = 1, int maxRetry = 10)
+    {
+        this.handleFunc = handleFunc;
+        this.logger = logger;
+
+        this.maxRetry = maxRetry;
+        workersLock = new SemaphoreSlim(workerCnt);
+        //singleWorkLock = new(_WorkerCnt);
+    }
+
+    private readonly Func<BundleHandlerData<T>, Task> handleFunc;
+    private readonly ILogger logger;
+    private readonly int maxRetry;
+    private readonly ConcurrentQueue<WaitParam<T>> waitList = new();
+
+    private SemaphoreSlim workersLock;// = new SemaphoreSlim(1);
+
+    public async Task HandleAsync(T param)
+    {
+        var waitData = new WaitParam<T>() { Data = param, Waiter = new SemaphoreSlim(0), Exception = null };
+        waitList.Enqueue(waitData);
+        TryStartHandler();
+        await waitData.Waiter.WaitAsync();
+        if (waitData.Exception is not null)
+        {
+            throw waitData.Exception;
+        }
+    }
+
+    private void TryStartHandler()
+    {
+        if (!workersLock.Wait(0))
+        {
+            return;
+        }
+
+        if (waitList.Count == 0)
+        {
+            workersLock.Release();
+            return;
+        }
+
+        _ = StartHandleTask();
+    }
+
+    private async Task StartHandleTask()
+    {
+        var timer = Stopwatch.StartNew();
+        List<long> times = new();
+
+        var requests = new List<WaitParam<T>>();
+
+        while (waitList.TryDequeue(out var handle))
+        {
+            requests.Add(handle);
+        }
+        times.Add(timer.ElapsedMilliseconds);
+
+        int cnt = 0;
+        Exception lastException = null;
+        var datas = requests.Select(x => x.Data).ToList();
+
+        for (; cnt < maxRetry; cnt++)
+        {
+            var bundleHandledata = new BundleHandlerData<T>(datas);
+            try
+            {
+                await handleFunc(bundleHandledata);
+            }
+            catch (Exception e)
+            {
+                lastException = e;
+            }
+
+            var completedRequests = requests.Where(x => bundleHandledata.CompletedDatas.Contains(x.Data)).ToList();
+            foreach (var request in completedRequests)
+            {
+                request.Waiter.Release();
+            }
+
+            datas = datas.Except(bundleHandledata.CompletedDatas).ToList();
+
+            if (datas.IsNullOrEmpty())
+            {
+                break;
+            }
+            logger.LogError(lastException?.Message);
+            times.Add(timer.ElapsedMilliseconds);
+        }
+
+        var uncompletedRequests = requests.Where(x => datas.Contains(x.Data)).ToList();
+        foreach (var request in uncompletedRequests)
+        {
+            request.Exception = lastException;
+            request.Waiter.Release();
+        }
+        workersLock.Release();
+
+        timer.Stop();
+        if (timer.ElapsedMilliseconds > 1000)
+        {
+            logger.LogWarning($"StartHandleTask {string.Join("/", times)}");
+        }
+
+        TryStartHandler();
+    }
+}
+
+public class BundleHandlerData<T>
+{
+    public List<T> Datas { get; set; }
+    public List<T> CompletedDatas { get; set; }
+
+    public BundleHandlerData(List<T> datas)
+    {
+        Datas = datas;
+        CompletedDatas = new();
+    }
+
+    public void AddCompletedData(T competedData)
+    {
+        CompletedDatas.Add(competedData);
+    }
+}
+
+internal class WaitParam<T>
+{
+    public T Data { get; init; }
+    public SemaphoreSlim Waiter { get; init; }
+    public Exception Exception { get; set; }
+}

+ 0 - 119
EVCB_OCPP.WSServer/Helper/GroupSingleHandler.cs

@@ -1,119 +0,0 @@
-using Microsoft.Extensions.Logging;
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace EVCB_OCPP.WSServer.Helper;
-
-public class GroupSingleHandler<T>
-{
-    public GroupSingleHandler(Func<IEnumerable<T>, Task> handleFunc, ILogger logger, int workerCnt = 1)
-    {
-        this.handleFunc = handleFunc;
-        this.logger = logger;
-
-        WorkerCnt = workerCnt;
-        //singleWorkLock = new(_WorkerCnt);
-    }
-
-    private int _WorkerCnt = 1;
-    public int WorkerCnt
-    {
-        get => _WorkerCnt;
-        set
-        {
-            if (IsStarted)
-            {
-                throw new Exception($"{nameof(WorkerCnt)} must not be changed afted {nameof(HandleAsync)} is called");
-            }
-
-            _WorkerCnt = value;
-            singleWorkLock = new(_WorkerCnt);
-        }
-    }
-
-    private readonly Func<IEnumerable<T>, Task> handleFunc;
-    private readonly ILogger logger;
-    private readonly ConcurrentQueue<(T param, SemaphoreSlim waitLock)> waitList = new();
-    private SemaphoreSlim singleWorkLock;// = new SemaphoreSlim(1);
-    private bool IsStarted = false;
-    private Task singleHandleTask;
-
-    public Task HandleAsync(T param)
-    {
-        IsStarted = true;
-
-        SemaphoreSlim reqLock = new(0);
-        waitList.Enqueue((param, reqLock));
-        TryStartHandler();
-        return reqLock.WaitAsync();
-    }
-
-    private void TryStartHandler()
-    {
-        if (!singleWorkLock.Wait(0))
-        {
-            return;
-        }
-
-        if (waitList.Count == 0)
-        {
-            singleWorkLock.Release();
-            return;
-        }
-
-        singleHandleTask = StartHandleTask();
-    }
-
-    private async Task StartHandleTask()
-    {
-        var timer = Stopwatch.StartNew();
-        long t0 = 0, t1 = 0, t2 = 0;
-
-        var handleList = new List<(T param, SemaphoreSlim waitLock)>();
-
-        while (waitList.TryDequeue(out var handle))
-        {
-            handleList.Add(handle);
-        }
-        t0 = timer.ElapsedMilliseconds;
-
-        int cnt = 0;
-        do
-        {
-            cnt++;
-            try
-            {
-                var task = handleFunc(handleList.Select(x => x.param));
-                await task;
-                t1 = timer.ElapsedMilliseconds;
-                break;
-            }
-            catch (Exception e)
-            {
-                logger.LogError(e, "Trying Cnt {0}", cnt);
-                logger.LogError(e.Message);
-            }
-        }
-        while (true);
-
-        foreach (var handled in handleList)
-        {
-            handled.waitLock.Release();
-        }
-        singleWorkLock.Release();
-
-        timer.Stop();
-        t2= timer.ElapsedMilliseconds;
-        if (t2 >1000)
-        {
-            logger.LogWarning("StartHandleTask {0}/{1}/{2}", t0, t1, t2);
-        }
-
-        TryStartHandler();
-    }
-}

+ 24 - 24
EVCB_OCPP.WSServer/Service/ConnectionLogdbService.cs

@@ -52,7 +52,7 @@ public class ConnectionLogdbService : IConnectionLogdbService
     private readonly QueueHandler<MachineLog> queueHandler;
     //private readonly string connectionLogdbConnectionString;
     private readonly Queue<string> _existTables = new();
-    private GroupSingleHandler<MachineLog> insertConnectonLogHandler;
+    private GroupHandler<MachineLog> insertConnectonLogHandler;
 
     public void WarmUpLog()
     {
@@ -135,10 +135,8 @@ public class ConnectionLogdbService : IConnectionLogdbService
             throw new Exception($"{nameof(InitInsertConnectonLogHandler)} should only called once");
         }
 
-        insertConnectonLogHandler = new GroupSingleHandler<MachineLog>(
-            //BulkInsertWithBulkCopy,
+        insertConnectonLogHandler = new GroupHandler<MachineLog>(
             BundleInsertWithDapper,
-            //loggerFactory.CreateLogger("InsertMeterValueHandler")
             logger,
             workerCnt: 20
             );
@@ -187,45 +185,45 @@ public class ConnectionLogdbService : IConnectionLogdbService
         }
     }
 
-    private async Task BundleInsertWithDapper(IEnumerable<MachineLog> parms)
+    private async Task BundleInsertWithDapper(BundleHandlerData<MachineLog> bundleHandlerData)
     {
         var watch = Stopwatch.StartNew();
-        long t0, t1, t2, t3, t4;
+        var times = new List<long>();
         var workTime = DateTime.UtcNow;
 
-        var parmsList = parms.ToList();
+        var parmsList = bundleHandlerData.Datas.ToList();
 
         if (parmsList.Count == 0)
         {
-            return;
+            return ;
         }
+
         var candidate = parmsList[0];
         if (!await GetTableExist(workTime))
         {
-            t0 = watch.ElapsedMilliseconds;
+            times.Add(watch.ElapsedMilliseconds);
             await WriteMachineLogEF(candidate);
-            watch.Stop();
-            t1 = watch.ElapsedMilliseconds;
-            if (t1 > 500)
+            times.Add(watch.ElapsedMilliseconds);
+
+            if (watch.ElapsedMilliseconds > 500)
             {
-                logger.LogWarning("ConnectionLog InsertWithDapper {0}/{1}", t0, t1);
+                logger.LogWarning($"ConnectionLog InsertWithDapper {string.Join("/", times)}");
             }
             parmsList.Remove(candidate);
+            bundleHandlerData.AddCompletedData(candidate);
         }
 
-        t0 = watch.ElapsedMilliseconds;
-
-        t1 = watch.ElapsedMilliseconds;
+        times.Add(watch.ElapsedMilliseconds);
         using SqlConnection sqlConnection = await sqlConnectionFactory.CreateAsync();
         //using var trans = await sqlConnection.BeginTransactionAsync();
 
-        t2 = watch.ElapsedMilliseconds;
+        times.Add(watch.ElapsedMilliseconds);
         var tableName = GetTableName(workTime);
 
         string command = $"""
-            INSERT INTO {tableName} (CreatedOn, ChargeBoxId, MessageType, Data, Msg, IsSent, EVSEEndPoint, Session)
-            VALUES (@CreatedOn, @ChargeBoxId, @MessageType, @Data, @Msg, @IsSent, @EVSEEndPoint, @Session);
-            """;
+        INSERT INTO {tableName} (CreatedOn, ChargeBoxId, MessageType, Data, Msg, IsSent, EVSEEndPoint, Session)
+        VALUES (@CreatedOn, @ChargeBoxId, @MessageType, @Data, @Msg, @IsSent, @EVSEEndPoint, @Session);
+        """;
 
         foreach (var log in parmsList)
         {
@@ -242,17 +240,19 @@ public class ConnectionLogdbService : IConnectionLogdbService
             await sqlConnection.ExecuteAsync(command, parameters
                 //, trans
                 );
+            bundleHandlerData.AddCompletedData(log);
         }
 
-        t3 = watch.ElapsedMilliseconds;
+        times.Add(watch.ElapsedMilliseconds);
         //await trans.CommitAsync();
 
         watch.Stop();
-        t4 = watch.ElapsedMilliseconds;
-        if (t4 > 1000)
+        if (watch.ElapsedMilliseconds > 1000)
         {
-            logger.LogWarning("MachineLog Bundle Dapper {0}/{1}/{2}/{3}/{4}/{5}", t0, t1, t2, t3, t4, parms.Count());
+            logger.LogWarning($"MachineLog Bundle Dapper {string.Join("/", times)} coint:{bundleHandlerData.Datas.Count()}");
         }
+
+        return ;
     }
 
     private async Task BulkInsertWithBulkCopy(IEnumerable<MachineLog> parms)

+ 28 - 27
EVCB_OCPP.WSServer/Service/MainDbService.cs

@@ -80,9 +80,9 @@ public class MainDbService : IMainDbService
     //private string connectionString;
     private readonly QueueSemaphore startupSemaphore;
     private readonly SemaphoreSlim opSemaphore;
-    private GroupSingleHandler<StatusNotificationParam> statusNotificationHandler;
-    private GroupSingleHandler<UpdateMachineBasicInfoParam> updateMachineBasicInfoHandler;
-    private GroupSingleHandler<ServerMessage> addServerMessageHandler;
+    private GroupHandler<StatusNotificationParam> statusNotificationHandler;
+    private GroupHandler<UpdateMachineBasicInfoParam> updateMachineBasicInfoHandler;
+    private GroupHandler<ServerMessage> addServerMessageHandler;
 
     public async Task<MachineAndCustomerInfo> GetMachineIdAndCustomerInfo(string ChargeBoxId)
     {
@@ -376,7 +376,7 @@ public class MainDbService : IMainDbService
             throw new Exception($"{nameof(InitUpdateConnectorStatusHandler)} should only called once");
         }
 
-        statusNotificationHandler = new GroupSingleHandler<StatusNotificationParam>(
+        statusNotificationHandler = new GroupHandler<StatusNotificationParam>(
             handleFunc: BundleUpdateConnectorStatusDapper,
             logger: loggerFactory.CreateLogger("StatusNotificationHandler"),
             workerCnt: 1);
@@ -388,7 +388,7 @@ public class MainDbService : IMainDbService
             throw new Exception($"{nameof(InitAddServerMessageHandler)} should only called once");
         }
 
-        addServerMessageHandler = new GroupSingleHandler<ServerMessage>(
+        addServerMessageHandler = new GroupHandler<ServerMessage>(
             handleFunc: BundleAddServerMessage,
             logger: loggerFactory.CreateLogger("AddServerMessageHandler"));
     }
@@ -400,7 +400,7 @@ public class MainDbService : IMainDbService
             throw new Exception($"{nameof(InitUpdateMachineBasicInfoHandler)} should only called once");
         }
 
-        updateMachineBasicInfoHandler = new GroupSingleHandler<UpdateMachineBasicInfoParam>(
+        updateMachineBasicInfoHandler = new GroupHandler<UpdateMachineBasicInfoParam>(
             handleFunc: BundelUpdateMachineBasicInfo,
             logger: loggerFactory.CreateLogger("UpdateMachineBasicInfoHandler"),
             workerCnt: 10);
@@ -427,12 +427,12 @@ public class MainDbService : IMainDbService
         //using var semaphoreWrapper = await startupSemaphore.GetToken();
     }
 
-    private async Task BundelUpdateMachineBasicInfo(IEnumerable<UpdateMachineBasicInfoParam> pams)
+    private async Task BundelUpdateMachineBasicInfo(BundleHandlerData<UpdateMachineBasicInfoParam> bundleHandlerData)
     {
         using var db = await contextFactory.CreateDbContextAsync();
         using var trans = await db.Database.BeginTransactionAsync();
 
-        pams = pams.DistinctBy(x => x.ChargeBoxId);
+        var pams = bundleHandlerData.Datas.DistinctBy(x => x.ChargeBoxId);
 
         foreach (var pam in pams)
         {
@@ -450,6 +450,8 @@ public class MainDbService : IMainDbService
 
         await db.SaveChangesAsync();
         await trans.CommitAsync();
+
+        bundleHandlerData.CompletedDatas.AddRange(bundleHandlerData.Datas);
     }
 
     private async Task UpdateConnectorStatusEF(string Id, ConnectorStatus Status)
@@ -638,13 +640,11 @@ public class MainDbService : IMainDbService
 
 
 
-    private Task BundleUpdateConnectorStatusDapper(IEnumerable<StatusNotificationParam> statusNotifications)
+    private async Task BundleUpdateConnectorStatusDapper(BundleHandlerData<StatusNotificationParam> bundleHandlerData)
     {
         using var conn = sqlConnectionFactory.Create();
-
-        foreach (var status in statusNotifications)
+        foreach (var status in bundleHandlerData.Datas)
         {
-
             var parameters = new DynamicParameters();
             parameters.Add("@Id", status.Id, DbType.String, ParameterDirection.Input, 36);
             parameters.Add("@CreatedOn", status.Status.CreatedOn, DbType.DateTime, ParameterDirection.Input);
@@ -654,34 +654,35 @@ public class MainDbService : IMainDbService
             parameters.Add("@VendorId", status.Status.VendorId, DbType.String, ParameterDirection.Input, 255);
             parameters.Add("@VendorErrorCode", status.Status.VendorErrorCode, DbType.String, ParameterDirection.Input, 100);
 
-            conn.Execute("""
-                update ConnectorStatus
-                set
-                CreatedOn = @CreatedOn,
-                Status = @Status,
-                ChargePointErrorCodeId = @ChargePointErrorCodeId,
-                ErrorInfo = @ErrorInfo,
-                VendorId = @VendorId,
-                VendorErrorCode = @VendorErrorCode
-                where Id = @Id
-                """, parameters);
+            await conn.ExecuteAsync("""
+                    update ConnectorStatus
+                    set
+                    CreatedOn = @CreatedOn,
+                    Status = @Status,
+                    ChargePointErrorCodeId = @ChargePointErrorCodeId,
+                    ErrorInfo = @ErrorInfo,
+                    VendorId = @VendorId,
+                    VendorErrorCode = @VendorErrorCode
+                    where Id = @Id
+                    """, parameters);
+            bundleHandlerData.AddCompletedData(status);
         }
-        return Task.CompletedTask;
     }
 
-    private async Task BundleAddServerMessage(IEnumerable<ServerMessage> messages)
+    private async Task BundleAddServerMessage(BundleHandlerData<ServerMessage> bundleHandlerData)
     {
         using var db = await contextFactory.CreateDbContextAsync();
         using var trans = await db.Database.BeginTransactionAsync();
 
-        foreach (var message in messages)
+        foreach (var message in bundleHandlerData.Datas)
         {
             await db.ServerMessage.AddAsync(message);
         }
 
         await db.SaveChangesAsync();
         await trans.CommitAsync();
-        //db.ChangeTracker.Clear();
+
+        bundleHandlerData.CompletedDatas.AddRange(bundleHandlerData.Datas);
     }
 
     private async Task AddServerMessageEF(ServerMessage message)

+ 14 - 6
EVCB_OCPP.WSServer/Service/MeterValueDbService.cs

@@ -22,7 +22,7 @@ public class MeterValueDbService
     private readonly QueueSemaphore insertSemaphore;
     private readonly ILogger logger;
     private readonly Queue<string> _existTables = new();
-    private GroupSingleHandler<InsertMeterValueParam> insertMeterValueHandler;
+    private GroupHandler<InsertMeterValueParam> insertMeterValueHandler;
 
     public MeterValueDbService(
         IDbContextFactory<MeterValueDBContext> meterValueDbContextFactory,
@@ -108,7 +108,7 @@ public class MeterValueDbService
             throw new Exception($"{nameof(InitInsertMeterValueHandler)} should only called once");
         }
 
-        insertMeterValueHandler = new GroupSingleHandler<InsertMeterValueParam>(
+        insertMeterValueHandler = new GroupHandler<InsertMeterValueParam>(
             //BulkInsertWithBulkCopy,
             BundleInsertWithDapper,
             //loggerFactory.CreateLogger("InsertMeterValueHandler")
@@ -180,18 +180,25 @@ public class MeterValueDbService
         }
     }
 
-    private async Task BundleInsertWithDapper(IEnumerable<InsertMeterValueParam> parms)
+    private Task BundleInsertWithDapper(IEnumerable<InsertMeterValueParam> param)
     {
+        return BundleInsertWithDapper(new BundleHandlerData<InsertMeterValueParam>(param.ToList()));
+    }
+
+    private async Task BundleInsertWithDapper(BundleHandlerData<InsertMeterValueParam> bundleHandlerData)
+    {
+        List<InsertMeterValueParam> completedParams = new ();
         var watch = Stopwatch.StartNew();
         long t0, t1, t2, t3, t4;
 
-        var parmsList = parms.ToList();
-        foreach (var param in parms)
+        var parmsList = bundleHandlerData.Datas.ToList();
+        foreach (var param in bundleHandlerData.Datas)
         {
             if (!await GetTableExist(param.createdOn))
             {
                 await InsertWithStoredProcedure(param);
                 parmsList.Remove(param);
+                bundleHandlerData.AddCompletedData(param);
             }
         }
 
@@ -214,6 +221,7 @@ public class MeterValueDbService
                 await InsertWithNoCheckDapper(tableName, param, sqlConnection
                     //, trans
                     );
+                bundleHandlerData.AddCompletedData(param);
             }
         }
 
@@ -224,7 +232,7 @@ public class MeterValueDbService
         t4 = watch.ElapsedMilliseconds;
         if (t4 > 500)
         {
-            logger.LogWarning("MeterValue Dapper {0}/{1}/{2}/{3}/{4}/{5}", t0, t1, t2, t3, t4, parms.Count());
+            logger.LogWarning("MeterValue Dapper {0}/{1}/{2}/{3}/{4}/{5}", t0, t1, t2, t3, t4,  bundleHandlerData.CompletedDatas.Count());
         }
     }
 

+ 7 - 0
Prod_Build.bat

@@ -0,0 +1,7 @@
+for /f %%i in ('git rev-parse --short HEAD') do set ssha=%%i
+FOR /f %%i IN ( version.txt ) DO set version=%%i
+git push
+git tag -a %version%
+git push --follow-tags
+docker build ./ -t evdevcontainerregistry.azurecr.io/server:%version% --label "git-commit=%ssha%"
+docker push evdevcontainerregistry.azurecr.io/server:%version%

+ 1 - 0
version.txt

@@ -0,0 +1 @@
+DockerTrans_v1.0.1