stream_unpack.cpp 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. // MessagePack for C++ example
  2. //
  3. // Copyright (C) 2017 KONDO Takatoshi
  4. //
  5. // Distributed under the Boost Software License, Version 1.0.
  6. // (See accompanying file LICENSE_1_0.txt or copy at
  7. // http://www.boost.org/LICENSE_1_0.txt)
  8. //
  9. #include <iostream>
  10. #include <sstream>
  11. #include <cassert>
  12. #include <thread>
  13. // MSGPACK_USE_X3_PARSE must be defined before including msgpack.hpp.
  14. // It is usually defined as a compiler option, i.e. -DMSGPACK_USE_X3_PARSE.
  15. //#define MSGPACK_USE_X3_PARSE
  16. #include <msgpack.hpp>
  17. #include <boost/asio.hpp>
  18. #include <boost/coroutine2/all.hpp>
  19. #if defined(__clang__)
  20. #pragma GCC diagnostic push
  21. #pragma GCC diagnostic ignored "-Wunused-parameter"
  22. #endif // defined(__clang__)
  23. #include <boost/spirit/home/support/multi_pass.hpp>
  24. #if defined(__clang__)
  25. #pragma GCC diagnostic pop
  26. #endif // defined(__clang__)
  27. namespace as = boost::asio;
  28. namespace x3 = boost::spirit::x3;
  29. namespace coro2 = boost::coroutines2;
  30. using pull_type = coro2::asymmetric_coroutine<std::shared_ptr<std::vector<char>>>::pull_type;
  31. // iterator fetching data from coroutine2.
  32. class buffered_iterator : public std::iterator<std::input_iterator_tag, char> {
  33. public:
  34. using pointer_t = typename iterator::pointer;
  35. using reference_t = typename iterator::reference;
  36. explicit buffered_iterator(pull_type& source) noexcept
  37. : source_{ &source } {
  38. fetch_();
  39. }
  40. buffered_iterator() = default;
  41. bool operator==(buffered_iterator const& other) const noexcept {
  42. if (!other.source_ && !source_ && !other.buf_ && !buf_) return true;
  43. return other.it_ == it_;
  44. }
  45. bool operator!=(buffered_iterator const& other) const noexcept {
  46. return !(other == *this);
  47. }
  48. buffered_iterator & operator++() {
  49. increment_();
  50. return * this;
  51. }
  52. buffered_iterator operator++(int) = delete;
  53. reference_t operator*() noexcept {
  54. return *it_;
  55. }
  56. pointer_t operator->() noexcept {
  57. return std::addressof(*it_);
  58. }
  59. private:
  60. void fetch_() noexcept {
  61. BOOST_ASSERT( nullptr != source_);
  62. if (*source_) {
  63. buf_ = source_->get();
  64. it_ = buf_->begin();
  65. }
  66. else {
  67. source_ = nullptr;
  68. buf_.reset();
  69. }
  70. }
  71. void increment_() {
  72. BOOST_ASSERT( nullptr != source_);
  73. BOOST_ASSERT(*source_);
  74. if (++it_ == buf_->end()) {
  75. (*source_)();
  76. fetch_();
  77. }
  78. }
  79. private:
  80. pull_type* source_{ nullptr };
  81. std::shared_ptr<std::vector<char>> buf_;
  82. std::vector<char>::iterator it_;
  83. };
  84. // Session class; one instance corresponds to each connected client.
  85. class session : public std::enable_shared_from_this<session> {
  86. public:
  87. session(as::ip::tcp::socket socket)
  88. : socket_(std::move(socket)) {
  89. }
  90. void start() {
  91. sink_ = std::make_shared<coro2::asymmetric_coroutine<std::shared_ptr<std::vector<char>>>::push_type>(
  92. [&, this](pull_type& source) {
  93. // *1 is started when the first sink is called.
  94. std::cout << "session started" << std::endl;
  95. do_read();
  96. source();
  97. // use buffered_iterator here
  98. // b is incremented in msgpack::unpack() and fetch data from sink
  99. // via coroutine2 mechanism
  100. auto b = boost::spirit::make_default_multi_pass(buffered_iterator(source));
  101. auto e = boost::spirit::make_default_multi_pass(buffered_iterator());
  102. // This is usually an infinity look, but for test, loop is finished when
  103. // two message pack data is processed.
  104. for (int i = 0; i != 2; ++i) {
  105. auto oh = msgpack::unpack(b, e);
  106. std::cout << oh.get() << std::endl;
  107. }
  108. }
  109. );
  110. // send dummy data to start *1
  111. (*sink_)({});
  112. }
  113. private:
  114. void do_read() {
  115. std::cout << "session do_read() is called" << std::endl;
  116. auto self(shared_from_this());
  117. auto data = std::make_shared<std::vector<char>>(static_cast<std::size_t>(max_length));
  118. socket_.async_read_some(
  119. boost::asio::buffer(*data),
  120. [this, self, data]
  121. (boost::system::error_code ec, std::size_t length) {
  122. if (!ec) {
  123. data->resize(length);
  124. (*sink_)(data);
  125. do_read();
  126. }
  127. }
  128. );
  129. }
  130. as::ip::tcp::socket socket_;
  131. static constexpr std::size_t const max_length = 1024;
  132. std::shared_ptr<coro2::asymmetric_coroutine<std::shared_ptr<std::vector<char>>>::push_type> sink_;
  133. };
  134. class server {
  135. public:
  136. server(
  137. as::io_service& ios,
  138. std::uint16_t port)
  139. : acceptor_(ios, as::ip::tcp::endpoint(as::ip::tcp::v4(), port)),
  140. socket_(ios) {
  141. do_accept();
  142. std::cout << "server start accept" << std::endl;
  143. ios.run();
  144. }
  145. private:
  146. void do_accept() {
  147. acceptor_.async_accept(
  148. socket_,
  149. [this](boost::system::error_code ec) {
  150. if (!ec) {
  151. std::make_shared<session>(std::move(socket_))->start();
  152. }
  153. // for test, only one session is accepted.
  154. // do_accept();
  155. }
  156. );
  157. }
  158. as::ip::tcp::acceptor acceptor_;
  159. as::ip::tcp::socket socket_;
  160. };
  161. int main() {
  162. std::thread srv(
  163. []{
  164. boost::asio::io_service ios;
  165. server s(ios, 12345);
  166. }
  167. );
  168. std::thread cli(
  169. []{
  170. std::this_thread::sleep_for(std::chrono::seconds(1));
  171. std::cout << "client start" << std::endl;
  172. std::stringstream ss;
  173. std::map<std::string, std::vector<int>> v1 {
  174. { "ABC", { 1, 2, 3 } },
  175. { "DEFG", { 4, 5 } }
  176. };
  177. std::vector<std::string> v2 {
  178. "HIJ", "KLM", "NOP"
  179. };
  180. msgpack::pack(ss, v1);
  181. msgpack::pack(ss, v2);
  182. auto send_data = ss.str();
  183. boost::asio::io_service ios;
  184. as::ip::tcp::resolver::query q("127.0.0.1", "12345");
  185. as::ip::tcp::resolver r(ios);
  186. auto it = r.resolve(q);
  187. std::cout << "client connect" << std::endl;
  188. as::ip::tcp::socket s(ios);
  189. as::connect(s, it);
  190. std::size_t const size = 5;
  191. std::size_t rest = send_data.size();
  192. std::size_t index = 0;
  193. while (rest != 0) {
  194. std::cout << "client send data" << std::endl;
  195. auto send_size = size < rest ? size : rest;
  196. as::write(s, as::buffer(&send_data[index], send_size));
  197. rest -= send_size;
  198. index += send_size;
  199. std::cout << "client wait" << std::endl;
  200. std::this_thread::sleep_for(std::chrono::seconds(1));
  201. }
  202. }
  203. );
  204. cli.join();
  205. std::cout << "client joinded" << std::endl;
  206. srv.join();
  207. std::cout << "server joinded" << std::endl;
  208. }