#include "Module_OcppBackend.h"

typedef enum boolean { FALSE, TRUE } BOOL;

struct lws *wsi_client;
struct lws_context *context;
static int req_SendQueue = 0;
pthread_t tid_connectServer;
pthread_t tid_ProcQueue;
pthread_t tid_Watchdog;

struct StartTime
{
	unsigned int connect;
	unsigned int bootNotification;
}startTime;

//==========================================
// Function prototype
//==========================================
void ReceivedMessage(void *in, size_t len);

int SendBufLen=0;//(1024*4);//(1024*3);
unsigned char SendBuffer[1024*4]={0};
static int ConnectionEstablished=0;
static int TransactionMessageResend = 1;		// the number of retry to submit a transaction-related message when the Central System fails to process it.
static int TransactionQueueNum = 0;
static int OfflineTransactionQueueNum = 0;		// Number of offline transactions
static int OfflineTransaction = 0;
static int IsUsing = FALSE;
int defaultWaitingTime = 10;					//10 second
char OcppPath[160]={0};
char OcppProtocol[10]={0},OcppHost[50]={0}, OcppTempPath[50]={0};
int OcppPort=0;
unsigned char StartTransactionIdTagTemp[20]={0};
uint32_t startTimeDog;
uint32_t startTimeQueue;
uint8_t isWebsocketSendable = 1;
uint8_t counterLwsRestart = 0;;
uint8_t counterQueueSent = 0;

sqlite3 *db;
char *errMsg = NULL;

static char *createsql = "CREATE TABLE IF NOT EXISTS log_buffer("
						 "idx integer primary key,"
						 "user_id text,"
						 "cmd_sn text,"
						 "charger_id text,"
						 "gun_type text,"
						 "gun_no text,"
						 "rfid_no text,"
						 "stime text,"
						 "etime text,"
						 "time_len text,"
						 "s_soc text,"
						 "e_soc text,"
						 "stop_reason text,"
						 "power text,"
						 "meter_before text,"
						 "meter_after text,"
						 "charge_price text,"
						 "reserve text,"
						 "surplus_before text,"
						 "surplus_after text,"
						 "service_price text,"
						 "is_pay text,"
						 "charge_strategy text,"
						 "charge_parameter text,"
						 "vin text,"
						 "vehicle_no text,"
						 "start_method text,"
						 "card_type text,"
						 "is_upload text,"
						 "guid text UNIQUE,"
						 "is_buf2OK text);";

static char *sqlOcppAuthCache = "create table if not exists ocpp_auth_cache (idx integer primary key,"
								"idtag text UNIQUE,"
								"parent_idtag text,"
								"expir_date text,"
								"status text);";

static char *sqlOcppAuthLocal = "create table if not exists ocpp_auth_local (idx integer primary key,"
								"idtag text UNIQUE,"
								"parent_idtag text,"
								"expir_date text,"
								"status text,"
								"version text);";

//=================================
// Common routine
//=================================

// Directory and file that hold the persistent transaction-related message queue.
static const char ocppStorageDir[] = "/Storage/OCPP";
static const char ocppQueueFile[] = "/Storage/OCPP/TransactionRelatedQueue";

/*
 * Returns the number of queued transaction-related messages as last counted
 * by showqueue()/addq()/delq().
 */
int GetTransactionQueueNum(void)
{
	return TransactionQueueNum;
}

/*
 * Returns non-zero when /Storage/OCPP is missing or is not a directory.
 * A failing stat() counts as "missing" so the stat buffer is never read
 * uninitialised.
 */
static int ocppDirectoryMissing(void)
{
	struct stat stats;

	if(stat(ocppStorageDir, &stats) != 0)
		return TRUE;

	return S_ISDIR(stats.st_mode) ? FALSE : TRUE;
}

/*
 * Creates /Storage/OCPP (and parents) through the shell, as the module always did.
 */
static void createOcppDirectory(void)
{
	system("mkdir -p /Storage/OCPP");
}

/*
 * Makes sure the queue file exists, creating an empty one when needed.
 * Returns TRUE when the file exists afterwards, FALSE when it could not be created.
 */
static int ensureQueueFileExists(void)
{
	FILE *created;

	if(access(ocppQueueFile, F_OK) != -1)
		return TRUE;

	created = fopen(ocppQueueFile, "w+");
	if(created == NULL)
		return FALSE;

	fclose(created);
	return TRUE;
}

/*
 * Copies characters from src into dst until the first character contained in
 * stops or the end of src. dst is always NUL-terminated and never receives more
 * than dstSize bytes. Returns the number of source characters that precede the
 * stop character, which can be larger than what was stored when dst is too small.
 */
static size_t copyField(const char *src, const char *stops, char *dst, size_t dstSize)
{
	size_t span = strcspn(src, stops);
	size_t kept = span;

	if(dstSize == 0)
		return span;

	if(kept > dstSize - 1)
		kept = dstSize - 1;

	memcpy(dst, src, kept);
	dst[kept] = '\0';

	return span;
}

/*
 * Finds key inside msg and returns a pointer skip characters behind the end of
 * the key (i.e. at the start of its value), or NULL when the key is absent or
 * the message ends before that position.
 */
static const char *valueAfterKey(const char *msg, const char *key, size_t skip)
{
	const char *hit = strstr(msg, key);
	size_t offset = strlen(key) + skip;

	if((hit == NULL) || (strlen(hit) < offset))
		return NULL;

	return hit + offset;
}

//==========================================
// Web socket tranceive routine
//==========================================

/*
 * Writes the text currently stored in SendBuffer to the websocket and clears
 * the buffer afterwards.
 *
 * When the message is one of the charger-initiated actions, isWebsocketSendable
 * is cleared; it is set again by the receive branch of OCPP16Callback().
 *
 * Returns 0 when there is nothing to send, otherwise the lws_write() result.
 */
