streaming.cpp 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. #include <msgpack.hpp>
  2. #include <gtest/gtest.h>
  3. #include <sstream>
  4. TEST(streaming, basic)
  5. {
  6. msgpack::sbuffer buffer;
  7. msgpack::packer<msgpack::sbuffer> pk(&buffer);
  8. pk.pack(1);
  9. pk.pack(2);
  10. pk.pack(3);
  11. const char* input = buffer.data();
  12. const char* const eof = input + buffer.size();
  13. msgpack::unpacker pac;
  14. msgpack::object_handle oh;
  15. int count = 0;
  16. while(count < 3) {
  17. pac.reserve_buffer(32*1024);
  18. // read buffer into pac.buffer() upto
  19. // pac.buffer_capacity() bytes.
  20. size_t len = 1;
  21. memcpy(pac.buffer(), input, len);
  22. input += len;
  23. pac.buffer_consumed(len);
  24. while(pac.next(oh)) {
  25. msgpack::object obj = oh.get();
  26. switch(count++) {
  27. case 0:
  28. EXPECT_EQ(1, obj.as<int>());
  29. break;
  30. case 1:
  31. EXPECT_EQ(2, obj.as<int>());
  32. break;
  33. case 2:
  34. EXPECT_EQ(3, obj.as<int>());
  35. return;
  36. }
  37. }
  38. EXPECT_TRUE(input < eof);
  39. }
  40. }
  41. // obsolete
  42. #if MSGPACK_DEFAULT_API_VERSION == 1
  43. TEST(streaming, basic_pointer)
  44. {
  45. msgpack::sbuffer buffer;
  46. msgpack::packer<msgpack::sbuffer> pk(&buffer);
  47. pk.pack(1);
  48. pk.pack(2);
  49. pk.pack(3);
  50. const char* input = buffer.data();
  51. const char* const eof = input + buffer.size();
  52. msgpack::unpacker pac;
  53. msgpack::object_handle oh;
  54. int count = 0;
  55. while(count < 3) {
  56. pac.reserve_buffer(32*1024);
  57. // read buffer into pac.buffer() upto
  58. // pac.buffer_capacity() bytes.
  59. size_t len = 1;
  60. memcpy(pac.buffer(), input, len);
  61. input += len;
  62. pac.buffer_consumed(len);
  63. #if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 2))
  64. #pragma GCC diagnostic push
  65. #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
  66. #endif // (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 2))
  67. while(pac.next(&oh)) {
  68. #if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 2))
  69. #pragma GCC diagnostic pop
  70. #endif // (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 2))
  71. msgpack::object obj = oh.get();
  72. switch(count++) {
  73. case 0:
  74. EXPECT_EQ(1, obj.as<int>());
  75. break;
  76. case 1:
  77. EXPECT_EQ(2, obj.as<int>());
  78. break;
  79. case 2:
  80. EXPECT_EQ(3, obj.as<int>());
  81. return;
  82. }
  83. }
  84. EXPECT_TRUE(input < eof);
  85. }
  86. }
  87. #endif // MSGPACK_DEFAULT_API_VERSION == 1
  88. #if !defined(MSGPACK_USE_CPP03)
  89. TEST(streaming, move)
  90. {
  91. msgpack::sbuffer buffer;
  92. msgpack::packer<msgpack::sbuffer> pk(&buffer);
  93. pk.pack(1);
  94. pk.pack(2);
  95. pk.pack(3);
  96. const char* input = buffer.data();
  97. const char* const eof = input + buffer.size();
  98. msgpack::unpacker pac;
  99. msgpack::object_handle oh;
  100. int count = 0;
  101. while(count < 3) {
  102. msgpack::unpacker pac_in(std::move(pac));
  103. pac_in.reserve_buffer(32*1024);
  104. // read buffer into pac_in.buffer() upto
  105. // pac_in.buffer_capac_inity() bytes.
  106. size_t len = 1;
  107. memcpy(pac_in.buffer(), input, len);
  108. input += len;
  109. pac_in.buffer_consumed(len);
  110. while(pac_in.next(oh)) {
  111. msgpack::object obj = oh.get();
  112. switch(count++) {
  113. case 0:
  114. EXPECT_EQ(1, obj.as<int>());
  115. break;
  116. case 1:
  117. EXPECT_EQ(2, obj.as<int>());
  118. break;
  119. case 2:
  120. EXPECT_EQ(3, obj.as<int>());
  121. return;
  122. }
  123. }
  124. EXPECT_TRUE(input < eof);
  125. pac = std::move(pac_in);
  126. }
  127. }
  128. #endif // !defined(MSGPACK_USE_CPP03)
  129. class event_handler {
  130. public:
  131. event_handler(std::istream& input) : input(input) { }
  132. ~event_handler() { }
  133. void on_read()
  134. {
  135. while(true) {
  136. pac.reserve_buffer(32*1024);
  137. size_t len = static_cast<size_t>(input.readsome(pac.buffer(), pac.buffer_capacity()));
  138. if(len == 0) {
  139. return;
  140. }
  141. pac.buffer_consumed(len);
  142. msgpack::object_handle oh;
  143. while(pac.next(oh)) {
  144. on_message(oh.get(), msgpack::move(oh.zone()));
  145. }
  146. if(pac.message_size() > 10*1024*1024) {
  147. throw std::runtime_error("message is too large");
  148. }
  149. }
  150. }
  151. void on_message(msgpack::object obj, msgpack::unique_ptr<msgpack::zone>)
  152. {
  153. EXPECT_EQ(expect, obj.as<int>());
  154. }
  155. int expect;
  156. private:
  157. std::istream& input;
  158. msgpack::unpacker pac;
  159. };
  160. TEST(streaming, event)
  161. {
  162. std::stringstream stream;
  163. msgpack::packer<std::ostream> pk(&stream);
  164. event_handler handler(stream);
  165. pk.pack(1);
  166. handler.expect = 1;
  167. handler.on_read();
  168. pk.pack(2);
  169. handler.expect = 2;
  170. handler.on_read();
  171. pk.pack(3);
  172. handler.expect = 3;
  173. handler.on_read();
  174. }
  175. // obsolete
  176. #if MSGPACK_DEFAULT_API_VERSION == 1
  177. // backward compatibility
  178. TEST(streaming, basic_compat)
  179. {
  180. std::ostringstream stream;
  181. msgpack::packer<std::ostream> pk(&stream);
  182. pk.pack(1);
  183. pk.pack(2);
  184. pk.pack(3);
  185. std::istringstream input(stream.str());
  186. msgpack::unpacker pac;
  187. int count = 0;
  188. while(count < 3) {
  189. pac.reserve_buffer(32*1024);
  190. size_t len = static_cast<size_t>(input.readsome(pac.buffer(), pac.buffer_capacity()));
  191. pac.buffer_consumed(len);
  192. while(pac.execute()) {
  193. msgpack::unique_ptr<msgpack::zone> z(pac.release_zone());
  194. msgpack::object obj = pac.data();
  195. pac.reset();
  196. switch(count++) {
  197. case 0:
  198. EXPECT_EQ(1, obj.as<int>());
  199. break;
  200. case 1:
  201. EXPECT_EQ(2, obj.as<int>());
  202. break;
  203. case 2:
  204. EXPECT_EQ(3, obj.as<int>());
  205. return;
  206. }
  207. }
  208. }
  209. }
  210. // backward compatibility
  211. class event_handler_compat {
  212. public:
  213. event_handler_compat(std::istream& input) : input(input) { }
  214. ~event_handler_compat() { }
  215. void on_read()
  216. {
  217. while(true) {
  218. pac.reserve_buffer(32*1024);
  219. size_t len = static_cast<size_t>(input.readsome(pac.buffer(), pac.buffer_capacity()));
  220. if(len == 0) {
  221. return;
  222. }
  223. pac.buffer_consumed(len);
  224. while(pac.execute()) {
  225. msgpack::unique_ptr<msgpack::zone> z(pac.release_zone());
  226. msgpack::object obj = pac.data();
  227. pac.reset();
  228. on_message(obj, msgpack::move(z));
  229. }
  230. if(pac.message_size() > 10*1024*1024) {
  231. throw std::runtime_error("message is too large");
  232. }
  233. }
  234. }
  235. void on_message(msgpack::object obj, msgpack::unique_ptr<msgpack::zone>)
  236. {
  237. EXPECT_EQ(expect, obj.as<int>());
  238. }
  239. int expect;
  240. private:
  241. std::istream& input;
  242. msgpack::unpacker pac;
  243. };
  244. TEST(streaming, event_compat)
  245. {
  246. std::stringstream stream;
  247. msgpack::packer<std::ostream> pk(&stream);
  248. event_handler_compat handler(stream);
  249. pk.pack(1);
  250. handler.expect = 1;
  251. handler.on_read();
  252. pk.pack(2);
  253. handler.expect = 2;
  254. handler.on_read();
  255. pk.pack(3);
  256. handler.expect = 3;
  257. handler.on_read();
  258. }
  259. #endif // !defined(MSGPACK_USE_CPP03)