Bläddra i källkod

optimize GroupHandler

Robert 1 år sedan
förälder
incheckning
d5fabfbcb3

+ 147 - 0
EVCB_OCPP.WSServer/Helper/GroupHandler.cs

@@ -0,0 +1,147 @@
+using Microsoft.Extensions.Logging;
+using Microsoft.IdentityModel.Tokens;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace EVCB_OCPP.WSServer.Helper;
+
+public class GroupHandler<T>
+{
+    public GroupHandler(
+        Func<BundleHandlerData<T>, Task> handleFunc,
+        ILogger logger, int workerCnt = 1, int maxRetry = 10)
+    {
+        this.handleFunc = handleFunc;
+        this.logger = logger;
+
+        this.maxRetry = maxRetry;
+        workersLock = new SemaphoreSlim(workerCnt);
+        //singleWorkLock = new(_WorkerCnt);
+    }
+
+    private readonly Func<BundleHandlerData<T>, Task> handleFunc;
+    private readonly ILogger logger;
+    private readonly int maxRetry;
+    private readonly ConcurrentQueue<WaitParam<T>> waitList = new();
+
+    private SemaphoreSlim workersLock;// = new SemaphoreSlim(1);
+
+    public async Task HandleAsync(T param)
+    {
+        var waitData = new WaitParam<T>() { Data = param, Waiter = new SemaphoreSlim(0), Exception = null };
+        waitList.Enqueue(waitData);
+        TryStartHandler();
+        await waitData.Waiter.WaitAsync();
+        if (waitData.Exception is not null)
+        {
+            throw waitData.Exception;
+        }
+    }
+
+    private void TryStartHandler()
+    {
+        if (!workersLock.Wait(0))
+        {
+            return;
+        }
+
+        if (waitList.Count == 0)
+        {
+            workersLock.Release();
+            return;
+        }
+
+        _ = StartHandleTask();
+    }
+
+    private async Task StartHandleTask()
+    {
+        var timer = Stopwatch.StartNew();
+        List<long> times = new();
+
+        var requests = new List<WaitParam<T>>();
+
+        while (waitList.TryDequeue(out var handle))
+        {
+            requests.Add(handle);
+        }
+        times.Add(timer.ElapsedMilliseconds);
+
+        int cnt = 0;
+        Exception lastException = null;
+        var datas = requests.Select(x => x.Data).ToList();
+
+        for (; cnt < maxRetry; cnt++)
+        {
+            var bundleHandledata = new BundleHandlerData<T>(datas);
+            try
+            {
+                await handleFunc(bundleHandledata);
+            }
+            catch (Exception e)
+            {
+                lastException = e;
+            }
+
+            var completedRequests = requests.Where(x => bundleHandledata.CompletedDatas.Contains(x.Data)).ToList();
+            foreach (var request in completedRequests)
+            {
+                request.Waiter.Release();
+            }
+
+            datas = datas.Except(bundleHandledata.CompletedDatas).ToList();
+
+            if (datas.IsNullOrEmpty())
+            {
+                break;
+            }
+            logger.LogError(lastException?.Message);
+            times.Add(timer.ElapsedMilliseconds);
+        }
+
+        var uncompletedRequests = requests.Where(x => datas.Contains(x.Data)).ToList();
+        foreach (var request in uncompletedRequests)
+        {
+            request.Exception = lastException;
+            request.Waiter.Release();
+        }
+        workersLock.Release();
+
+        timer.Stop();
+        if (timer.ElapsedMilliseconds > 1000)
+        {
+            logger.LogWarning($"StartHandleTask {string.Join("/", times)}");
+        }
+
+        TryStartHandler();
+    }
+}
+
+public class BundleHandlerData<T>
+{
+    public List<T> Datas { get; set; }
+    public List<T> CompletedDatas { get; set; }
+
+    public BundleHandlerData(List<T> datas)
+    {
+        Datas = datas;
+        CompletedDatas = new();
+    }
+
+    public void AddCompletedData(T competedData)
+    {
+        CompletedDatas.Add(competedData);
+    }
+}
+
+internal class WaitParam<T>
+{
+    public T Data { get; init; }
+    public SemaphoreSlim Waiter { get; init; }
+    public Exception Exception { get; set; }
+}