int SendData(struct lws *wsi)
{
	// Only disable isWebsocketSendable operation initiated by charger
	static const char *const chargerInitiatedActions[] =
	{
		"\"Authorize\"",
		"\"BootNotification\"",
		"\"DataTransfer\"",
		"\"DiagnosticsStatusNotification\"",
		"\"FirmwareStatusNotification\"",
		"\"Heartbeat\"",
		"\"MeterValues\"",
		"\"StartTransaction\"",
		"\"StatusNotification\"",
		"\"StopTransaction\""
	};
	unsigned char out[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING] = {0};
	size_t idx;
	int len;
	int n;

	// A completely filled buffer would otherwise make the string functions read past its end
	SendBuffer[ARRAY_SIZE(SendBuffer) - 1] = '\0';

	for(idx = 0; idx < ARRAY_SIZE(chargerInitiatedActions); idx++)
	{
		if(strstr((char*)SendBuffer, chargerInitiatedActions[idx]) != NULL)
		{
			isWebsocketSendable = 0;
			break;
		}
	}

	len = strlen((char *)SendBuffer);
	if(len == 0)
		return 0;

	memcpy (out + LWS_SEND_BUFFER_PRE_PADDING, SendBuffer, len );
	DEBUG_OCPPMESSAGE_INFO("===========> %s\n", out + LWS_SEND_BUFFER_PRE_PADDING);
	n = lws_write(wsi, out + LWS_SEND_BUFFER_PRE_PADDING, len, LWS_WRITE_TEXT);

	memset(SendBuffer, 0, len);
	SendBufLen = 0;

	return n;
}

/*
 * libwebsockets protocol callback shared by all entries of protocols[].
 *
 * Handles the handshake logging, the HTTP basic authorization header for
 * security profiles 1-3, connection state bookkeeping, sending of the pending
 * SendBuffer content and dispatching of received text frames to
 * ReceivedMessage().
 *
 * Returns 0, or -1 when the authorization header cannot be built or appended.
 */
