// asio_send_recv_zlib.cpp

  // MessagePack for C++ example
  //
  // Copyright (C) 2017 KONDO Takatoshi
  //
  // Distributed under the Boost Software License, Version 1.0.
  // (See accompanying file LICENSE_1_0.txt or copy at
  // http://www.boost.org/LICENSE_1_0.txt)
  //
  #include <cassert>
  #include <cstdint>
  #include <cstdlib>
  #include <functional>
  #include <iomanip>
  #include <iostream>
  #include <sstream>
  #include <string>
  #include <tuple>
  #include <vector>

  #include <boost/asio.hpp>
  #include <boost/lexical_cast.hpp>

  #include <msgpack.hpp>
  #include <msgpack/zbuffer.hpp>

  #include <zlib.h>
  /// Write every byte of `buf` to std::cout as a two-digit, zero-padded hex
  /// value followed by a single space, then terminate the line with std::endl.
  ///
  /// On return std::cout is in decimal mode (as before) and its fill character
  /// is restored to whatever it was on entry, so later width-padded output is
  /// not silently padded with '0'.
  ///
  /// @param buf  Raw bytes to dump; may be empty (only the newline is written).
  void print(std::string const& buf) {
      // setfill() is sticky, so remember the caller's fill character.
      char const saved_fill = std::cout.fill();

      std::cout << std::hex << std::setfill('0');
      for (char const byte : buf) {
          // Mask to 0..255 so that negative `char` values are not sign-extended.
          std::cout << std::setw(2) << (static_cast<int>(byte) & 0xff) << ' ';
      }

      std::cout.fill(saved_fill);
      std::cout << std::dec << std::endl;
  }
  30. int main() {
  31. boost::asio::io_service ios;
  32. std::uint16_t const port = 12345;
  33. int num_of_zlib_data = 2;
  34. int idx_zlib_data = 0;
  35. // Server
  36. std::size_t const window_size = 11;
  37. boost::asio::ip::tcp::acceptor ac(ios, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port));
  38. boost::asio::ip::tcp::socket ss(ios);
  39. std::function<void()> do_accept;
  40. std::function<void()> do_async_read_some;
  41. // zlib for decompress
  42. z_stream strm;
  43. auto zlib_init = [&] {
  44. strm.zalloc = Z_NULL;
  45. strm.zfree = Z_NULL;
  46. strm.opaque = Z_NULL;
  47. strm.next_in = Z_NULL;
  48. {
  49. int zret = inflateInit(&strm);
  50. if (zret != Z_OK) {
  51. std::cout << "Zlib inflateInit() error = " << zret << std::endl;
  52. }
  53. }
  54. };
  55. zlib_init();
  56. std::vector<char> buf(4); // buf size
  57. msgpack::unpacker unp;
  58. do_accept = [&] {
  59. ac.async_accept(
  60. ss,
  61. [&]
  62. (boost::system::error_code const& e) {
  63. if (e) {
  64. std::cout << __LINE__ << ":" << e.message() << std::endl;
  65. return;
  66. }
  67. do_async_read_some = [&] {
  68. ss.async_read_some(
  69. boost::asio::buffer(buf),
  70. [&](boost::system::error_code const& e, std::size_t bytes_transferred) {
  71. if (e) {
  72. std::cout << __LINE__ << ":" << e.message() << std::endl;
  73. return;
  74. }
  75. std::cout << bytes_transferred << " bytes read." << std::endl;
  76. print(std::string(std::string(&buf[0], buf.size())));
  77. strm.avail_in = bytes_transferred;
  78. do {
  79. strm.next_in = reinterpret_cast<unsigned char*>(&buf[0]) + (bytes_transferred - strm.avail_in);
  80. int zret;
  81. unp.reserve_buffer(window_size);
  82. strm.avail_out = window_size;
  83. strm.next_out = reinterpret_cast<unsigned char*>(unp.buffer());
  84. do {
  85. zret = inflate(&strm, Z_NO_FLUSH);
  86. assert(zret != Z_STREAM_ERROR);
  87. switch (zret) {
  88. case Z_NEED_DICT:
  89. zret = Z_DATA_ERROR;
  90. // fall through
  91. case Z_DATA_ERROR:
  92. case Z_MEM_ERROR:
  93. inflateEnd(&strm);
  94. std::cout << "Zlib inflate() error = " << zret << std::endl;
  95. std::exit(-1);
  96. }
  97. std::size_t decompressed_size = window_size - strm.avail_out;
  98. std::cout << decompressed_size << " bytes decompressed." << std::endl;
  99. unp.buffer_consumed(decompressed_size);
  100. msgpack::object_handle oh;
  101. while (unp.next(oh)) {
  102. std::cout << oh.get() << std::endl;
  103. }
  104. } while (strm.avail_out == 0);
  105. if (zret == Z_STREAM_END) {
  106. inflateEnd(&strm);
  107. std::cout << "Zlib decompress finished." << std::endl;
  108. ++idx_zlib_data;
  109. if (idx_zlib_data == num_of_zlib_data) {
  110. std::cout << "All zlib decompress finished." << std::endl;
  111. return;
  112. }
  113. zlib_init();
  114. }
  115. } while (strm.avail_in != 0);
  116. do_async_read_some();
  117. }
  118. );
  119. };
  120. do_async_read_some();
  121. }
  122. );
  123. };
  124. do_accept();
  125. // Client
  126. auto host = "localhost";
  127. boost::asio::ip::tcp::resolver r(ios);
  128. #if BOOST_VERSION < 106600
  129. boost::asio::ip::tcp::resolver::query q(host, boost::lexical_cast<std::string>(port));
  130. auto it = r.resolve(q);
  131. boost::asio::ip::tcp::resolver::iterator end;
  132. #else // BOOST_VERSION < 106600
  133. auto eps = r.resolve(host, boost::lexical_cast<std::string>(port));
  134. auto it = eps.begin();
  135. auto end = eps.end();
  136. #endif // BOOST_VERSION < 106600
  137. boost::asio::ip::tcp::socket cs(ios);
  138. boost::asio::async_connect(
  139. cs,
  140. it,
  141. end,
  142. [&]
  143. (boost::system::error_code const& e, boost::asio::ip::tcp::resolver::iterator) {
  144. if (e) {
  145. std::cout << __LINE__ << ":" << e.message() << std::endl;
  146. return;
  147. }
  148. std::cout << __LINE__ << ":client connected" << std::endl;
  149. for (int i = 0; i != num_of_zlib_data; ++i) {
  150. msgpack::zbuffer zb;
  151. msgpack::pack(zb, std::make_tuple(i, false, "hello world", 12.3456));
  152. zb.flush(); // finalize zbuffer (don't forget it)
  153. print(std::string(zb.data(), zb.size()));
  154. write(cs, boost::asio::buffer(zb.data(), zb.size()));
  155. }
  156. }
  157. );
  158. // Start
  159. ios.run();
  160. }