stream.cpp 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. // MessagePack for C++ example
  2. //
  3. // Copyright (C) 2008-2015 FURUHASHI Sadayuki and KONDO Takatoshi
  4. //
  5. // Distributed under the Boost Software License, Version 1.0.
  6. // (See accompanying file LICENSE_1_0.txt or copy at
  7. // http://www.boost.org/LICENSE_1_0.txt)
  8. //
  9. #include <msgpack.hpp>
  10. #include <iostream>
  11. #include <stdexcept>
  12. #include <stdio.h>
  13. #include <string.h>
  14. #include <unistd.h>
  15. #include <errno.h>
  16. #include <pthread.h>
  17. #if defined(_MSC_VER) || defined(__MINGW32__)
  18. #include <io.h>
  19. #include <fcntl.h>
  20. #define pipe(fds) _pipe(fds, 4096, _O_BINARY)
  21. #endif // _MSC_VER || __MINGW32__
  22. class Server {
  23. public:
  24. Server(int sock) : m_sock(sock) { }
  25. ~Server() { }
  26. typedef msgpack::unique_ptr<msgpack::zone> unique_zone;
  27. void socket_readable()
  28. {
  29. m_pac.reserve_buffer(1024);
  30. ssize_t count =
  31. read(m_sock, m_pac.buffer(), m_pac.buffer_capacity());
  32. if(count <= 0) {
  33. if(count == 0) {
  34. throw std::runtime_error("connection closed");
  35. }
  36. if(errno == EAGAIN || errno == EINTR) {
  37. return;
  38. }
  39. throw std::runtime_error(strerror(errno));
  40. }
  41. m_pac.buffer_consumed(count);
  42. msgpack::object_handle oh;
  43. while (m_pac.next(oh)) {
  44. msgpack::object msg = oh.get();
  45. unique_zone& life = oh.zone();
  46. process_message(msg, life);
  47. }
  48. if(m_pac.message_size() > 10*1024*1024) {
  49. throw std::runtime_error("message is too large");
  50. }
  51. }
  52. private:
  53. void process_message(msgpack::object msg, unique_zone&)
  54. {
  55. std::cout << "message reached: " << msg << std::endl;
  56. }
  57. private:
  58. int m_sock;
  59. msgpack::unpacker m_pac;
  60. };
  61. static void* run_server(void* arg)
  62. {
  63. try {
  64. Server* srv = reinterpret_cast<Server*>(arg);
  65. while(true) {
  66. srv->socket_readable();
  67. }
  68. return NULL;
  69. } catch (std::exception& e) {
  70. std::cerr << "error while processing client packet: "
  71. << e.what() << std::endl;
  72. return NULL;
  73. } catch (...) {
  74. std::cerr << "error while processing client packet: "
  75. << "unknown error" << std::endl;
  76. return NULL;
  77. }
  78. }
  79. struct fwriter {
  80. fwriter(int fd) : m_fp( fdopen(fd, "w") ) { }
  81. void write(const char* buf, size_t buflen)
  82. {
  83. size_t count = fwrite(buf, buflen, 1, m_fp);
  84. if(count < 1) {
  85. std::cout << buflen << std::endl;
  86. std::cout << count << std::endl;
  87. throw std::runtime_error(strerror(errno));
  88. }
  89. }
  90. void flush() { fflush(m_fp); }
  91. void close() { fclose(m_fp); }
  92. private:
  93. FILE* m_fp;
  94. };
  95. int main(void)
  96. {
  97. int pair[2];
  98. if (pipe(pair) != 0) return -1;
  99. // run server thread
  100. Server srv(pair[0]);
  101. pthread_t thread;
  102. pthread_create(&thread, NULL,
  103. run_server, reinterpret_cast<void*>(&srv));
  104. // client thread:
  105. fwriter writer(pair[1]);
  106. msgpack::packer<fwriter> pk(writer);
  107. typedef msgpack::type::tuple<std::string, std::string, std::string> put_t;
  108. typedef msgpack::type::tuple<std::string, std::string> get_t;
  109. put_t req1("put", "apple", "red");
  110. put_t req2("put", "lemon", "yellow");
  111. get_t req3("get", "apple");
  112. pk.pack(req1);
  113. pk.pack(req2);
  114. pk.pack(req3);
  115. writer.flush();
  116. writer.close();
  117. pthread_join(thread, NULL);
  118. }