static int OCPP16Callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
	char buf[256]={0}, hash[20]={0}, key_b64[40]={0};

	switch (reason)
	{
		case LWS_CALLBACK_PROTOCOL_INIT:
			DEBUG_INFO("LWS_CALLBACK_PROTOCOL_INIT\n");
			break;

		case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH:
			DEBUG_INFO("LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH\n");
			DEBUG_OCPPMESSAGE_INFO("----- Handshake: Client Request START -----\n");
			lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, _WSI_TOKEN_CLIENT_URI);
			DEBUG_OCPPMESSAGE_INFO("GET %s HTTP/1.1 \n", buf);
			lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, _WSI_TOKEN_CLIENT_HOST);
			DEBUG_OCPPMESSAGE_INFO("Host: %s\n", buf);
			DEBUG_OCPPMESSAGE_INFO("Upgrade: websocket\n");
			DEBUG_OCPPMESSAGE_INFO("Connection: Upgrade\n");
			lws_b64_encode_string(hash, 16, key_b64, ARRAY_SIZE(key_b64));// Sec-WebSocket-Key
			DEBUG_OCPPMESSAGE_INFO("Sec-WebSocket-Key: %s\n", key_b64);
			lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, _WSI_TOKEN_CLIENT_SENT_PROTOCOLS);
			DEBUG_OCPPMESSAGE_INFO("Sec-WebSocket-Protocol: %s\n", buf);
			DEBUG_OCPPMESSAGE_INFO("Sec-WebSocket-Version: %d\n", SPEC_LATEST_SUPPORTED);
			DEBUG_OCPPMESSAGE_INFO("----- Handshake: Client Request END -----\n");

			DEBUG_OCPPMESSAGE_INFO("----- Handshake: Server response START -----\n");
			lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, WSI_TOKEN_HTTP);
			DEBUG_OCPPMESSAGE_INFO("HTTP/1.1 %s\n", buf);
			lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, WSI_TOKEN_UPGRADE);
			DEBUG_OCPPMESSAGE_INFO("Upgrade: %s\n", buf);
			lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, WSI_TOKEN_CONNECTION);
			DEBUG_OCPPMESSAGE_INFO("Connection: %s\n", buf);
			lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, WSI_TOKEN_ACCEPT);
			DEBUG_OCPPMESSAGE_INFO("Sec-WebSocket-Accept: %s\n", buf);
			lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, WSI_TOKEN_PROTOCOL);
			DEBUG_OCPPMESSAGE_INFO("Sec-WebSocket-Protocol: %s\n", buf);
			DEBUG_OCPPMESSAGE_INFO("----- Handshake: Server response END -----\n");

			// buf still holds the sub-protocol selected by the server
			SetOcppVersion((uint8_t*)buf);
			break;

		case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION:
			DEBUG_INFO("LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION\n");
			break;

		case LWS_CALLBACK_WSI_DESTROY:
			DEBUG_INFO("LWS_CALLBACK_WSI_DESTROY\n");
			pthread_detach(tid_connectServer);
			SetServerSign(FALSE);
			ConnectionEstablished = 0;
			context = NULL;
			break;

		case LWS_CALLBACK_LOCK_POLL:
			break;

		case LWS_CALLBACK_ADD_POLL_FD:
			DEBUG_INFO("LWS_CALLBACK_ADD_POLL_FD\n");
			break;

		case LWS_CALLBACK_DEL_POLL_FD:
			DEBUG_INFO("LWS_CALLBACK_DEL_POLL_FD\n");
			break;

		case LWS_CALLBACK_UNLOCK_POLL:
			break;

		case LWS_CALLBACK_CHANGE_MODE_POLL_FD:
			break;

		case LWS_CALLBACK_WSI_CREATE:
			DEBUG_INFO("LWS_CALLBACK_WSI_CREATE\n");
			break;

		case LWS_CALLBACK_GET_THREAD_ID:
			break;

		case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER:
		{
			unsigned char** pos = (unsigned char**)in;
			unsigned char* end = (*pos) + len;

			DEBUG_INFO("LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER\n");

			switch(GetOcppSecurityProfile())
			{
				case 1:
				case 2:
				case 3:
				{
					// Sized for the worst case: 127 + ':' + 63 characters, their base64 form and the "Basic " prefix
					uint8_t boxId[128]={0}, password[64]={0};
					char credentials[192]={0};
					char auth_b64[260]={0};
					char header[272]={0};

					GetOcppChargerBoxId(boxId);
					GetOcppSecurityPassword(password);
					boxId[ARRAY_SIZE(boxId) - 1] = '\0';
					password[ARRAY_SIZE(password) - 1] = '\0';

					snprintf(credentials, sizeof(credentials), "%s:%s", boxId, password);
					if(lws_b64_encode_string(credentials, strlen(credentials), auth_b64, ARRAY_SIZE(auth_b64)) < 0)
					{
						DEBUG_ERROR("lws_b64_encode_string : WSI_TOKEN_HTTP_AUTHORIZATION\n");
						return -1;
					}
					snprintf(header, sizeof(header), "Basic %s", auth_b64);

					if (lws_add_http_header_by_token(wsi, WSI_TOKEN_HTTP_AUTHORIZATION, (uint8_t *)header, strlen(header), pos, end))
					{
						DEBUG_ERROR("lws_add_http_header_by_token : WSI_TOKEN_HTTP_AUTHORIZATION\n");
						return -1;
					}

					DEBUG_OCPPMESSAGE_INFO("Authorization: %s\n", header);
					break;
				}
				case 0:
				default:
					break;
			}
			break;
		}

		case LWS_CALLBACK_CLIENT_ESTABLISHED://3
		{
			char frontUUID[100] ={0};
			char frontData[QUEUE_MESSAGE_LENGTH] ={0};
			int queueNotEmpty = FALSE;

			DEBUG_INFO("LWS_CALLBACK_CLIENT_ESTABLISHED\n");

			//connected
			ConnectionEstablished=1;
			SetOcppConnStatus(TRUE);

			queueNotEmpty = queue_operation(QUEUE_OPERATION_SHOWFRONT,frontUUID, frontData);
			if(queueNotEmpty == TRUE)
			{
				OfflineTransaction = 1;		// 0: no packets in queue. 1: There are packets in queue.
			}

			TransactionMessageResend = 1;

			//get offline number
			queue_operation(QUEUE_OPERATION_SHOWQUEUE,"","");
			OfflineTransactionQueueNum =TransactionQueueNum ;
			break;
		}

		case LWS_CALLBACK_CLIENT_CONNECTION_ERROR://1
			DEBUG_ERROR("LWS_CALLBACK_CLIENT_CONNECTION_ERROR %s\n", (char *)in );

			//disconnected
			ConnectionEstablished=0;

			DEBUG_OCPPMESSAGE_INFO("===== Handshake: Client START =====\n");
			lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, _WSI_TOKEN_CLIENT_URI);
			DEBUG_OCPPMESSAGE_INFO("GET %s HTTP/1.1 \n", buf);
			lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, _WSI_TOKEN_CLIENT_HOST);
			DEBUG_OCPPMESSAGE_INFO("Host: %s\n", buf);
			DEBUG_OCPPMESSAGE_INFO("Upgrade: websocket\n");
			DEBUG_OCPPMESSAGE_INFO("Connection: Upgrade\n");
			lws_b64_encode_string(hash, 16, key_b64, ARRAY_SIZE(key_b64));// Sec-WebSocket-Key
			DEBUG_OCPPMESSAGE_INFO("Sec-WebSocket-Key: %s\n", key_b64);
			lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, _WSI_TOKEN_CLIENT_SENT_PROTOCOLS);
			DEBUG_OCPPMESSAGE_INFO("Sec-WebSocket-Protocol: %s\n", buf);
			DEBUG_OCPPMESSAGE_INFO("Sec-WebSocket-Version: %d\n", SPEC_LATEST_SUPPORTED);
			DEBUG_OCPPMESSAGE_INFO("===== Handshake: Client END =====\n");

			DEBUG_OCPPMESSAGE_INFO("===== Handshake: Server response START =====\n");
			lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, WSI_TOKEN_HTTP);
			DEBUG_OCPPMESSAGE_INFO("HTTP/1.1 %s\n", buf);
			lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, WSI_TOKEN_UPGRADE);
			DEBUG_OCPPMESSAGE_INFO("Upgrade: %s\n", buf);
			lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, WSI_TOKEN_CONNECTION);
			DEBUG_OCPPMESSAGE_INFO("Connection: %s\n", buf);
			lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, WSI_TOKEN_ACCEPT);
			DEBUG_OCPPMESSAGE_INFO("Sec-WebSocket-Accept: %s\n", buf);
			lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, WSI_TOKEN_PROTOCOL);
			DEBUG_OCPPMESSAGE_INFO("Sec-WebSocket-Protocol: %s\n", buf);
			DEBUG_OCPPMESSAGE_INFO("===== Handshake: Server response END =====\n");
			break;

		case LWS_CALLBACK_CLOSED://4
			DEBUG_INFO("LWS_CALLBACK_CLOSED\n");

			//disconnected
			ConnectionEstablished=0;
			break;

		case LWS_CALLBACK_CLIENT_WRITEABLE://10
			// Sends whatever has been stored in SendBuffer, if anything
			SendData(wsi);
			break;

		case LWS_CALLBACK_CLIENT_RECEIVE://8
		{
			// Work on a private copy so the terminator is never written behind the received payload
			char tempin[65536];
			char *loc;
			char *trimmed;

			if((in == NULL) || (len >= ARRAY_SIZE(tempin)))
			{
				DEBUG_ERROR("Received frame is empty or too large (%d bytes), dropped.\n", (int)len);
			}
			else
			{
				memcpy(tempin, in, len);
				tempin[len] = '\0';
				DEBUG_OCPPMESSAGE_INFO("<===== %s\n", tempin);

				//**********Receive Message**********/
				// Two messages glued together ("...][2,..."): keep only the second one
				loc = strstr(tempin, "][2,");
				if(loc == NULL)
				{
					loc = strstr(tempin, "][3,");
				}
				if(loc == NULL)
				{
					loc = strstr(tempin, "][4,");
				}

				if(loc != NULL)
				{
					DEBUG_INFO("There are continuous second packet []\n");
					memmove(tempin, loc + 1, strlen(loc + 1) + 1);
					DEBUG_INFO("Final Receive: %s\n", tempin);
				}

				// Trim first, then measure: the length must describe the text that is handed over
				trimmed = (char *)strtrim(tempin);
				ReceivedMessage((void *)trimmed, strlen(trimmed));
			}

			isWebsocketSendable = 1;
			break;
		}

		case LWS_CALLBACK_CLIENT_RECEIVE_PONG:
			DEBUG_INFO("LWS_CALLBACK_CLIENT_RECEIVE_PONG\n");
			break;

		case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION:
			DEBUG_INFO("LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION\n");
			break;

		case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS:
			DEBUG_INFO("LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS\n");
			break;

		case LWS_CALLBACK_PROTOCOL_DESTROY:
			DEBUG_INFO("LWS_CALLBACK_PROTOCOL_DESTROY\n");
			break;

		case LWS_CALLBACK_RECEIVE_PONG:
			DEBUG_INFO("LWS_CALLBACK_RECEIVE_PONG\n");
			break;

		default:
			DEBUG_INFO("Reason = %d\n", reason);
			break;
	}

	return 0;
}

