Browse Source

1. adjust GroupHandler
2. set CheckEVSEOnlineJob use GroupHandler

Robert 1 year ago
parent
commit
4966dfe2a5

+ 134 - 0
EVCB_OCPP.TaskScheduler/Helper/GroupHandler.cs

@@ -0,0 +1,134 @@
+using Microsoft.Extensions.Logging;
+using Microsoft.IdentityModel.Tokens;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace EVCB_OCPP.TaskScheduler.Helper;
+
+public class GroupHandler<T>
+{
+    public GroupHandler(
+        Func<IEnumerable<T>, Task<BundleHandlerResult<T>>> handleFunc,
+        ILogger logger, int workerCnt = 1, int maxRetry = 10)
+    {
+        this.handleFunc = handleFunc;
+        this.logger = logger;
+
+        this.maxRetry = maxRetry;
+        workersLock = new SemaphoreSlim(workerCnt);
+        //singleWorkLock = new(_WorkerCnt);
+    }
+
+    private readonly Func<IEnumerable<T> , Task<BundleHandlerResult<T>>> handleFunc;
+    private readonly ILogger logger;
+    private readonly int maxRetry;
+    private readonly ConcurrentQueue<WaitParam<T>> waitList = new();
+
+    private SemaphoreSlim workersLock;// = new SemaphoreSlim(1);
+
+    public async Task HandleAsync(T param)
+    {
+        var waitData = new WaitParam<T>() { Data = param, Waiter = new SemaphoreSlim(0), Exception = null  };
+        waitList.Enqueue(waitData);
+        TryStartHandler();
+        await waitData.Waiter.WaitAsync();
+        if (waitData.Exception is not null)
+        {
+            throw waitData.Exception;
+        }
+    }
+
+    private void TryStartHandler()
+    {
+        if (!workersLock.Wait(0))
+        {
+            return;
+        }
+
+        if (waitList.Count == 0)
+        {
+            workersLock.Release();
+            return;
+        }
+
+        _ = StartHandleTask();
+    }
+
+    private async Task StartHandleTask()
+    {
+        var timer = Stopwatch.StartNew();
+        List<long> times = new();
+
+        var requests = new List<WaitParam<T>>();
+
+        while (waitList.TryDequeue(out var handle))
+        {
+            requests.Add(handle);
+        }
+        times.Add(timer.ElapsedMilliseconds);
+
+        int cnt = 0;
+        Exception lastException = null;
+        var datas = requests.Select(x => x.Data).ToList();
+
+        for (; cnt < maxRetry; cnt ++)
+        {
+            var result = await handleFunc(datas);
+            var completedRequests = requests.Where(x => result.CompletedDatas.Contains(x.Data)).ToList();
+            foreach (var request in completedRequests)
+            {
+                request.Waiter.Release();
+            }
+
+            datas = datas.Except(result.CompletedDatas).ToList();
+            lastException = result.Exception;
+            if (datas.IsNullOrEmpty())
+            {
+                break;
+            }
+            logger.LogError(lastException.Message);
+            times.Add(timer.ElapsedMilliseconds);
+        }
+
+        var uncompletedRequests = requests.Where(x => datas.Contains(x.Data)).ToList();
+        foreach(var request in uncompletedRequests)
+        {
+            request.Exception = lastException;
+            request.Waiter.Release();
+        }
+        workersLock.Release();
+
+        timer.Stop();
+        if (timer.ElapsedMilliseconds > 1000)
+        {
+            logger.LogWarning($"StartHandleTask {string.Join("/", times)}");
+        }
+
+        TryStartHandler();
+    }
+}
+
+public class BundleHandlerResult<T>
+{
+    public IEnumerable<T> CompletedDatas { get; set; }
+    public Exception Exception { get; set; }
+
+    public BundleHandlerResult(IEnumerable<T> completedDatas, Exception exception)
+    {
+        CompletedDatas = completedDatas;
+        Exception = exception;
+    }
+}
+
+internal class WaitParam<T>
+{
+    public T Data { get; init; }
+    public SemaphoreSlim Waiter { get; init; }
+    public Exception Exception { get; set; }
+}

+ 0 - 120
EVCB_OCPP.TaskScheduler/Helper/GroupSingleHandler.cs

