03-request-response-correlation-1.c 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. #include <stdbool.h>
  2. #include <stdio.h>
  3. #include <stdlib.h>
  4. #include <string.h>
  5. #include <mosquitto.h>
  6. #include <mqtt_protocol.h>
  7. static int run = -1;
  8. static int sent_mid = -1;
  9. void on_connect(struct mosquitto *mosq, void *obj, int rc)
  10. {
  11. if(rc){
  12. exit(1);
  13. }else{
  14. mosquitto_subscribe(mosq, NULL, "response/topic", 0);
  15. }
  16. }
  17. void on_subscribe(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
  18. {
  19. mosquitto_property *props = NULL;
  20. mosquitto_property_add_string(&props, MQTT_PROP_RESPONSE_TOPIC, "response/topic");
  21. mosquitto_property_add_binary(&props, MQTT_PROP_CORRELATION_DATA, "corridor", 8);
  22. mosquitto_publish_v5(mosq, NULL, "request/topic", 6, "action", 0, 0, props);
  23. mosquitto_property_free_all(&props);
  24. }
  25. void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
  26. {
  27. if(!strcmp(msg->payload, "a response")){
  28. run = 0;
  29. }else{
  30. run = 1;
  31. }
  32. }
  33. int main(int argc, char *argv[])
  34. {
  35. int rc;
  36. struct mosquitto *mosq;
  37. int ver = PROTOCOL_VERSION_v5;
  38. int port = atoi(argv[1]);
  39. mosquitto_lib_init();
  40. mosq = mosquitto_new("request-test", true, NULL);
  41. if(mosq == NULL){
  42. return 1;
  43. }
  44. mosquitto_opts_set(mosq, MOSQ_OPT_PROTOCOL_VERSION, &ver);
  45. mosquitto_connect_callback_set(mosq, on_connect);
  46. mosquitto_subscribe_callback_set(mosq, on_subscribe);
  47. mosquitto_message_callback_set(mosq, on_message);
  48. rc = mosquitto_connect(mosq, "localhost", port, 60);
  49. while(run == -1){
  50. rc = mosquitto_loop(mosq, -1, 1);
  51. }
  52. mosquitto_destroy(mosq);
  53. mosquitto_lib_cleanup();
  54. return run;
  55. }