static struct lws_protocols protocols[] =
{
	{
		"ocpp1.6",
		OCPP16Callback,
		65536,//65536,//10240,
		65536,//65536,//10240,
	},
	{
		"ocpp2.0",
		OCPP16Callback,
		65536,//65536,//10240,
		65536,//65536,//10240,
	},
	{
		"ocpp1.6,ocpp2.0",
		OCPP16Callback,
		65536,//65536,//10240,
		65536,//65536,//10240,
	},
	{
		NULL, NULL, 0		/* End of list */
	}
};

/*
 * Thread entry: (re)creates the libwebsockets context and starts a client
 * connection to the configured OCPP server. The thread always ends through
 * pthread_exit(NULL); the outcome is visible through the globals context and
 * wsi_client.
 */
void* ConnectWsServer(void* data)
{
	struct lws_context_creation_info ContextInfo;
	struct lws_client_connect_info ConnInfo;
	char addr_port[256] = { 0 };
	int use_ssl=0;

	// If internet available synchronize datetime with ntp server
	if(GetInternetConn() == 1)
	{
		system("pkill ntpd");
		system("/usr/sbin/ntpd -nqp time.windows.com &");
		DEBUG_INFO("NTP synchronize with Microsoft\n");
		system("/usr/sbin/ntpd -nqp cn.ntp.org.cn &");
		DEBUG_INFO("NTP synchronize with China\n");
		system("/usr/sbin/ntpd -nqp tock.stdtime.gov.tw &");
		DEBUG_INFO("NTP synchronize with Taiwan\n");
		system("/usr/sbin/ntpd -nqp 0.europe.pool.ntp.org &");
		DEBUG_INFO("NTP synchronize with Europe\n");
	}

	if(context!=NULL)
	{
		pthread_detach(pthread_self());
		lws_context_destroy(context);
		ConnectionEstablished=0;
		context = NULL;
	}

	memset(&ContextInfo, 0, sizeof(struct lws_context_creation_info));
	if((GetOcppServerURL()==0) || (GetOcppPort() == 0) || (GetOcppPath()==0))
	{
		DEBUG_ERROR("OCPP URL is NULL or OCPP Port is zero or OCPP Path is NULL\n");
		goto end;
	}

	if((strcmp(OcppProtocol,"ws")==0)&&(strlen(OcppProtocol)== 2))
	{
		DEBUG_INFO("Web socket is non-security mode.\n");
		use_ssl=0;
	}
	else if((strcmp(OcppProtocol,"wss")==0)&&(strlen(OcppProtocol)== 3))
	{
		DEBUG_INFO("Web socket is security mode.\n");
		use_ssl=1;
	}

	ContextInfo.port = CONTEXT_PORT_NO_LISTEN;
	ContextInfo.iface = NULL;
	ContextInfo.ssl_private_key_password = NULL;
	ContextInfo.ssl_cert_filepath = NULL;//"./ssl_key/client_cert.pem";
	ContextInfo.ssl_private_key_filepath = NULL;//"./ssl_key/client_key.pem";
	ContextInfo.ssl_ca_filepath = "/root/cacert.pem";//"./cacert.pem";
	ContextInfo.ssl_cipher_list = NULL; //use default one
	ContextInfo.gid = -1;
	ContextInfo.uid = -1;
	if(use_ssl)
	{
		ContextInfo.options |= LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT ;
	}

	ContextInfo.protocols = protocols;
	ContextInfo.timeout_secs = GetWebSocketPingInterval();
	ContextInfo.ws_ping_pong_interval = GetWebSocketPingInterval();//0 for none, else interval in seconds

	context = lws_create_context(&ContextInfo);
	if (context == NULL)
	{
		DEBUG_ERROR("lws_create_context NG");
		goto end;
	}

	memset(&ConnInfo,0,sizeof(struct lws_client_connect_info));

	// fill up below information
	ConnInfo.context = context;
	ConnInfo.address=(const char *)OcppHost;
	DEBUG_INFO("ConnInfo.address: %s\n", ConnInfo.address);
	ConnInfo.port = GetOcppPort();
	DEBUG_INFO("ConnInfo.port: %d\n", ConnInfo.port);
	ConnInfo.path=(const char *)OcppPath;
	DEBUG_INFO("ConnInfo.path: %s\n", ConnInfo.path);

	snprintf(addr_port, sizeof(addr_port), "%s:%u", ConnInfo.address, (unsigned int)(ConnInfo.port & 65535) );
	ConnInfo.host= addr_port;
	ConnInfo.protocol = protocols[0].name;
	ConnInfo.ietf_version_or_minus_one = -1;

	if(use_ssl)
	{
#ifdef TLS_VALID_CERT_EXPIRED
		ConnInfo.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK;
		DEBUG_INFO("TLS does not allow expired certification.\n");
#else
		ConnInfo.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK | LCCSCF_ALLOW_EXPIRED;
		DEBUG_INFO("TLS allow expired certification.\n");
#endif
	}

	wsi_client = lws_client_connect_via_info(&ConnInfo);
	if (!wsi_client)
	{
		DEBUG_ERROR("lws_client_connect_via_info NG\n");
		goto end;
	}

end:
	pthread_exit(NULL/*(void *) fname*/);
}

/*
 * Returns TRUE when the queue file is larger than 500 MB, FALSE otherwise
 * (including when the file cannot be opened or its size cannot be read).
 */
int isQueueOverSize()
{
	FILE *fp;
	long file_size;
	int result = FALSE;

	fp = fopen(ocppQueueFile , "r");
	if(fp != NULL)
	{
		// ftell() reports -1 on failure; that must not be mistaken for a huge file
		if(fseek(fp, 0L, SEEK_END) == 0)
		{
			file_size = ftell(fp);
			if(file_size > (500L*1024*1024))
			{
				result = TRUE;
				DEBUG_WARN("Queue file over size.\n");
			}
		}

		fclose(fp);
	}

	return result;
}

/*
 * Reads the first queued message without removing it.
 *
 * uuid receives the text between the first pair of double quotes of that line
 * (at most 49 characters), data receives the complete line as stored.
 * A queue file whose first line is blank is deleted.
 *
 * Returns TRUE when a message was read, FALSE when the queue is empty or the
 * file cannot be used.
 */