+ 0 - 119
EVCB_OCPP.WSServer/Helper/GroupSingleHandler.cs

@@ -1,119 +0,0 @@
-using Microsoft.Extensions.Logging;
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace EVCB_OCPP.WSServer.Helper;
-
-public class GroupSingleHandler<T>
-{
-    public GroupSingleHandler(Func<IEnumerable<T>, Task> handleFunc, ILogger logger, int workerCnt = 1)
-    {
-        this.handleFunc = handleFunc;
-        this.logger = logger;
-
-        WorkerCnt = workerCnt;
-        //singleWorkLock = new(_WorkerCnt);
-    }
-
-    private int _WorkerCnt = 1;
-    public int WorkerCnt
-    {
-        get => _WorkerCnt;
-        set
-        {
-            if (IsStarted)
-            {
-                throw new Exception($"{nameof(WorkerCnt)} must not be changed afted {nameof(HandleAsync)} is called");
-            }
-
-            _WorkerCnt = value;
-            singleWorkLock = new(_WorkerCnt);
-        }
-    }
-
-    private readonly Func<IEnumerable<T>, Task> handleFunc;
-    private readonly ILogger logger;
-    private readonly ConcurrentQueue<(T param, SemaphoreSlim waitLock)> waitList = new();
-    private SemaphoreSlim singleWorkLock;// = new SemaphoreSlim(1);
-    private bool IsStarted = false;
-    private Task singleHandleTask;
-
-    public Task HandleAsync(T param)
-    {
-        IsStarted = true;
-
-        SemaphoreSlim reqLock = new(0);
-        waitList.Enqueue((param, reqLock));
-        TryStartHandler();
-        return reqLock.WaitAsync();
-    }
-
-    private void TryStartHandler()
-    {
-        if (!singleWorkLock.Wait(0))
-        {
-            return;
-        }
-
-        if (waitList.Count == 0)
-        {
-            singleWorkLock.Release();
-            return;
-        }
-
-        singleHandleTask = StartHandleTask();
-    }
-
-    private async Task StartHandleTask()
-    {
-        var timer = Stopwatch.StartNew();
-        long t0 = 0, t1 = 0, t2 = 0;
-
-        var handleList = new List<(T param, SemaphoreSlim waitLock)>();
-
-        while (waitList.TryDequeue(out var handle))
-        {
-            handleList.Add(handle);
-        }
-        t0 = timer.ElapsedMilliseconds;
-
-        int cnt = 0;
-        do
-        {
-            cnt++;
-            try
-            {
-                var task = handleFunc(handleList.Select(x => x.param));
-                await task;
-                t1 = timer.ElapsedMilliseconds;
-                break;
-            }
-            catch (Exception e)
-            {
-                logger.LogError(e, "Trying Cnt {0}", cnt);
-                logger.LogError(e.Message);
-            }
-        }
-        while (true);
-
-        foreach (var handled in handleList)
-        {
-            handled.waitLock.Release();
-        }
-        singleWorkLock.Release();
-
-        timer.Stop();
-        t2= timer.ElapsedMilliseconds;
-        if (t2 >1000)
-        {
-            logger.LogWarning("StartHandleTask {0}/{1}/{2}", t0, t1, t2);
-        }
-
-        TryStartHandler();
-    }
-}

+ 7 - 5
EVCB_OCPP.WSServer/Service/ConnectionLogdbService.cs

@@ -52,7 +52,7 @@ public class ConnectionLogdbService : IConnectionLogdbService
     private readonly QueueHandler<MachineLog> queueHandler;
     //private readonly string connectionLogdbConnectionString;
     private readonly Queue<string> _existTables = new();