@@ -1,120 +0,0 @@
-using Microsoft.Extensions.Logging;
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Linq;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace EVCB_OCPP.TaskScheduler.Helper;
-
-public class GroupSingleHandler<T>
-{
-    public GroupSingleHandler(Func<IEnumerable<T>, Task> handleFunc, ILogger logger, int workerCnt = 1)
-    {
-        this.handleFunc = handleFunc;
-        this.logger = logger;
-
-        WorkerCnt = workerCnt;
-        //singleWorkLock = new(_WorkerCnt);
-    }
-
-    private int _WorkerCnt = 1;
-    public int WorkerCnt
-    {
-        get => _WorkerCnt;
-        set
-        {
-            if (IsStarted)
-            {
-                throw new Exception($"{nameof(WorkerCnt)} must not be changed afted {nameof(HandleAsync)} is called");
-            }
-
-            _WorkerCnt = value;
-            singleWorkLock = new(_WorkerCnt);
-        }
-    }
-
-    private readonly Func<IEnumerable<T>, Task> handleFunc;
-    private readonly ILogger logger;
-    private readonly ConcurrentQueue<(T param, SemaphoreSlim waitLock)> waitList = new();
-    private SemaphoreSlim singleWorkLock;// = new SemaphoreSlim(1);
-    private bool IsStarted = false;
-    private Task singleHandleTask;
-
-    public Task HandleAsync(T param)
-    {
-        IsStarted = true;
-
-        SemaphoreSlim reqLock = new(0);
-        waitList.Enqueue((param, reqLock));
-        TryStartHandler();
-        return reqLock.WaitAsync();
-    }
-
-    private void TryStartHandler()
-    {
-        if (!singleWorkLock.Wait(0))
-        {
-            return;
-        }
-
-        if (waitList.Count == 0)
-        {
-            singleWorkLock.Release();
-            return;
-        }
-
-        singleHandleTask = StartHandleTask();
-    }
-
-    private async Task StartHandleTask()
-    {
-        var timer = Stopwatch.StartNew();
-        long t0 = 0, t1 = 0, t2 = 0;
-
-        var handleList = new List<(T param, SemaphoreSlim waitLock)>();
-
-        while (waitList.TryDequeue(out var handle))
-        {
-            handleList.Add(handle);
-        }
-        t0 = timer.ElapsedMilliseconds;
-
-        int cnt = 0;
-        do
-        {
-            cnt++;
-            try
-            {
-                var task = handleFunc(handleList.Select(x => x.param));
-                await task;
-                t1 = timer.ElapsedMilliseconds;
-                break;
-            }
-            catch (Exception e)
-            {
-                logger.LogError(e, "Trying Cnt {0}", cnt);
-                logger.LogError(e.Message);
-            }
-        }
-        while (true);
-
-        foreach (var handled in handleList)
-        {
-            handled.waitLock.Release();
-        }
-        singleWorkLock.Release();
-
-        timer.Stop();
-        t2 = timer.ElapsedMilliseconds;
-        if (t2 > 1000)
-        {
-            logger.LogWarning("StartHandleTask {0}/{1}/{2}", t0, t1, t2);
-        }
-
-        TryStartHandler();
-    }
-}

+ 7 - 2
EVCB_OCPP.TaskScheduler/Jobs/CheckEVSEOnlineJob.cs