int showfront(char *uuid, char *data)
{
	FILE *fp;
	int result = FALSE; // 1: TRUE  0:FALSE
	char str[QUEUE_MESSAGE_LENGTH]={0};
	char sstr[50]={ 0 };
	const char *quote;

	// Check for directory existence
	if(ocppDirectoryMissing())
	{
		DEBUG_INFO("\n OCPP directory not exist, create dir \n");
		createOcppDirectory();
	}

	if(!ensureQueueFileExists())
	{
		DEBUG_INFO("Can't Create File TransactionRelatedQueue \n");
		return FALSE;
	}

	/* opening file for reading */
	fp = fopen(ocppQueueFile , "r");
	if(fp == NULL)
	{
		DEBUG_INFO("Error opening TransactionRelatedQueue file");
		return FALSE;
	}

	if( fgets (str, QUEUE_MESSAGE_LENGTH, fp)==NULL )
	{
		// queue is empty
		strcpy(uuid,"");
		strcpy(data,"");
		result = FALSE;
	}
	else if ((str[0] == '\n')||(strcmp(str,"")==0))
	{
		DEBUG_INFO("It is a blank line");
		fclose(fp);
		remove(ocppQueueFile);
		return FALSE;
	}
	else
	{
		//----------------uuid--------------//
		quote = strchr(str, '\"');
		if(quote != NULL)
		{
			copyField(quote + 1, "\"", sstr, sizeof(sstr));
		}
		else
		{
			// Malformed entry: report it with an empty uuid instead of dereferencing NULL
			DEBUG_WARN("Queue front item has no message id.\n");
		}

		strcpy(uuid,sstr);
		strcpy(data,str);
		result = TRUE;
	}

	fclose(fp);
	return result;
}

/*
 * Appends data plus a newline to the queue file and increases the queue
 * counters. uuid is not used.
 *
 * NOTE: returns FALSE in every case, also after a successful append; this is
 * kept unchanged because callers outside this view may rely on it.
 */
int addq(char *uuid, char *data)
{
	FILE *outfile;

	(void)uuid;

	DEBUG_INFO("addq\n");

	// Check for directory existence
	if(ocppDirectoryMissing())
	{
		createOcppDirectory();
	}

	if(!ensureQueueFileExists())
	{
		return FALSE;
	}

	// open file for writing
	outfile = fopen (ocppQueueFile, "a");
	if(outfile == NULL)
	{
		DEBUG_ERROR("Can't open TransactionRelatedQueue for append.\n");
		return FALSE;
	}

	DEBUG_INFO("data = %s\n",data);
	fputs(data, outfile);
	fputs("\n", outfile);
	fclose (outfile);

	TransactionQueueNum = TransactionQueueNum + 1;
	if(OfflineTransaction == 1)		// 0: no offline Transaction 1: offline Transaction
	{
		OfflineTransactionQueueNum = OfflineTransactionQueueNum + 1;
	}

	DEBUG_INFO("add queue end\n");

	return FALSE;
}

//---------------- delq(): delete the top item --------------//
/*
 * Removes the first message from the queue file.
 *
 * The remaining lines are written to a temporary file which then replaces the
 * queue file through rename(), so the queue file never disappears in between.
 * On success TransactionQueueNum is decreased and TransactionMessageResend is
 * reset to 1. Always returns 0.
 */
int delq()
{
	static const char tempfile[] = "/Storage/OCPP/delqtemp.json";
	FILE *infile;
	FILE *outfile;
	char buf[QUEUE_MESSAGE_LENGTH]={0};
	int skippingFront = TRUE;
	int lastLineTerminated = TRUE;
	int writeFailed = FALSE;

	DEBUG_INFO("delq()\n");

	// Check for directory existence
	if(ocppDirectoryMissing())
	{
		createOcppDirectory();
	}

	if(!ensureQueueFileExists())
	{
		return 0;
	}

	infile = fopen (ocppQueueFile, "r");
	if(infile == NULL)
	{
		DEBUG_ERROR("Can't open TransactionRelatedQueue for reading.\n");
		return 0;
	}

	if(fgets(buf, sizeof(buf), infile) == NULL)
	{
		// Queue is empty, nothing to delete
		fclose(infile);
		return 0;
	}

	outfile = fopen (tempfile, "w");
	if(outfile == NULL)
	{
		DEBUG_ERROR("Can't create temporary queue file.\n");
		fclose(infile);
		return 0;
	}

	do
	{
		size_t chunkLen = strlen(buf);
		int chunkEndsLine = (chunkLen > 0) && (buf[chunkLen - 1] == '\n');

		if(skippingFront)
		{
			// A front item longer than buf arrives in several chunks; drop all of them
			if(chunkEndsLine)
			{
				skippingFront = FALSE;
			}
		}
		else
		{
			fputs(buf, outfile);
			lastLineTerminated = chunkEndsLine;
		}
	} while (fgets(buf, sizeof(buf), infile) != NULL);

	// addq() appends right behind the last byte, so the file has to end with a newline
	if(!lastLineTerminated)
	{
		fputc('\n', outfile);
	}

	if(ferror(outfile))
	{
		writeFailed = TRUE;
	}
	fclose(infile);
	if(fclose(outfile) != 0)
	{
		writeFailed = TRUE;
	}

	if(writeFailed)
	{
		DEBUG_ERROR("Can't write temporary queue file, queue left unchanged.\n");
		remove(tempfile);
		return 0;
	}

	if(rename(tempfile, ocppQueueFile) != 0)
	{
		DEBUG_ERROR("Error: unable to rename the TransactionRelatedQueue file\n");
		remove(tempfile);
		return 0;
	}

	TransactionQueueNum = TransactionQueueNum - 1;
	TransactionMessageResend = 1;
	DEBUG_INFO("delete the item\n");
	DEBUG_INFO("delq() end\n");

	return 0;
}

/*
 * Counts the entries of the queue file into TransactionQueueNum.
 * Returns TRUE on success, FALSE when the queue file cannot be created or opened.
 */
int showqueue()
{
	FILE *fp;
	char line[QUEUE_MESSAGE_LENGTH]={0};

	// Check for directory existence
	if(ocppDirectoryMissing())
	{
		createOcppDirectory();
	}

	if(!ensureQueueFileExists())
	{
		DEBUG_INFO("log is NULL\n");
		return FALSE;
	}

	fp = fopen(ocppQueueFile, "r");
	if (fp == NULL)
	{
		DEBUG_INFO("can't open file TransactionRelatedQueue!");
		return FALSE;
	}

	TransactionQueueNum = 0;  // the number of packets in queue
	while(fgets(line, sizeof line, fp) != NULL)
	{
		TransactionQueueNum = TransactionQueueNum + 1; //the number of packets in queue
	}

	fclose(fp);

	return TRUE;
}