-    private GroupSingleHandler<MachineLog> insertConnectonLogHandler;
+    private GroupHandler<MachineLog> insertConnectonLogHandler;
 
     public void WarmUpLog()
     {
@@ -135,7 +135,7 @@ public class ConnectionLogdbService : IConnectionLogdbService
             throw new Exception($"{nameof(InitInsertConnectonLogHandler)} should only called once");
         }
 
-        insertConnectonLogHandler = new GroupSingleHandler<MachineLog>(
+        insertConnectonLogHandler = new GroupHandler<MachineLog>(
             //BulkInsertWithBulkCopy,
             BundleInsertWithDapper,
             //loggerFactory.CreateLogger("InsertMeterValueHandler")
@@ -187,13 +187,13 @@ public class ConnectionLogdbService : IConnectionLogdbService
         }
     }
 
-    private async Task BundleInsertWithDapper(IEnumerable<MachineLog> parms)
+    private async Task BundleInsertWithDapper(BundleHandlerData<MachineLog> bundleHandlerData)
     {
         var watch = Stopwatch.StartNew();
         long t0, t1, t2, t3, t4;
         var workTime = DateTime.UtcNow;
 
-        var parmsList = parms.ToList();
+        var parmsList = bundleHandlerData.Datas.ToList();
 
         if (parmsList.Count == 0)
         {
@@ -211,6 +211,7 @@ public class ConnectionLogdbService : IConnectionLogdbService
                 logger.LogWarning("ConnectionLog InsertWithDapper {0}/{1}", t0, t1);
             }
             parmsList.Remove(candidate);
+            bundleHandlerData.AddCompletedData(candidate);
         }
 
         t0 = watch.ElapsedMilliseconds;
@@ -242,6 +243,7 @@ public class ConnectionLogdbService : IConnectionLogdbService
             await sqlConnection.ExecuteAsync(command, parameters
                 //, trans
                 );
+            bundleHandlerData.AddCompletedData(log);
         }
 
         t3 = watch.ElapsedMilliseconds;
@@ -251,7 +253,7 @@ public class ConnectionLogdbService : IConnectionLogdbService
         t4 = watch.ElapsedMilliseconds;
         if (t4 > 1000)
         {
-            logger.LogWarning("MachineLog Bundle Dapper {0}/{1}/{2}/{3}/{4}/{5}", t0, t1, t2, t3, t4, parms.Count());
+            logger.LogWarning("MachineLog Bundle Dapper {0}/{1}/{2}/{3}/{4}/{5}", t0, t1, t2, t3, t4, bundleHandlerData.Datas.Count());
         }
     }
 

+ 19 - 15
EVCB_OCPP.WSServer/Service/MainDbService.cs

@@ -80,9 +80,9 @@ public class MainDbService : IMainDbService
     //private string connectionString;
     private readonly QueueSemaphore startupSemaphore;
     private readonly SemaphoreSlim opSemaphore;
-    private GroupSingleHandler<StatusNotificationParam> statusNotificationHandler;
-    private GroupSingleHandler<UpdateMachineBasicInfoParam> updateMachineBasicInfoHandler;
-    private GroupSingleHandler<ServerMessage> addServerMessageHandler;
+    private GroupHandler<StatusNotificationParam> statusNotificationHandler;
+    private GroupHandler<UpdateMachineBasicInfoParam> updateMachineBasicInfoHandler;
+    private GroupHandler<ServerMessage> addServerMessageHandler;
 
     public async Task<MachineAndCustomerInfo> GetMachineIdAndCustomerInfo(string ChargeBoxId)
     {
@@ -376,7 +376,7 @@ public class MainDbService : IMainDbService
             throw new Exception($"{nameof(InitUpdateConnectorStatusHandler)} should only called once");
         }
 