@@ -27,11 +27,13 @@ namespace EVCB_OCPP.TaskScheduler.Jobs
         public CheckEVSEOnlineJob(
             ILogger<CheckEVSEOnlineJob> logger,
             OnlineLogDbService onlineLogDbService,
+            MainDbService mainDbService,
             SqlConnectionFactory<MainDBContext> mainDbConnectionFactory,
             SqlConnectionFactory<OnlineLogDBContext> onlineLogDbConnectionFactory)
         {
             this.logger = logger;
             this.onlineLogDbService = onlineLogDbService;
+            this.mainDbService = mainDbService;
             this.mainDbConnectionFactory = mainDbConnectionFactory;
             this.onlineLogDbConnectionFactory = onlineLogDbConnectionFactory;
             //this.mainDBConnectString = configuration.GetConnectionString("MainDBContext");
@@ -40,6 +42,7 @@ namespace EVCB_OCPP.TaskScheduler.Jobs
 
         private readonly ILogger logger;
         private readonly OnlineLogDbService onlineLogDbService;
+        private readonly MainDbService mainDbService;
         private readonly SqlConnectionFactory<MainDBContext> mainDbConnectionFactory;
         private readonly SqlConnectionFactory<OnlineLogDBContext> onlineLogDbConnectionFactory;
 
@@ -68,7 +71,8 @@ namespace EVCB_OCPP.TaskScheduler.Jobs
                         for (int evseIndex = 0; evseIndex < _EVSEs.Count; evseIndex++)
                         {
                             var evse = _EVSEs[evseIndex];
-                            saveTasks.Add(UpdateEVSECurrentStatus(evse.CustomerId.ToString(), evse.ChargeBoxId, true, DefaultSetting.DefaultNullTime));
+                            //saveTasks.Add(UpdateEVSECurrentStatus(evse.CustomerId.ToString(), evse.ChargeBoxId, true, DefaultSetting.DefaultNullTime));
+                            saveTasks.Add(mainDbService.UpdateEvseOnlineStatus(evse.ChargeBoxId, online: true));
                             //saveTasks.Add(UpdateOnlineRecords(evse.ChargeBoxId, true, evse.HeartbeatUpdatedOn, null));
                             saveTasks.Add(onlineLogDbService.InsertOnlineLog(evse.ChargeBoxId, evse.HeartbeatUpdatedOn));
                         }
@@ -80,7 +84,8 @@ namespace EVCB_OCPP.TaskScheduler.Jobs
                         for (int evseIndex = 0; evseIndex < _EVSEs.Count; evseIndex++)
                         {
                             var evse = _EVSEs[evseIndex];
-                            saveTasks.Add(UpdateEVSECurrentStatus(evse.CustomerId.ToString(), evse.ChargeBoxId, false, evse.HeartbeatUpdatedOn));
+                            //saveTasks.Add(UpdateEVSECurrentStatus(evse.CustomerId.ToString(), evse.ChargeBoxId, false, evse.HeartbeatUpdatedOn));
+                            saveTasks.Add(mainDbService.UpdateEvseOnlineStatus(evse.ChargeBoxId, online: false, offlineTime: evse.HeartbeatUpdatedOn));
                             saveTasks.Add(onlineLogDbService.UpdateOnlineLog(evse.ChargeBoxId, evse.HeartbeatUpdatedOn));
                         }
                     }

+ 1 - 0
EVCB_OCPP.TaskScheduler/Program.cs

@@ -41,6 +41,7 @@ namespace EVCB_OCPP.TaskScheduler
                     services.AddTransient<ICustomerService, CommonCustomerService>();
                     services.AddSingleton<ICustomersService, CustomersService>();
 
+                    services.AddSingleton<MainDbService>();
                     services.AddSingleton<OnlineLogDbService>();
                     services.AddSingleton<DatabaseService>();
                     //services.AddTransient<OuterHttpClient>();

+ 0 - 1
EVCB_OCPP.TaskScheduler/Services/DatabaseService.cs

@@ -590,7 +590,6 @@ public class DatabaseService
         string sqlstatement = "SELECT [CarNum] FROM VehicleCustomer where IdTag = '" + startIdTag + "'";
         using (var dbConn = await webDbConnectionFactory.CreateAsync())
         {
-            dbConn.Open();
             toReturn = await dbConn.ExecuteScalarAsync<string>(sqlstatement);
         }
 

+ 95 - 0
EVCB_OCPP.TaskScheduler/Services/MainDbService.cs