/*
 * Sends the first queued message through LWS_Send().
 *
 * A queue line starts with the connector id digit and a comma, followed by the
 * OCPP message; those first two bytes are skipped when sending.
 *  - StartTransaction: its fields are parsed and, unless the message id is
 *    already known to the hash map, registered via hashmap_operation() and
 *    FillStartTransaction().
 *  - Messages carrying a transactionId: the id is replaced by the one returned
 *    by GetTransactionId() when that is non-zero and different.
 *  - StopTransaction: SetTransactionIdZero() is called after sending.
 *
 * Returns TRUE when a message was read from the queue, FALSE otherwise.
 */
int sentqueue(){
	FILE *fp;
	int result = FALSE; // 1: TRUE  0:FALSE
	int temptransactionId = 0, gettransactionId = 0;
	int tempconnectorId = 0;
	char guid[37]={0};
	char tempdata[65]={0};
	char key_value[65]={0};
	int IsStopTransaction = FALSE;
	char str[QUEUE_MESSAGE_LENGTH]={0};
	char strcomposite[QUEUE_MESSAGE_LENGTH]={0};
	char connectorStr[2]={0};
	char sstr[28]={0};
	unsigned char IdtagStr[21]={0};			// idTag may be 20 characters long plus terminator
	unsigned char timestampStr[30]={0};
	int tempmeterStart = 0;
	int tempreservationId = 0;
	const char *value;

	DEBUG_INFO("Sent queue.\n");

	// Check for directory existence
	if(ocppDirectoryMissing())
	{
		createOcppDirectory();
	}

	/* opening file for reading */
	fp = fopen(ocppQueueFile , "r");
	if(fp == NULL)
	{
		DEBUG_INFO("Error opening file");
		return FALSE;
	}

	if( fgets (str, QUEUE_MESSAGE_LENGTH, fp)!=NULL )
	{
		//*********************Start: StopTransaction***************************/
		if(strstr(str, "StopTransaction") != NULL)
		{
			IsStopTransaction = TRUE;
		}

		// First character of the line is the connector id
		connectorStr[0] = str[0];
		tempconnectorId = atoi(connectorStr);
		//*********************End: StopTransaction***************************/

		//*********************Start:idTag***************************/
		value = valueAfterKey(str, "idTag", 3);
		if(value != NULL)
		{
			copyField(value, "\"", (char*)IdtagStr, sizeof(IdtagStr));
		}
		//*********************End:idTag***************************/

		//*********************Start: StartTransaction***************************/
		if(strstr(str, "StartTransaction") != NULL)
		{
			// [2,0200000000000000000000000001584415776,StartTransaction,{connectorId:1,idTag:123,meterStart:100,reservationId:0,timestamp:2020-03-17T03:29:36Z}]
			if(tempconnectorId > 0)
			{
				sprintf(tempdata, "StartTransaction,%d", (tempconnectorId-1));
			}

			//GUID: starts behind the leading `N,[2,"` prefix
			if(strlen(str) > 6)
			{
				copyField(str + 6, "\"", guid, sizeof(guid));
			}

			//meterStart
			value = valueAfterKey(str, "meterStart", 2);
			if(value != NULL)
			{
				copyField(value, ",", sstr, sizeof(sstr));
				tempmeterStart = atoi(sstr);
			}

			//reservationId
			value = valueAfterKey(str, "reservationId", 2);
			if(value != NULL)
			{
				copyField(value, ",", sstr, sizeof(sstr));
				tempreservationId = atoi(sstr);
			}

			//timestamp
			value = valueAfterKey(str, "timestamp", 3);
			if(value != NULL)
			{
				copyField(value, "\"", (char*)timestampStr, sizeof(timestampStr));
			}

			if(hashmap_operation(HASH_OP_GET, guid, key_value) == TRUE)
			{
				// Already registered by an earlier attempt
			}
			else
			{
				hashmap_operation(HASH_OP_ADD, guid, tempdata);
				FillStartTransaction(tempconnectorId, IdtagStr, tempmeterStart, tempreservationId, timestampStr);
			}
		}
		//*********************End: StartTransaction***************************/

		//****************transactionId********************/
		value = valueAfterKey(str, "transactionId", 2);
		if(value != NULL)
		{
			// Only MeterValue with transactionId & StopTransaction will arrive here
			size_t idLen = copyField(value, "},", sstr, sizeof(sstr));

			temptransactionId = atoi(sstr);

			//Get IdTag from StartTransaction , store to StartTransactionIdTagTemp, For StopTransaction usage in Queue (StartTransaction. StopTransaction user id different), get actual TransactionId
			GetStartTransactionIdTag(tempconnectorId-1);
			gettransactionId = GetTransactionId(tempconnectorId, (uint8_t *)StartTransactionIdTagTemp, IsStopTransaction);

			DEBUG_INFO("queue map transactionId = %d\n", gettransactionId);
			DEBUG_INFO("original connectorId = %d\n", tempconnectorId);
			DEBUG_INFO("original transactionId = %d\n", temptransactionId);
			DEBUG_INFO("StartTransactionIdTagTemp = %s\n", StartTransactionIdTagTemp);

			if((gettransactionId != 0)&&(temptransactionId != gettransactionId))
			{
				//replace transactionId of metervalue or stopTransaction:
				//text up to the value + new id + text that followed the old id
				snprintf(strcomposite, sizeof(strcomposite), "%.*s%d%s", (int)(value - str), str, gettransactionId, value + idLen);
				LWS_Send(strcomposite+2);	// skip 2 bytes String -> Connector ID,
			}
			else
			{
				LWS_Send(str+2);			// skip 2 bytes String -> Connector ID
				gettransactionId = temptransactionId;
			}
			DEBUG_INFO("Final transactionId = %d\n", gettransactionId);

			if(IsStopTransaction == TRUE)
			{
				SetTransactionIdZero(gettransactionId);
			}
		}
		else
		{
			// MeterValue without transactionId & StartTransaction arrive here
			LWS_Send(str+2);
		}

		result = TRUE;
	}
	else
	{
		result = FALSE;
	}

	fclose(fp);

	return result;
}

/*
 * Thread entry: every 500 ms decides whether the first queued message should
 * be (re)sent and raises req_SendQueue accordingly. Messages queued while
 * offline are requested first; other messages are retried up to
 * TransactionMessageAttemptsGet() times and then removed from the queue.
 * The loop never ends.
 */
