mysql_log.c 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. #include <signal.h>
  2. #include <stdio.h>
  3. #include <string.h>
  4. #ifndef WIN32
  5. # include <unistd.h>
  6. #else
  7. # include <process.h>
  8. # define snprintf sprintf_s
  9. #endif
  10. #include <mosquitto.h>
  11. #include <mysql/mysql.h>
  12. #define db_host "localhost"
  13. #define db_username "mqtt_log"
  14. #define db_password "password"
  15. #define db_database "mqtt_log"
  16. #define db_port 3306
  17. #define db_query "INSERT INTO mqtt_log (topic, payload) VALUES (?,?)"
  18. #define mqtt_host "localhost"
  19. #define mqtt_port 1883
  20. static int run = 1;
  21. static MYSQL_STMT *stmt = NULL;
  22. void handle_signal(int s)
  23. {
  24. run = 0;
  25. }
  26. void connect_callback(struct mosquitto *mosq, void *obj, int result)
  27. {
  28. }
  29. void message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
  30. {
  31. MYSQL_BIND bind[2];
  32. memset(bind, 0, sizeof(bind));
  33. bind[0].buffer_type = MYSQL_TYPE_STRING;
  34. bind[0].buffer = message->topic;
  35. bind[0].buffer_length = strlen(message->topic);
  36. // Note: payload is normally a binary blob and could contains
  37. // NULL byte. This sample does not handle it and assume payload is a
  38. // string.
  39. bind[1].buffer_type = MYSQL_TYPE_STRING;
  40. bind[1].buffer = message->payload;
  41. bind[1].buffer_length = message->payloadlen;
  42. mysql_stmt_bind_param(stmt, bind);
  43. mysql_stmt_execute(stmt);
  44. }
  45. int main(int argc, char *argv[])
  46. {
  47. MYSQL *connection;
  48. my_bool reconnect = true;
  49. char clientid[24];
  50. struct mosquitto *mosq;
  51. int rc = 0;
  52. signal(SIGINT, handle_signal);
  53. signal(SIGTERM, handle_signal);
  54. mysql_library_init(0, NULL, NULL);
  55. mosquitto_lib_init();
  56. connection = mysql_init(NULL);
  57. if(connection){
  58. mysql_options(connection, MYSQL_OPT_RECONNECT, &reconnect);
  59. connection = mysql_real_connect(connection, db_host, db_username, db_password, db_database, db_port, NULL, 0);
  60. if(connection){
  61. stmt = mysql_stmt_init(connection);
  62. mysql_stmt_prepare(stmt, db_query, strlen(db_query));
  63. memset(clientid, 0, 24);
  64. snprintf(clientid, 23, "mysql_log_%d", getpid());
  65. mosq = mosquitto_new(clientid, true, connection);
  66. if(mosq){
  67. mosquitto_connect_callback_set(mosq, connect_callback);
  68. mosquitto_message_callback_set(mosq, message_callback);
  69. rc = mosquitto_connect(mosq, mqtt_host, mqtt_port, 60);
  70. mosquitto_subscribe(mosq, NULL, "#", 0);
  71. while(run){
  72. rc = mosquitto_loop(mosq, -1, 1);
  73. if(run && rc){
  74. sleep(20);
  75. mosquitto_reconnect(mosq);
  76. }
  77. }
  78. mosquitto_destroy(mosq);
  79. }
  80. mysql_stmt_close(stmt);
  81. mysql_close(connection);
  82. }else{
  83. fprintf(stderr, "Error: Unable to connect to database.\n");
  84. printf("%s\n", mysql_error(connection));
  85. rc = 1;
  86. }
  87. }else{
  88. fprintf(stderr, "Error: Unable to start mysql.\n");
  89. rc = 1;
  90. }
  91. mysql_library_end();
  92. mosquitto_lib_cleanup();
  93. return rc;
  94. }