@@ -0,0 +1,95 @@
+using Dapper;
+using EVCB_OCPP.Domain;
+using EVCB_OCPP.TaskScheduler.Helper;
+using EVCB_OCPP.TaskScheduler.Models;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace EVCB_OCPP.TaskScheduler.Services;
+
+public class MainDbService
+{
+    public MainDbService(
+        SqlConnectionFactory<MainDBContext> connectionFactory
+        , ILogger<MainDbService> logger)
+    {
+        this.sqlConnectionFactory = connectionFactory;
+        this.logger = logger;
+
+        InitUpdateEvseOnlineStatusHandler();
+    }
+
+    private readonly SqlConnectionFactory<MainDBContext> sqlConnectionFactory;
+    private readonly ILogger<MainDbService> logger;
+    private GroupHandler<UpdateEvseOnlineModel> updateEvseOnlineStatusHandler;
+
+    public Task UpdateEvseOnlineStatus(string ChargeBoxId, bool online, DateTime? offlineTime = null)
+    {
+        return updateEvseOnlineStatusHandler.HandleAsync(new UpdateEvseOnlineModel() { ChargeBoxId = ChargeBoxId, online = online, offlineTime = offlineTime });
+    }
+
+    private void InitUpdateEvseOnlineStatusHandler()
+    {
+        if (updateEvseOnlineStatusHandler is not null)
+        {
+            throw new Exception($"{nameof(InitUpdateEvseOnlineStatusHandler)} should only called once");
+        }
+
+        updateEvseOnlineStatusHandler = new GroupHandler<UpdateEvseOnlineModel>(
+            //BulkInsertWithBulkCopy,
+            BundleUpdateEvseOnlineStatus,
+            //loggerFactory.CreateLogger("InsertMeterValueHandler")
+            logger,
+            workerCnt: 10
+            );
+    }
+
+    private async Task<BundleHandlerResult<UpdateEvseOnlineModel>> BundleUpdateEvseOnlineStatus(IEnumerable<UpdateEvseOnlineModel> datas)
+    {
+        List<UpdateEvseOnlineModel> completedDatas = new();
+        Exception exception = null;
+
+        using var dbConn = await sqlConnectionFactory.CreateAsync();
+        foreach (var data in datas)
+        {
+            try
+            {
+                string sqlString = $"""
+                UPDATE dbo.Machine
+                SET Online=@Online {(data.online? "": ", OfflineOn = @OfflineOn")}
+                WHERE chargeBoxId = @ChargeBoxId
+                """;
+
+                var parameters = new DynamicParameters();
+                parameters.Add("@Online", data.online, System.Data.DbType.Boolean);
+                parameters.Add("@ChargeBoxId", data.ChargeBoxId, System.Data.DbType.String, System.Data.ParameterDirection.Input, 50);
+
+                if (!data.online)
+                {
+                    parameters.Add("@OfflineOn", data.offlineTime, System.Data.DbType.DateTime);
+                }
+
+                await dbConn.ExecuteAsync(sqlString, parameters);
+                completedDatas.Add(data);
+            }
+            catch (Exception ex)
+            {
+                exception = ex;
+                logger.LogError("Update Data Error " + ex.ToString());
+                break;
+            }
+        }
+        return new BundleHandlerResult<UpdateEvseOnlineModel>(completedDatas, exception);
+    }
+}
+
+public record UpdateEvseOnlineModel
+{
+    public string ChargeBoxId;
+    public bool online;
+    public DateTime? offlineTime;
+}

+ 49 - 21
EVCB_OCPP.TaskScheduler/Services/OnlineLogDbService.cs

@@ -31,7 +31,7 @@ public class OnlineLogDbService
             throw new Exception($"{nameof(InitUpdateOnlineLogHandler)} should only called once");
         }
 