void* processTransactionQueue(void* data)
{
	char frontUUID[100] ={0};
	char frontData[QUEUE_MESSAGE_LENGTH/*1024*4*/] ={0};
	int queueNotEmpty = FALSE;

	while(1)
	{
		// NOTE(review): counterQueueSent is reset once it reaches 10 below, so the ">=20" throttle looks unreachable - confirm intent
		if(!req_SendQueue &&
		   ((((time((time_t*)NULL) - startTimeQueue) > (TransactionMessageRetryIntervalGet()>10?TransactionMessageRetryIntervalGet():10))) ||
			(isWebsocketSendable && ((time((time_t*)NULL) - startTimeQueue) >= ((counterQueueSent>=20)?5:0)))))
		{
			if(FirstHeartBeatResponse() == 1)
			{
				memset(frontUUID, 0, ARRAY_SIZE(frontUUID));
				memset(frontData, 0, ARRAY_SIZE(frontData));
				queueNotEmpty = FALSE;

				queueNotEmpty = queue_operation(QUEUE_OPERATION_SHOWFRONT,frontUUID, frontData);

				if((queueNotEmpty == TRUE) && (GetOcppConnStatus() == 1)) //OcppConnStatus 0: disconnected, 1: connected
				{
					if(isWebsocketSendable)
						DEBUG_INFO("isWebsocketSendable on.\n");

					if((((time((time_t*)NULL) - startTimeQueue) > (TransactionMessageRetryIntervalGet()>10?TransactionMessageRetryIntervalGet():10))))
						DEBUG_INFO("Queue timer(%d) over spec(%d).\n", (int)(time((time_t*)NULL) - startTimeQueue), (int)TransactionMessageRetryIntervalGet());

					if((OfflineTransaction == 1) && (OfflineTransactionQueueNum != 0)) //OfflineTransaction 0: no offline Transaction 1: offline Transaction
					{
						DEBUG_INFO("Sent message from queue request off-line first.\n");
						req_SendQueue = 1;		// 0: no packets to send 1: send the top packet in queue
						OfflineTransactionQueueNum = OfflineTransactionQueueNum - 1;
						if(OfflineTransactionQueueNum == 0)
						{
							OfflineTransaction = 0;
						}
					}
					else
					{
						if(TransactionMessageResend <= TransactionMessageAttemptsGet())
						{
							DEBUG_INFO("Sent message from queue request.\n");
							DEBUG_INFO("TransactionMessageResend = %d\n",TransactionMessageResend);
							req_SendQueue = 1;
							TransactionMessageResend += 1;
						}
						else
						{
							DEBUG_INFO("Transaction message resend(%d) over spec(%d) message abandon.\n", TransactionMessageResend, TransactionMessageAttemptsGet());
							queue_operation(QUEUE_OPERATION_DEL,"","");		// delete item
							TransactionMessageResend = 1;
						}
					}
				}
			}

			if(GetOcppConnStatus() == 0)
			{
				if(queueNotEmpty == TRUE)
				{
					OfflineTransaction = 1;		// 0: no offline Transaction 1: offline Transaction
				}
			}

			// Refresh queue timer
			startTimeQueue = time((time_t*)NULL);
			if((counterQueueSent >= 10) || (queueNotEmpty == FALSE))
			{
				counterQueueSent = 0;
			}
			else
			{
				counterQueueSent += 1;
			}
		}

		usleep(500000);
	}

	pthread_exit(NULL);
}

/*
 * Thread entry: once per second checks startTimeDog. When it is more than
 * 10 seconds old while a context exists, the lws service is cancelled; on the
 * third such timeout the process is killed through "pkill OcppBackend".
 */
void* processWatchdog()
{
	for(;;)
	{
		if(((time((time_t*)NULL) - startTimeDog) > 10) && (context != NULL))
		{
			DEBUG_INFO("LWS watch dog timeout.\n");
			lws_cancel_service(context);
			// A context can exist without a client connection (failed connect)
			if(wsi_client != NULL)
			{
				lws_cancel_service_pt(wsi_client);
			}

			if(counterLwsRestart >= 2)
			{
				DEBUG_INFO("LWS watch dog timeout over 3 count.\n");
				system("pkill OcppBackend");
			}
			else
				counterLwsRestart++;

			startTimeDog = time((time_t*)NULL);
		}

/*
		if(system("pidof -s Module_PhBackend > /dev/null") != 0)
		{
			DEBUG_INFO("Module_PhBackend not running, restart it.\r\n");
			system("/root/Module_PhBackend &");
		}*/

		sleep(1);
	}

	pthread_exit(NULL);
}

/*
 * Removes the first queued message when its message id equals uuid, i.e. when
 * the response just received belongs to it, and resets the resend counter.
 */
void CheckTransactionPacket(char *uuid)
{
	char frontUUID[100]={0};
	char frontData[QUEUE_MESSAGE_LENGTH]={0};
	int queueNotEmpty = FALSE;

	queueNotEmpty = queue_operation(QUEUE_OPERATION_SHOWFRONT,frontUUID, frontData);

	if(queueNotEmpty == TRUE)
	{
		if ((uuid != NULL) && (strcmp(frontUUID, uuid) == 0))
		{
			DEBUG_INFO("Receive queue response match.\n");
			queue_operation(QUEUE_OPERATION_DEL,"","");
			TransactionMessageResend = 1;
		}
		else
			DEBUG_INFO("Receive queue response mismatch.\n");
	}
}

/*
 * Single entry point for all queue file operations; calls are serialised so
 * the worker threads cannot touch the queue file at the same time.
 *
 * type selects the operation (QUEUE_OPERATION_*); frontUUID and frontData are
 * passed on to showfront()/addq() and ignored by the other operations.
 * MeterValues messages are not added once the queue file is over size.
 *
 * Returns the result of the selected operation, 0 when nothing was done.
 */