-        statusNotificationHandler = new GroupSingleHandler<StatusNotificationParam>(
+        statusNotificationHandler = new GroupHandler<StatusNotificationParam>(
             handleFunc: BundleUpdateConnectorStatusDapper,
             logger: loggerFactory.CreateLogger("StatusNotificationHandler"),
             workerCnt: 1);
@@ -388,7 +388,7 @@ public class MainDbService : IMainDbService
             throw new Exception($"{nameof(InitAddServerMessageHandler)} should only called once");
         }
 
-        addServerMessageHandler = new GroupSingleHandler<ServerMessage>(
+        addServerMessageHandler = new GroupHandler<ServerMessage>(
             handleFunc: BundleAddServerMessage,
             logger: loggerFactory.CreateLogger("AddServerMessageHandler"));
     }
@@ -400,7 +400,7 @@ public class MainDbService : IMainDbService
             throw new Exception($"{nameof(InitUpdateMachineBasicInfoHandler)} should only called once");
         }
 
-        updateMachineBasicInfoHandler = new GroupSingleHandler<UpdateMachineBasicInfoParam>(
+        updateMachineBasicInfoHandler = new GroupHandler<UpdateMachineBasicInfoParam>(
             handleFunc: BundelUpdateMachineBasicInfo,
             logger: loggerFactory.CreateLogger("UpdateMachineBasicInfoHandler"),
             workerCnt: 10);
@@ -427,12 +427,12 @@ public class MainDbService : IMainDbService
         //using var semaphoreWrapper = await startupSemaphore.GetToken();
     }
 
