#include "Module_OcppBackend20.h" #undef FALSE #undef TRUE typedef enum boolean { FALSE, TRUE } BOOL; struct lws *wsi_client; struct lws_context *context; static int req_SendQueue = 0; pthread_t tid_connectServer; pthread_t tid_ProcQueue; pthread_t tid_Watchdog; struct StartTime { struct timespec connect; struct timespec bootNotification; struct timespec reConnect; struct timespec disconnectServer; struct timespec startTimeQueue; struct timespec pingOn; }startTime; struct QueueOpInfo queueOpInfo; //========================================== // Function prototype //========================================== void ReceivedMessage(void *in, size_t len); int SendBufLen=0;//(1024*4);//(1024*3); unsigned char SendBuffer[WEBSOCKET_BUFFER_SIZE]={0}; static int ConnectionEstablished=0; int defaultWaitingTime = 10; //10 second char OcppPath[384]={0}; char OcppProtocol[10]={0},OcppHost[128]={0}, OcppTempPath[256]={0}; int OcppPort=0; unsigned char StartTransactionIdTagTemp[20]={0}; uint8_t isWebsocketSendable = 1; uint8_t isQueueSendable = 1; uint8_t counterQueueSent = 0; uint8_t counterConnect = 0; uint8_t counterPingSend = 0; //================================= // Common routine //================================= int GetTransactionQueueNum(void) { return queueOpInfo.TransactionQueueNum; } //========================================== // Web socket tranceive routine //========================================== int SendData(struct lws *wsi) { int n; int len; unsigned char out[LWS_SEND_BUFFER_PRE_PADDING + ARRAY_SIZE(SendBuffer) + LWS_SEND_BUFFER_POST_PADDING] = {0}; len = strlen((char *)SendBuffer); if(len == 0)return 0; if((strstr((char*)SendBuffer, "\"MeterValues\"") != NULL) || (strstr((char*)SendBuffer, "\"TransactionEvent\"") != NULL)) { isQueueSendable = OFF; } memcpy (out + LWS_SEND_BUFFER_PRE_PADDING, SendBuffer, len ); DEBUG_OCPPMESSAGE_INFO("===========> %s\n", out + LWS_SEND_BUFFER_PRE_PADDING); n = lws_write(wsi, out + LWS_SEND_BUFFER_PRE_PADDING, len, LWS_WRITE_TEXT); memset(SendBuffer, 0, len); SendBufLen = 0; return n; } int SendPing(struct lws *wsi) { uint8_t ping[LWS_PRE + 125]; DEBUG_OCPPMESSAGE_INFO("===========> Set PING packet.\n"); return lws_write(wsi, ping + LWS_PRE, 0, LWS_WRITE_PING); } static int OCPP20Callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { char buf[256]={0}, hash[20]={0}, key_b64[40]={0}, tempin[WEBSOCKET_BUFFER_SIZE]={0}, sstr[WEBSOCKET_BUFFER_SIZE]={0}; uint8_t auth_b64[256]={0}, boxId[128]={0}, password[64]={0}; int c = 0; char *loc; switch (reason) { case LWS_CALLBACK_PROTOCOL_INIT: DEBUG_INFO("LWS_CALLBACK_PROTOCOL_INIT\n"); break; case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH: DEBUG_INFO("LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH\n"); DEBUG_OCPPMESSAGE_INFO("----- Handshake: Client Request START -----\n"); lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, _WSI_TOKEN_CLIENT_URI); DEBUG_OCPPMESSAGE_INFO("GET %s HTTP/1.1 \n", buf); lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, _WSI_TOKEN_CLIENT_HOST); DEBUG_OCPPMESSAGE_INFO("Host: %s\n", buf); DEBUG_OCPPMESSAGE_INFO("Upgrade: websocket\n"); DEBUG_OCPPMESSAGE_INFO("Connection: Upgrade\n"); lws_b64_encode_string(hash, 16, key_b64, ARRAY_SIZE(key_b64));// Sec-WebSocket-Key DEBUG_OCPPMESSAGE_INFO("Sec-WebSocket-Key: %s\n", key_b64); lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, _WSI_TOKEN_CLIENT_SENT_PROTOCOLS); DEBUG_OCPPMESSAGE_INFO("Sec-WebSocket-Protocol: %s\n", buf); DEBUG_OCPPMESSAGE_INFO("Sec-WebSocket-Version: %d\n", SPEC_LATEST_SUPPORTED); DEBUG_OCPPMESSAGE_INFO("----- Handshake: Client Request END -----\n"); DEBUG_OCPPMESSAGE_INFO("----- Handshake: Server response START -----\n"); lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, WSI_TOKEN_HTTP); DEBUG_OCPPMESSAGE_INFO("HTTP/1.1 %s\n", buf); lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, WSI_TOKEN_UPGRADE); DEBUG_OCPPMESSAGE_INFO("Upgrade: %s\n", buf); lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, WSI_TOKEN_CONNECTION); DEBUG_OCPPMESSAGE_INFO("Connection: %s\n", buf); lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, WSI_TOKEN_ACCEPT); DEBUG_OCPPMESSAGE_INFO("Sec-WebSocket-Accept: %s\n", buf); lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, WSI_TOKEN_PROTOCOL); DEBUG_OCPPMESSAGE_INFO("Sec-WebSocket-Protocol: %s\n", buf); DEBUG_OCPPMESSAGE_INFO("----- Handshake: Server response END -----\n"); break; case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION: DEBUG_INFO("LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION\n"); break; case LWS_CALLBACK_WSI_DESTROY: DEBUG_INFO("LWS_CALLBACK_WSI_DESTROY\n"); pthread_detach(tid_connectServer); SetServerSign(FALSE); ConnectionEstablished = 0; context = NULL; break; case LWS_CALLBACK_LOCK_POLL: break; case LWS_CALLBACK_ADD_POLL_FD: DEBUG_INFO("LWS_CALLBACK_ADD_POLL_FD\n"); break; case LWS_CALLBACK_DEL_POLL_FD: DEBUG_INFO("LWS_CALLBACK_DEL_POLL_FD\n"); break; case LWS_CALLBACK_UNLOCK_POLL: break; case LWS_CALLBACK_CHANGE_MODE_POLL_FD: break; case LWS_CALLBACK_WSI_CREATE: DEBUG_INFO("LWS_CALLBACK_WSI_CREATE\n"); break; case LWS_CALLBACK_GET_THREAD_ID: break; case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: DEBUG_INFO("LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER\n"); unsigned char** pos = (unsigned char**)in; unsigned char* end = (*pos) + len; switch(GetOcppSecurityProfile()) { case 1: case 2: case 3: GetOcppChargerBoxId(boxId); GetOcppSecurityPassword(password); sprintf(buf, "%s:%s", boxId, password); lws_b64_encode_string(buf, strlen(buf), (char*)auth_b64, ARRAY_SIZE(auth_b64)); sprintf(buf, "Basic %s", auth_b64); if (lws_add_http_header_by_token(wsi, WSI_TOKEN_HTTP_AUTHORIZATION, (uint8_t *)buf, strlen(buf), pos, end)) { DEBUG_ERROR("lws_add_http_header_by_token : WSI_TOKEN_HTTP_AUTHORIZATION\n"); return -1; } DEBUG_OCPPMESSAGE_INFO("SecurityProfile: %d\n", GetOcppSecurityProfile()); DEBUG_OCPPMESSAGE_INFO("Authorization: %s\n", buf); break; case 0: default: break; } break; case LWS_CALLBACK_CLIENT_ESTABLISHED: //3 DEBUG_INFO("LWS_CALLBACK_CLIENT_ESTABLISHED\n"); //connected ConnectionEstablished=1; SetOcppConnStatus(TRUE); refreshStartTimer(&startTime.pingOn); counterPingSend = 0; queueOpInfo.PreTransactionMessageResend = 0; break; case LWS_CALLBACK_CLIENT_CONNECTION_ERROR://1 DEBUG_ERROR("LWS_CALLBACK_CLIENT_CONNECTION_ERROR %s\n", (char *)in ); //disconnected ConnectionEstablished=0; DEBUG_OCPPMESSAGE_INFO("===== Handshake: Client START =====\n"); lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, _WSI_TOKEN_CLIENT_URI); DEBUG_OCPPMESSAGE_INFO("GET %s HTTP/1.1 \n", buf); lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, _WSI_TOKEN_CLIENT_HOST); DEBUG_OCPPMESSAGE_INFO("Host: %s\n", buf); DEBUG_OCPPMESSAGE_INFO("Upgrade: websocket\n"); DEBUG_OCPPMESSAGE_INFO("Connection: Upgrade\n"); lws_b64_encode_string(hash, 16, key_b64, ARRAY_SIZE(key_b64));// Sec-WebSocket-Key DEBUG_OCPPMESSAGE_INFO("Sec-WebSocket-Key: %s\n", key_b64); lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, _WSI_TOKEN_CLIENT_SENT_PROTOCOLS); DEBUG_OCPPMESSAGE_INFO("Sec-WebSocket-Protocol: %s\n", buf); DEBUG_OCPPMESSAGE_INFO("Sec-WebSocket-Version: %d\n", SPEC_LATEST_SUPPORTED); DEBUG_OCPPMESSAGE_INFO("===== Handshake: Client END =====\n"); DEBUG_OCPPMESSAGE_INFO("===== Handshake: Server response START =====\n"); lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, WSI_TOKEN_HTTP); DEBUG_OCPPMESSAGE_INFO("HTTP/1.1 %s\n", buf); lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, WSI_TOKEN_UPGRADE); DEBUG_OCPPMESSAGE_INFO("Upgrade: %s\n", buf); lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, WSI_TOKEN_CONNECTION); DEBUG_OCPPMESSAGE_INFO("Connection: %s\n", buf); lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, WSI_TOKEN_ACCEPT); DEBUG_OCPPMESSAGE_INFO("Sec-WebSocket-Accept: %s\n", buf); lws_hdr_copy(wsi, buf, ARRAY_SIZE(buf) - 1, WSI_TOKEN_PROTOCOL); DEBUG_OCPPMESSAGE_INFO("Sec-WebSocket-Protocol: %s\n", buf); DEBUG_OCPPMESSAGE_INFO("===== Handshake: Server response END =====\n"); break; case LWS_CALLBACK_CLOSED://4 DEBUG_INFO("LWS_CALLBACK_CLOSED\n"); //disconnected ConnectionEstablished=0; break; case LWS_CALLBACK_CLIENT_WRITEABLE://10 if(isWebsocketSendable && (0 < GetWebSocketPingInterval()) && (GetWebSocketPingInterval() <= getDiffSecNow(startTime.pingOn)) && (GetServerSign() == TRUE)) SendPing(wsi); else SendData(wsi); break; case LWS_CALLBACK_CLIENT_RECEIVE://8 ((char *)in)[len] = '\0'; DEBUG_OCPPMESSAGE_INFO("<==== %s\n", (char *)in); //**********Receive Message**********/ c = 0; loc = strstr((const char *)in, "][2,"); if(loc == NULL) { loc = strstr((const char *)in, "][3,"); if(loc == NULL) { loc = strstr((const char *)in, "][4,"); } } memset(sstr, 0, ARRAY_SIZE(sstr) ); if(loc != NULL) { DEBUG_INFO("There are continuous second packet []\n"); while (loc[1+c] != '\0') { sstr[c] = loc[1+c]; c++; } sstr[c] = '\0'; strcpy(tempin, sstr); DEBUG_INFO("Final Receive: %s\n", tempin); } else { strcpy(tempin,(char *)in); } ReceivedMessage((void *)strtrim(tempin), strlen(tempin)); isWebsocketSendable = 1; refreshStartTimer(&startTime.pingOn); break; case LWS_CALLBACK_CLIENT_RECEIVE_PONG: DEBUG_INFO("LWS_CALLBACK_CLIENT_RECEIVE_PONG\n"); DEBUG_OCPPMESSAGE_INFO("<==== Get PONG packet.\n"); refreshStartTimer(&startTime.pingOn); isWebsocketSendable = ON; counterPingSend = 0; break; case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION: DEBUG_INFO("LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION\n"); break; case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS: DEBUG_INFO("LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS\n"); break; case LWS_CALLBACK_PROTOCOL_DESTROY: DEBUG_INFO("LWS_CALLBACK_PROTOCOL_DESTROY\n"); break; case LWS_CALLBACK_RECEIVE_PONG: DEBUG_INFO("LWS_CALLBACK_RECEIVE_PONG\n"); break; case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: DEBUG_INFO("LWS_CALLBACK_WS_PEER_INITIATED_CLOSE\n"); break; case LWS_CALLBACK_CLOSED_CLIENT_HTTP: DEBUG_INFO("LWS_CALLBACK_CLOSED_CLIENT_HTTP\n"); if(GetInternetConn() == 1) { DEBUG_INFO("Download new CA certification.\n", system("wget --no-check-certificate -O /root/cacert.pem http://curl.haxx.se/ca/cacert.pem &")); } break; default: DEBUG_INFO("Reason = %d\n", reason); break; } return 0; } static struct lws_protocols protocols[] = { { "ocpp2.0.1", OCPP20Callback, WEBSOCKET_BUFFER_SIZE, WEBSOCKET_BUFFER_SIZE, }, { "ocpp2.0.1", OCPP20Callback, WEBSOCKET_BUFFER_SIZE, WEBSOCKET_BUFFER_SIZE, }, { NULL, NULL, 0 /* End of list */ } }; void* ConnectWsServer(void* data) //int ConnectWsServer() { struct lws_context_creation_info ContextInfo; struct lws_client_connect_info ConnInfo; int use_ssl=0; counterConnect += 1; // If internet available synchronize datetime with ntp server if(GetInternetConn() == 1) { system("pkill ntpd"); DEBUG_INFO("NTP synchronize with Microsoft\n", system("/usr/sbin/ntpd -nqp time.windows.com &")); DEBUG_INFO("NTP synchronize with China\n", system("/usr/sbin/ntpd -nqp cn.ntp.org.cn &")); DEBUG_INFO("NTP synchronize with Taiwan\n", system("/usr/sbin/ntpd -nqp tock.stdtime.gov.tw &")); DEBUG_INFO("NTP synchronize with Europe\n", system("/usr/sbin/ntpd -nqp 0.europe.pool.ntp.org &")); } if(context!=NULL) { pthread_detach(pthread_self()); lws_context_destroy(context); ConnectionEstablished=0; context = NULL; } checkNetworkProfile(); memset(&ContextInfo, 0, sizeof(struct lws_context_creation_info)); if((GetOcppServerURL()==0) || (GetOcppPort() == 0) || (GetOcppPath()==0)) { DEBUG_ERROR("OCPP URL is NULL or OCPP Port is zero or OCPP Path is NULL\n"); goto end; } if((strcmp(OcppProtocol,"ws")==0)&&(strlen(OcppProtocol)== 2)) { DEBUG_INFO("Web socket is non-security mode.\n"); use_ssl=0; } else if((strcmp(OcppProtocol,"wss")==0)&&(strlen(OcppProtocol)== 3)) { DEBUG_INFO("Web socket is security mode.\n"); use_ssl=1; } ContextInfo.port = CONTEXT_PORT_NO_LISTEN; ContextInfo.iface = NULL; ContextInfo.ssl_private_key_password = NULL; ContextInfo.ssl_cert_filepath = ((GetOcppSecurityProfile()==3) && (access("/Storage/OCPP/certCP.pem",F_OK) != -1))?"/Storage/OCPP/certCP.pem":NULL; ContextInfo.ssl_private_key_filepath = ((GetOcppSecurityProfile()==3) && (access("/Storage/OCPP/certCP.key",F_OK) != -1))?"/Storage/OCPP/certCP.key":NULL; ContextInfo.ssl_ca_filepath = "/root/certCA.pem"; ContextInfo.ssl_cipher_list = NULL; //use default one ContextInfo.gid = -1; ContextInfo.uid = -1; if(use_ssl) { ContextInfo.options |= LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; } ContextInfo.protocols = protocols; ContextInfo.timeout_secs = GetBackendConnectionTimeout(); //ContextInfo.ws_ping_pong_interval = GetWebSocketPingInterval(); ContextInfo.ka_time = 20; ContextInfo.keepalive_timeout = 5; ContextInfo.ka_probes = 2; ContextInfo.ka_interval = 5; context = lws_create_context(&ContextInfo); if (context == NULL) { DEBUG_ERROR("lws_create_context NG"); goto end; } memset(&ConnInfo,0,sizeof(struct lws_client_connect_info)); // fill up below information ConnInfo.context = context; ConnInfo.address=(const char *)OcppHost; DEBUG_INFO("ConnInfo.address: %s\n", ConnInfo.address); ConnInfo.port = GetOcppPort(); DEBUG_INFO("ConnInfo.port: %d\n", ConnInfo.port); ConnInfo.path=(const char *)OcppPath; DEBUG_INFO("ConnInfo.path: %s\n", ConnInfo.path); char addr_port[256] = { 0 }; sprintf(addr_port, "%s:%u", ConnInfo.address, (ConnInfo.port & 65535) ); ConnInfo.host= addr_port; // ConnInfo.address;//lws_canonical_hostname(context); //ConnInfo.origin="origin"; ConnInfo.protocol = protocols[1].name; ConnInfo.ietf_version_or_minus_one = -1; if(use_ssl) { #ifdef TLS_VALID_CERT_EXPIRED ConnInfo.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK; DEBUG_INFO("TLS does not allow expired certification.\n"); #else ConnInfo.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK | LCCSCF_ALLOW_EXPIRED; DEBUG_INFO("TLS allow expired certification.\n"); #endif } wsi_client = lws_client_connect_via_info(&ConnInfo); if (!wsi_client) { DEBUG_ERROR("lws_client_connect_via_info NG\n"); //goto end; } counterConnect=0; DEBUG_INFO("counterConnect: %d\n", counterConnect); end: pthread_exit(NULL/*(void *) fname*/); } int isQueueOverSize() { FILE *fp; uint32_t file_size; uint8_t result = FALSE; fp = fopen("/Storage/OCPP/TransactionRelatedQueue20" , "r"); if(fp != NULL) { fseek(fp, 0L, SEEK_END); file_size = ftell(fp); if(file_size > (100*1024*1024)) { result = TRUE; DEBUG_WARN("Queue file over size.\n"); } fclose(fp); } return result; } int showfront(char *uuid, char *data) { FILE *fp; int result = FALSE; // 1: TRUE 0:FALSE char str[QUEUE_MESSAGE_LENGTH]={0}; char sstr[50]={ 0 };//sstr[200]={ 0 }; int c = 0; char *loc; char rmFileCmd[100]={0}; struct stat stats; stat("/Storage/OCPP", &stats); // Check for directory existence if (S_ISDIR(stats.st_mode) == 1) { //DEBUG_INFO("\n OCPP directory exist \n"); } else { DEBUG_INFO("\n OCPP directory not exist, create dir \n"); sprintf(rmFileCmd,"mkdir -p %s","/Storage/OCPP"); system(rmFileCmd); } memset(rmFileCmd, 0, ARRAY_SIZE(rmFileCmd)); if((access("/Storage/OCPP/TransactionRelatedQueue20",F_OK))!=-1) { //DEBUG_INFO("TransactionRelatedQueue20 exist.\n"); } else { //DEBUG_INFO("TransactionRelatedQueue20 not exist\n"); FILE *log = fopen("/Storage/OCPP/TransactionRelatedQueue20", "w+"); if(log == NULL) { DEBUG_INFO("Can't Create File TransactionRelatedQueue20 \n"); return FALSE; } else { fclose(log); } } /* opening file for reading */ fp = fopen("/Storage/OCPP/TransactionRelatedQueue20" , "r"); if(fp == NULL) { DEBUG_INFO("Error opening TransactionRelatedQueue20 file"); return FALSE; } if( fgets (str, QUEUE_MESSAGE_LENGTH, fp)!=NULL ) { /* writing content to stdout */ //DEBUG_INFO("str=%s",str); if ((str[0] == '\n')||(strcmp(str,"")==0)) { DEBUG_INFO("It is a blank line"); fclose(fp); memset(rmFileCmd, 0, ARRAY_SIZE(rmFileCmd)); sprintf(rmFileCmd,"rm -f %s","/Storage/OCPP/TransactionRelatedQueue20"); system(rmFileCmd); result = FALSE; return result; } else { //puts(str); //----------------uuid--------------// loc = strstr(str, "\""); memset(sstr ,0, ARRAY_SIZE(sstr) ); c = 0; while (loc[1+c] != '\"') { sstr[c] = loc[1+c]; c++; } sstr[c] = '\0'; //DEBUG_INFO("\n uuid:%s", sstr); //DEBUG_INFO("\n data:%s", str); strcpy(uuid,sstr); strcpy(data,str); result = TRUE; } } else { //DEBUG_INFO("queue is null\n"); strcpy(uuid,""); strcpy(data,""); result = FALSE; } fclose(fp); return result; } int addq(char *uuid, char *data) { FILE *outfile; char rmFileCmd[100]={0}; struct stat stats; stat("/Storage/OCPP", &stats); DEBUG_INFO("addq\n"); // Check for directory existence if (S_ISDIR(stats.st_mode) == 1) { //DEBUG_INFO("\n OCPP directory exist \n"); } else { //DEBUG_INFO("\n OCPP directory not exist, create dir \n"); sprintf(rmFileCmd,"mkdir -p %s","/Storage/OCPP"); system(rmFileCmd); } memset(rmFileCmd, 0, ARRAY_SIZE(rmFileCmd)); if((access("/Storage/OCPP/TransactionRelatedQueue20",F_OK))!=-1) { //DEBUG_INFO("TransactionRelatedQueue20 exist.\n"); } else { //DEBUG_INFO("TransactionRelatedQueue20 not exist\n"); FILE *log = fopen("/Storage/OCPP/TransactionRelatedQueue20", "w+"); if(log == NULL) { //DEBUG_INFO("Can't Create File TransactionRelatedQueue20 \n"); return FALSE; } else { fclose(log); } } // open file for writing outfile = fopen ("/Storage/OCPP/TransactionRelatedQueue20", "a"); DEBUG_INFO("data = %s\n",data); fputs(data, outfile); fputs("\n", outfile); fclose (outfile); queueOpInfo.TransactionQueueNum += 1; DEBUG_INFO("add queue end\n"); system("/bin/fsync -d /dev/mtdblock13;/bin/sync &"); return FALSE; } int delq() { char tempfile[] = "/Storage/OCPP/delqtemp.json"; FILE *infile; FILE *outfile; int resultRename=0; char filename[60]={0}; char rmFileCmd[100]={0}; struct stat stats; stat("/Storage/OCPP", &stats); DEBUG_INFO("delq()\n"); // Check for directory existence if (S_ISDIR(stats.st_mode) == 1) { //DEBUG_INFO("\n OCPP directory exist \n"); } else { //DEBUG_INFO("\n OCPP directory not exist, create dir \n"); sprintf(rmFileCmd,"mkdir -p %s","/Storage/OCPP"); system(rmFileCmd); } memset(rmFileCmd, 0, ARRAY_SIZE(rmFileCmd)); if((access("/Storage/OCPP/TransactionRelatedQueue20",F_OK))!=-1) { //DEBUG_INFO("TransactionRelatedQueue20 exist.\n"); } else { //DEBUG_INFO("TransactionRelatedQueue20 not exist\n"); FILE *log = fopen("/Storage/OCPP/TransactionRelatedQueue20", "w+"); if(log == NULL) { //DEBUG_INFO("log is NULL\n"); return 0; } else { fclose(log); } } // open file for writing strcpy(filename, "/Storage/OCPP/TransactionRelatedQueue20"); infile = fopen ("/Storage/OCPP/TransactionRelatedQueue20", "r"); outfile = fopen (tempfile, "w"); //DEBUG_INFO("feof(infile) =%d\n",feof(infile)); int c; c = fgetc(infile); //printf("file c:%d\n",c); rewind(infile); if(c == EOF) { //DEBUG_INFO("TransactionRelatedQueue20 is NULL\n"); fclose(infile); fclose(outfile); sprintf(rmFileCmd,"rm -f %s",tempfile); system(rmFileCmd); } else { char buf[QUEUE_MESSAGE_LENGTH]={0}; int i = 0; //DEBUG_INFO("Orignal File is not NULL\n"); while (fgets(buf, sizeof(buf), infile) != NULL) { //printf("Orignal File get strings \n"); buf[strlen(buf) - 1] = '\0'; // eat the newline fgets() stores if(i==0) { queueOpInfo.TransactionQueueNum -= 1; queueOpInfo.TransactionMessageResend = 0; DEBUG_INFO("delete the item\n"); } if(i != 0) { fprintf(outfile,"%s\n", buf); } i = i + 1; } fclose(infile); fclose(outfile); sprintf(rmFileCmd,"rm -f %s",filename); system(rmFileCmd); resultRename = rename(tempfile, filename); if(resultRename == 0) { //DEBUG_INFO("TransactionRelatedQueue20 file renamed successfully"); } else { //DEBUG_INFO("Error: unable to rename the TransactionRelatedQueue20 file"); } DEBUG_INFO("delq() end\n"); } system("/bin/fsync -d /dev/mtdblock13;/bin/sync &"); return 0; } int showqueue() { char rmFileCmd[100]={0}; struct stat stats; stat("/Storage/OCPP", &stats); // Check for directory existence if (S_ISDIR(stats.st_mode) == 1) { //DEBUG_INFO("\n OCPP directory exist \n"); } else { //DEBUG_INFO("\n OCPP directory not exist, create dir \n"); sprintf(rmFileCmd,"mkdir -p %s","/Storage/OCPP"); system(rmFileCmd); } memset(rmFileCmd, 0, ARRAY_SIZE(rmFileCmd)); if((access("/Storage/OCPP/TransactionRelatedQueue20",F_OK))!=-1) { //DEBUG_INFO("TransactionRelatedQueue20 exist.\n"); } else { //DEBUG_INFO("TransactionRelatedQueue20 not exist\n"); FILE *log = fopen("/Storage/OCPP/TransactionRelatedQueue20", "w+"); if(log == NULL) { DEBUG_INFO("log is NULL\n"); return FALSE; } else { fclose(log); } } FILE *fp = fopen("/Storage/OCPP/TransactionRelatedQueue20", "r"); char line[QUEUE_MESSAGE_LENGTH]={0}; // check if file exist (and you can open it) or not if (fp == NULL) { DEBUG_INFO("can open file TransactionRelatedQueue20!"); return FALSE; } queueOpInfo.TransactionQueueNum = 0; // the number of packets in queue while(fgets(line, sizeof line, fp) != NULL) { //DEBUG_INFO("%s\n", line); queueOpInfo.TransactionQueueNum += 1; //the number of packets in queue } fclose(fp); return TRUE; } int sentqueue(){ FILE *fp; int result = FALSE; // 1: TRUE 0:FALSE char str[QUEUE_MESSAGE_LENGTH]={0}; char cmdBuf[100]={0}; struct stat stats; json_object *queueJson; DEBUG_INFO("Sent queue.\n"); stat("/Storage/OCPP", &stats); // Check for directory existence if (S_ISDIR(stats.st_mode) == 1) { //DEBUG_INFO("\n OCPP directory exist \n"); } else { //DEBUG_INFO("\n OCPP directory not exist, create dir \n"); sprintf(cmdBuf,"mkdir -p %s","/Storage/OCPP"); system(cmdBuf); } /* opening file for reading */ fp = fopen("/Storage/OCPP/TransactionRelatedQueue20" , "r"); if(fp == NULL) { DEBUG_ERROR("Error opening file"); return FALSE; } if( fgets (str, QUEUE_MESSAGE_LENGTH, fp)!=NULL ) { queueJson = json_tokener_parse(str); if(!is_error(queueJson)) { LWS_Send(str); } json_object_put(queueJson); result = TRUE; } else { result = FALSE; } fclose(fp); return result; } void* processTransactionQueue(void* data) { char frontUUID[100] ={0}; char frontData[QUEUE_MESSAGE_LENGTH] ={0}; int queueNotEmpty = FALSE; while(1) { if(!req_SendQueue && ((getDiffSecNow(startTime.startTimeQueue) >= ((TransactionMessageRetryIntervalGet()>10?TransactionMessageRetryIntervalGet():10)*(queueOpInfo.TransactionMessageResend>1?2:1))) || (isWebsocketSendable && isQueueSendable && (getDiffSecNow(startTime.startTimeQueue) >= ((counterQueueSent>=20)?5:0))))) { if(FirstHeartBeatResponse() == 1) { memset(frontUUID, 0, ARRAY_SIZE(frontUUID)); memset(frontData, 0, ARRAY_SIZE(frontData)); queueNotEmpty = queue_operation(QUEUE_OPERATION_SHOWFRONT,frontUUID, frontData); if((queueNotEmpty == TRUE) && (GetOcppConnStatus() == 1)) { if(isWebsocketSendable) DEBUG_INFO("isWebsocketSendable on.\n"); if(isQueueSendable) DEBUG_INFO("isQueueSendable on.\n"); if(((getDiffSecNow(startTime.startTimeQueue) > (TransactionMessageRetryIntervalGet()>10?TransactionMessageRetryIntervalGet():10)))) DEBUG_INFO("Queue timer(%d) over spec(%d).\n", getDiffSecNow(startTime.startTimeQueue), TransactionMessageRetryIntervalGet()); if(queueOpInfo.TransactionMessageResend < TransactionMessageAttemptsGet()) { DEBUG_INFO("Sent message from queue request.\n"); DEBUG_INFO("TransactionMessageResend time: %d\n", queueOpInfo.TransactionMessageResend); req_SendQueue = 1; queueOpInfo.TransactionMessageResend += 1; } else { DEBUG_INFO("Transaction message resend(%d) over spec(%d) message abandon.\n", queueOpInfo.TransactionMessageResend, TransactionMessageAttemptsGet()); queue_operation(QUEUE_OPERATION_DEL,"",""); queueOpInfo.TransactionMessageResend = 0; req_SendQueue = 0; } } } // Refresh queue timer refreshStartTimer(&startTime.startTimeQueue); if((counterQueueSent >= 10) || (queueNotEmpty == FALSE)) { counterQueueSent = 0; } else { counterQueueSent += 1; } } usleep(500000); } pthread_exit(NULL); return 0; } void* processWatchdog() { for(;;) { if((getDiffSecNow(startTime.disconnectServer) >= 7200)) { DEBUG_INFO("OCPP server disconnect timer(%d) over 7200 seconds.\n", getDiffSecNow(startTime.disconnectServer)); system("killall OcppBackend20"); } if(counterConnect >= 3) { DEBUG_INFO("Connect OCPP server timeout over 3 count.\n"); system("killall OcppBackend20"); } if((0 < GetWebSocketPingInterval()) && (0 < counterPingSend) && ((GetWebSocketPingInterval()+5) <= getDiffSecNow(startTime.pingOn)) && (wsi_client != NULL) && (GetServerSign() == TRUE)) { DEBUG_WARN("Pong packet receive timeout.\n"); //system("killall OcppBackend20"); lws_context_destroy(context); ConnectionEstablished = 0; context = NULL; } usleep(500000); } pthread_exit(NULL); // } void CheckTransactionPacket(char *uuid) { char frontUUID[100]={0}; char frontData[QUEUE_MESSAGE_LENGTH]={0}; int queueNotEmpty = FALSE; int cmpResult = 0; queueNotEmpty = queue_operation(QUEUE_OPERATION_SHOWFRONT,frontUUID, frontData);//showfront(frontUUID, frontData); ---> remove temporally if(queueNotEmpty == TRUE) { cmpResult = strcmp(frontUUID, uuid); if (cmpResult == 0) { DEBUG_INFO("Receive queue response match.\n"); queue_operation(QUEUE_OPERATION_DEL,"","");//delq(); ---> remove temporally queueOpInfo.TransactionMessageResend = 0; } else DEBUG_INFO("Receive queue response mismatch.\n"); } } int queue_operation(int type, char *frontUUID, char *frontData) { int result=0; while(1) { if (!queueOpInfo.IsUsing ) { queueOpInfo.IsUsing = TRUE; if(type == QUEUE_OPERATION_SHOWQUEUE) // show items in queue { result = showqueue(); } else if(type == QUEUE_OPERATION_SHOWFRONT) // show first item { result = showfront(frontUUID, frontData); } else if(type == QUEUE_OPERATION_DEL) // delete item { result = delq(); } else if(type == QUEUE_OPERATION_SENT) // sent items in queue { result = sentqueue(); } else if(type == QUEUE_OPERATION_ADD) // add items to the queue { // If queue file over size only add start * stop transaction message if(!isQueueOverSize() || (strstr(frontData, "MeterValues") == NULL)) { result = addq(frontUUID, frontData); } } queueOpInfo.IsUsing = FALSE; break; } usleep(100000); } return result; } int removeMessageSentFile(void) { char rmFileCmd[100]={0}; struct stat stats; stat("/Storage/OCPP", &stats); // Check for directory existence if(S_ISDIR(stats.st_mode) == 1) { //DEBUG_INFO("\n OCPP directory exist \n"); } else { DEBUG_INFO("\n directory not exist, create dir \n"); sprintf(rmFileCmd,"mkdir -p %s","/Storage/OCPP"); system(rmFileCmd); } stat("/Storage/OCPP/TransactionRelatedQueue20", &stats); if(stats.st_size < 10) { memset(rmFileCmd, 0, ARRAY_SIZE(rmFileCmd)); if((access("/Storage/OCPP/MessageSent",F_OK))!=-1) { DEBUG_INFO("MessageSent file exist.\n"); sprintf(rmFileCmd,"rm -f %s","/Storage/OCPP/MessageSent"); system(rmFileCmd); } memset(rmFileCmd, 0, ARRAY_SIZE(rmFileCmd)); } return 0; } //================================================ // Main process //================================================ int main(void) { char rmFileCmd[100]={0}; struct stat stats; queueOpInfo.IsUsing = FALSE; queueOpInfo.TransactionMessageResend = 0; DEBUG_INFO("Module_OcppBackend20 task initialization...\n"); //lws_set_log_level(LLL_PARSER | LLL_HEADER | LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_INFO | LLL_DEBUG | LLL_EXT | LLL_CLIENT | LLL_LATENCY , NULL); if(ProcessShareMemory()== FAIL) { return FAIL; } // Check & create OCPP dir stat("/Storage/OCPP", &stats); if(S_ISDIR(stats.st_mode) != 1) { DEBUG_INFO("OCPP directory not exist, create dir \n"); sprintf(rmFileCmd,"mkdir -p /Storage/OCPP"); system(rmFileCmd); } memset(rmFileCmd, 0, ARRAY_SIZE(rmFileCmd)); // Create Process: Resend Transaction refreshStartTimer(&startTime.disconnectServer); pthread_create(&tid_ProcQueue, NULL, processTransactionQueue, NULL); pthread_create(&tid_Watchdog, NULL, processWatchdog, NULL); // Sqlite3 initial if(DB_Initial() != PASS) { DEBUG_ERROR("OCPP 2.0 local database initial fail.\n"); return 0; } initialConfigurationTable(); removeMessageSentFile(); for(;;) { // Connect server if(ConnectionEstablished==0) // Check InternetConn 0: disconnected, 1: connected { isWebsocketSendable = 1; isQueueSendable = 1; SetOcppConnStatus(FALSE); SetServerSign(FALSE); InitialSystemValue(); if(getDiffSecNow(startTime.connect) >= 30) { DEBUG_INFO("Server connecting...\n"); pthread_create(&tid_connectServer, NULL, ConnectWsServer, NULL); refreshStartTimer(&startTime.connect); } CheckSystemValue(); } else { // Sign in if((GetServerSign() == FALSE) && (isConnectorInitMode(0) != TRUE) && ( (GetBootNotificationInterval()>0) ? (getDiffSecNow(startTime.bootNotification) >= GetBootNotificationInterval()) : (getDiffSecNow(startTime.bootNotification) >= 10) ) ) { sendBootNotificationRequest(); refreshStartTimer(&startTime.bootNotification); } // Check System Value CheckSystemValue(); // On line operation if(GetServerSign() == TRUE) { // Send message from queue if((req_SendQueue == 1) && (isWebsocketSendable || ((queueOpInfo.TransactionMessageResend > 1) && (queueOpInfo.PreTransactionMessageResend != queueOpInfo.TransactionMessageResend)))) { queue_operation(QUEUE_OPERATION_SENT, "", ""); req_SendQueue = 0; queueOpInfo.PreTransactionMessageResend = queueOpInfo.TransactionMessageResend; } // PING packet if(isWebsocketSendable && (0 < GetWebSocketPingInterval()) && ((GetWebSocketPingInterval()+counterPingSend) <= getDiffSecNow(startTime.pingOn))) { lws_callback_on_writable(wsi_client); counterPingSend++; } if(GetHeartBeatWithNOResponse() >= 30) { lws_context_destroy(context); ConnectionEstablished = 0; context = NULL; SetHeartBeatWithNOResponse(); DEBUG_WARN("Heartbeat re-send over 30 count.\n"); } if((GetOcppConnStatus() == 0)) { if(getDiffSecNow(startTime.reConnect) >= 3) { DEBUG_INFO("GetOcppConnStatus() = %d\n", GetOcppConnStatus()); lws_context_destroy(context); ConnectionEstablished = 0; context = NULL; } } else { refreshStartTimer(&startTime.reConnect); } refreshStartTimer(&startTime.disconnectServer); } } do { lws_service(context, 0);//timeout_ms }while((SendBufLen>0) && (context!=NULL) && GetOcppConnStatus()); refreshProcDogTimer(); usleep(100000); } pthread_join(tid_ProcQueue, NULL); pthread_join(tid_Watchdog, NULL); return FAIL; }