int queue_operation(int type, char *frontUUID, char *frontData)
{
	// The former IsUsing test-then-set was not atomic, so two threads could enter together
	static pthread_mutex_t queueLock = PTHREAD_MUTEX_INITIALIZER;
	int result=0;

	pthread_mutex_lock(&queueLock);
	IsUsing = TRUE;

	if(type == QUEUE_OPERATION_SHOWQUEUE)  		// show items in queue
	{
		result = showqueue();
	}
	else if(type == QUEUE_OPERATION_SHOWFRONT)	// show first item
	{
		result = showfront(frontUUID, frontData);
	}
	else if(type == QUEUE_OPERATION_DEL) 		// delete item
	{
		result = delq();
	}
	else if(type == QUEUE_OPERATION_SENT) 		// sent items in queue
	{
		result = sentqueue();
	}
	else if(type == QUEUE_OPERATION_ADD) 		// add items to the queue
	{
		// If queue file over size only add start & stop transaction message
		if(!isQueueOverSize() || (strstr(frontData, "MeterValues") == NULL))
		{
			result = addq(frontUUID, frontData);
		}
	}

	IsUsing = FALSE;
	pthread_mutex_unlock(&queueLock);

	return result;
}

/*
 * Deletes /Storage/OCPP/MessageSent when it exists, creating the OCPP
 * directory first if it is missing. Always returns 0.
 */
int removeMessageSentFile(void)
{
	// Check for directory existence
	if(ocppDirectoryMissing())
	{
		DEBUG_INFO("\n directory not exist, create dir \n");
		createOcppDirectory();
	}

	if((access("/Storage/OCPP/MessageSent",F_OK))!=-1)
	{
		DEBUG_INFO("MessageSent file exist.\n");
		remove("/Storage/OCPP/MessageSent");
	}

	return 0;
}

static int changeChageWebSocketPingInterval = FALSE;

/*
 * Records that the websocket ping interval has been changed.
 * The new value itself is not used here.
 */
void ChageWebSocketPingInterval(int WebSocketPingInterval)
{
	(void)WebSocketPingInterval;
	changeChageWebSocketPingInterval = TRUE;
}

//================================================
// Main process
//================================================
int main(void)
{
	char rmFileCmd[100]={0};
	struct stat stats;

	DEBUG_INFO("Module_OcppBackend task initialization...\n");
	//lws_set_log_level(LLL_PARSER | LLL_HEADER | LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_INFO | LLL_DEBUG | LLL_EXT | LLL_CLIENT | LLL_LATENCY , NULL);

	if(ProcessShareMemory()== FAIL)
	{
		return FAIL;
	}

	// Check & create OCPP dir
	stat("/Storage/OCPP", &stats);
	if(S_ISDIR(stats.st_mode) != 1)
	{
		DEBUG_INFO("OCPP directory not exist, create dir \n");
		sprintf(rmFileCmd,"mkdir -p /Storage/OCPP");
		system(rmFileCmd);
	}
	memset(rmFileCmd, 0, ARRAY_SIZE(rmFileCmd));

	// Create Process: Resend Transaction
	pthread_create(&tid_ProcQueue, NULL, processTransactionQueue, NULL);
	pthread_create(&tid_Watchdog, NULL, processWatchdog, NULL);

	// Sqlite3 initial
	sqlite3_config(SQLITE_CONFIG_URI,1);
	if(sqlite3_open("file:/Storage/OCPP/charger.db", &db))
	{
		DEBUG_INFO( "Can't open database: %s\n", sqlite3_errmsg(db));
		sqlite3_close( db );
		exit(0);
	}
	else
	{
		DEBUG_INFO( "Opened database successfully\n");
	}

	//Create Table log buffer
	if(sqlite3_exec(db, createsql, 0, 0, &errMsg) != SQLITE_OK)
	{
		DEBUG_INFO( "Create log buffer table error message: %s\n", errMsg);
		return 0;
	}
	else
	{
		DEBUG_INFO( "Opened log buffer table successfully\n");
	}

	// Create Table OcppAuthCache
	if(sqlite3_exec(db, sqlOcppAuthCache,
0, 0, &errMsg) != SQLITE_OK) { DEBUG_INFO( "Create OcppAuthCache error message: %s\n", errMsg); return 0; } else { DEBUG_INFO( "Opened OcppAuthCache table successfully\n"); } // Create Table OcppAuthLocal if(sqlite3_exec(db, sqlOcppAuthLocal, 0, 0, &errMsg) != SQLITE_OK) { DEBUG_INFO( "Create Table OcppAuthLocal error %s\n",errMsg); return 0; } else { DEBUG_INFO( "Opened OcppAuthLocal table successfully\n"); } initialConfigurationTable(); removeMessageSentFile(); for(;;) { startTimeDog = time((time_t*)NULL); counterLwsRestart = 0; // Connect server if(ConnectionEstablished==0) // Check InternetConn 0: disconnected, 1: connected { isWebsocketSendable = 1; SetOcppConnStatus(FALSE); SetServerSign(FALSE); InitialSystemValue(); if((time((time_t*)NULL)-startTime.connect) >= 30) { DEBUG_INFO("Server connecting...\n"); pthread_create(&tid_connectServer, NULL, ConnectWsServer, NULL); startTime.connect=time((time_t*)NULL); } CheckSystemValue(); } else { // Sign in if((GetServerSign() == FALSE) && ( ((GetBootNotificationInterval() != 0) && ((time((time_t*)NULL)-startTime.bootNotification)>= GetBootNotificationInterval())) || ((time((time_t*)NULL)-startTime.bootNotification) >= defaultWaitingTime) ) ) { sendBootNotificationRequest(); startTime.bootNotification=time((time_t*)NULL); } // On line operation if(GetServerSign() == TRUE) { // Send message from queue if((req_SendQueue == 1) && isWebsocketSendable) { queue_operation(QUEUE_OPERATION_SENT, "", ""); req_SendQueue = 0; } // Check System Value CheckSystemValue(); if(GetHeartBeatWithNOResponse() >= 30) { lws_context_destroy(context); ConnectionEstablished = 0; context = NULL; SetHeartBeatWithNOResponse(); DEBUG_WARN("Heartbeat re-send over 30 count.\n"); } if((changeChageWebSocketPingInterval == TRUE) || (GetOcppConnStatus() == 0)) { DEBUG_INFO("GetOcppConnStatus() = %d\n", GetOcppConnStatus()); if(changeChageWebSocketPingInterval) { DEBUG_INFO("Websocket ping interval changed request.\n"); changeChageWebSocketPingInterval = 
FALSE; } lws_context_destroy(context); ConnectionEstablished = 0; context = NULL; } } } do { lws_service(context, 0);//timeout_ms }while((SendBufLen>0) && (context!=NULL) && GetInternetConn()); usleep(100000); } pthread_join(tid_ProcQueue, NULL); pthread_join(tid_Watchdog, NULL); return FAIL; }