-    private async Task BundelUpdateMachineBasicInfo(IEnumerable<UpdateMachineBasicInfoParam> pams)
+    private async Task BundelUpdateMachineBasicInfo(BundleHandlerData<UpdateMachineBasicInfoParam> bundleHandlerData)
     {
         using var db = await contextFactory.CreateDbContextAsync();
         using var trans = await db.Database.BeginTransactionAsync();
 
-        pams = pams.DistinctBy(x => x.ChargeBoxId);
+        var pams = bundleHandlerData.Datas.DistinctBy(x => x.ChargeBoxId);
 
         foreach (var pam in pams)
         {
@@ -450,6 +450,8 @@ public class MainDbService : IMainDbService
 
         await db.SaveChangesAsync();
         await trans.CommitAsync();
+
+        bundleHandlerData.CompletedDatas.AddRange(bundleHandlerData.Datas);
     }
 
     private async Task UpdateConnectorStatusEF(string Id, ConnectorStatus Status)
@@ -638,11 +640,11 @@ public class MainDbService : IMainDbService
 
 
 
-    private Task BundleUpdateConnectorStatusDapper(IEnumerable<StatusNotificationParam> statusNotifications)
+    private async Task BundleUpdateConnectorStatusDapper(BundleHandlerData<StatusNotificationParam> bundleHandlerData)
     {
-        using var conn = sqlConnectionFactory.Create();
+        using var conn = await sqlConnectionFactory.CreateAsync();
 
-        foreach (var status in statusNotifications)
+        foreach (var status in bundleHandlerData.Datas)
         {
 
             var parameters = new DynamicParameters();
@@ -654,7 +656,7 @@ public class MainDbService : IMainDbService
             parameters.Add("@VendorId", status.Status.VendorId, DbType.String, ParameterDirection.Input, 255);
             parameters.Add("@VendorErrorCode", status.Status.VendorErrorCode, DbType.String, ParameterDirection.Input, 100);
 
-            conn.Execute("""
+            await conn.ExecuteAsync("""
                 update ConnectorStatus
                 set
                 CreatedOn = @CreatedOn,
@@ -665,22 +667,24 @@ public class MainDbService : IMainDbService
                 VendorErrorCode = @VendorErrorCode
                 where Id = @Id
                 """, parameters);
+            bundleHandlerData.AddCompletedData(status);
         }
-        return Task.CompletedTask;
     }
 
-    private async Task BundleAddServerMessage(IEnumerable<ServerMessage> messages)
+    private async Task BundleAddServerMessage(BundleHandlerData<ServerMessage> bundleHandlerData)
     {
         using var db = await contextFactory.CreateDbContextAsync();
         using var trans = await db.Database.BeginTransactionAsync();
 
-        foreach (var message in messages)
+        foreach (var message in bundleHandlerData.Datas)
         {
             await db.ServerMessage.AddAsync(message);
         }
 
         await db.SaveChangesAsync();
         await trans.CommitAsync();
+
+        bundleHandlerData.CompletedDatas.AddRange(bundleHandlerData.Datas);
         //db.ChangeTracker.Clear();
     }
 

+ 10 - 7
EVCB_OCPP.WSServer/Service/MeterValueDbService.cs

@@ -22,7 +22,7 @@ public class MeterValueDbService
     private readonly QueueSemaphore insertSemaphore;
     private readonly ILogger logger;
     private readonly Queue<string> _existTables = new();
-    private GroupSingleHandler<InsertMeterValueParam> insertMeterValueHandler;
+    private GroupHandler<InsertMeterValueParam> insertMeterValueHandler;
 
     public MeterValueDbService(
         IDbContextFactory<MeterValueDBContext> meterValueDbContextFactory,
@@ -66,7 +66,8 @@ public class MeterValueDbService
 
     public Task InsertBundleAsync(IEnumerable<InsertMeterValueParam> param)
     {
-        return BundleInsertWithDapper(param);
+        return BundleInsertWithDapper(new BundleHandlerData<InsertMeterValueParam>(param.ToList()));
+        //return BundleInsertWithDapper(param);
     }
 
     public async Task<List<int>> GetTransactionSOC(int TxId, DateTime queryDate)
@@ -108,7 +109,7 @@ public class MeterValueDbService
             throw new Exception($"{nameof(InitInsertMeterValueHandler)} should only called once");
         }
 
-        insertMeterValueHandler = new GroupSingleHandler<InsertMeterValueParam>(
+        insertMeterValueHandler = new GroupHandler<InsertMeterValueParam>(
             //BulkInsertWithBulkCopy,
             BundleInsertWithDapper,
             //loggerFactory.CreateLogger("InsertMeterValueHandler")
@@ -180,18 +181,19 @@ public class MeterValueDbService
         }
     }
 
-    private async Task BundleInsertWithDapper(IEnumerable<InsertMeterValueParam> parms)
+    private async Task BundleInsertWithDapper(BundleHandlerData<InsertMeterValueParam> bundleHandlerData)
     {
         var watch = Stopwatch.StartNew();
         long t0, t1, t2, t3, t4;
 
-        var parmsList = parms.ToList();
-        foreach (var param in parms)
+        var parmsList = bundleHandlerData.Datas.ToList();
+        foreach (var param in bundleHandlerData.Datas)
         {
             if (!await GetTableExist(param.createdOn))
             {
                 await InsertWithStoredProcedure(param);
                 parmsList.Remove(param);
+                bundleHandlerData.AddCompletedData(param);
             }
         }
 
@@ -214,6 +216,7 @@ public class MeterValueDbService
                 await InsertWithNoCheckDapper(tableName, param, sqlConnection
                     //, trans
                     );
+                bundleHandlerData.AddCompletedData(param);
             }
         }
 
@@ -224,7 +227,7 @@ public class MeterValueDbService
         t4 = watch.ElapsedMilliseconds;
         if (t4 > 500)
         {
-            logger.LogWarning("MeterValue Dapper {0}/{1}/{2}/{3}/{4}/{5}", t0, t1, t2, t3, t4, parms.Count());
+            logger.LogWarning("MeterValue Dapper {0}/{1}/{2}/{3}/{4}/{5}", t0, t1, t2, t3, t4, bundleHandlerData.Datas.Count());
         }
     }