-        updateOnlineLogHandler = new GroupSingleHandler<EVSEOnlineRecord>(
+        updateOnlineLogHandler = new GroupHandler<EVSEOnlineRecord>(
             //BulkInsertWithBulkCopy,
             BundleUpdateWithDapper,
             //loggerFactory.CreateLogger("InsertMeterValueHandler")
@@ -47,7 +47,7 @@ public class OnlineLogDbService
             throw new Exception($"{nameof(InitInsertOnlineLogHandler)} should only called once");
         }
 
-        insertOnlineLogHandler = new GroupSingleHandler<EVSEOnlineRecord>(
+        insertOnlineLogHandler = new GroupHandler<EVSEOnlineRecord>(
             //BulkInsertWithBulkCopy,
             BundleInsertWithDapper,
             //loggerFactory.CreateLogger("InsertMeterValueHandler")
@@ -59,8 +59,8 @@ public class OnlineLogDbService
     private readonly SqlConnectionFactory<OnlineLogDBContext> connectionFactory;
     private readonly ILogger<OnlineLogDbService> logger;
 
-    private GroupSingleHandler<EVSEOnlineRecord> insertOnlineLogHandler;
-    private GroupSingleHandler<EVSEOnlineRecord> updateOnlineLogHandler;
+    private GroupHandler<EVSEOnlineRecord> insertOnlineLogHandler;
+    private GroupHandler<EVSEOnlineRecord> updateOnlineLogHandler;
 
     internal Task UpdateOnlineLog(string chargeBoxId, DateTime offlineTime)
     {
@@ -100,8 +100,11 @@ public class OnlineLogDbService
         await dbConn.ExecuteAsync(sqlString, parameters);
     }
 
-    private async Task BundleUpdateWithDapper(IEnumerable<EVSEOnlineRecord> records)
+    private async Task<BundleHandlerResult<EVSEOnlineRecord>> BundleUpdateWithDapper(IEnumerable<EVSEOnlineRecord> records)
     {
+        List<EVSEOnlineRecord> comtpetedRecords = new();
+        Exception exception = null; 
+
         string sqlString = """
             UPDATE dbo.EVSEOnlineRecord 
             SET OfflineTime=@OfflineTime 
@@ -113,32 +116,57 @@ public class OnlineLogDbService
             )
             """;
 
-        using var dbConn = await connectionFactory.CreateAsync();
-        foreach (var record in records)
+        try
         {
-            var parameters = new DynamicParameters();
-            parameters.Add("@ChargeBoxId", record.ChargeBoxId, System.Data.DbType.String, System.Data.ParameterDirection.Input, 36);
-            parameters.Add("@OfflineTime", record.OfflineTime, System.Data.DbType.DateTime);
-            //parameters.Add("@Id", record.Id, System.Data.DbType.Int64);
-            await dbConn.ExecuteAsync(sqlString, parameters);
+            using var dbConn = await connectionFactory.CreateAsync();
+            foreach (var record in records)
+            {
+                var parameters = new DynamicParameters();
+                parameters.Add("@ChargeBoxId", record.ChargeBoxId, System.Data.DbType.String, System.Data.ParameterDirection.Input, 36);
+                parameters.Add("@OfflineTime", record.OfflineTime, System.Data.DbType.DateTime);
+                //parameters.Add("@Id", record.Id, System.Data.DbType.Int64);
+                await dbConn.ExecuteAsync(sqlString, parameters);
+                comtpetedRecords.Add(record);
+            }
         }
+        catch (Exception e)
+        {
+            exception = e;
+        }
+        
+        return new BundleHandlerResult<EVSEOnlineRecord>(comtpetedRecords, exception);
     }
 
-    private async Task BundleInsertWithDapper(IEnumerable<EVSEOnlineRecord> records)
+    private async Task<BundleHandlerResult<EVSEOnlineRecord>> BundleInsertWithDapper(IEnumerable<EVSEOnlineRecord> records)
     {
+        List<EVSEOnlineRecord> completedDatas = new();
+        Exception exception = null;
+
         string sqlString = """
             INSERT INTO dbo.EVSEOnlineRecord ("ChargeBoxId","OnlineTime","OfflineTime")
             VALUES( @ChargeBoxId, @OnlineTime, @OfflineTime);
             """;
-
-        using var dbConn = await connectionFactory.CreateAsync();
-        foreach (var record in records)
+        try
         {
-            var parameters = new DynamicParameters();
-            parameters.Add("@ChargeBoxId", record.ChargeBoxId, System.Data.DbType.String, System.Data.ParameterDirection.Input, 36);
-            parameters.Add("@OnlineTime", record.OnlineTime, System.Data.DbType.DateTime);
-            parameters.Add("@OfflineTime", DefaultSetting.DefaultNullTime, System.Data.DbType.DateTime);
-            await dbConn.ExecuteAsync(sqlString, parameters);
+            using var dbConn = await connectionFactory.CreateAsync();
+            foreach (var record in records)
+            {
+                var parameters = new DynamicParameters();
+                parameters.Add("@ChargeBoxId", record.ChargeBoxId, System.Data.DbType.String, System.Data.ParameterDirection.Input, 36);
+                parameters.Add("@OnlineTime", record.OnlineTime, System.Data.DbType.DateTime);
+                parameters.Add("@OfflineTime", DefaultSetting.DefaultNullTime, System.Data.DbType.DateTime);
+                await dbConn.ExecuteAsync(sqlString, parameters);
+                completedDatas.Add(record);
+            }
+        }
+        catch (Exception e)
+        {
+            logger.LogError(e.Message);
+            logger.LogError(e.StackTrace);
+
+            exception = e;
         }
+        
+        return new BundleHandlerResult<EVSEOnlineRecord>(completedDatas, exception);
     }
 }