123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332 |
- #include <msgpack.hpp>
- #include <gtest/gtest.h>
- #include <sstream>
- TEST(streaming, basic)
- {
- msgpack::sbuffer buffer;
- msgpack::packer<msgpack::sbuffer> pk(&buffer);
- pk.pack(1);
- pk.pack(2);
- pk.pack(3);
- const char* input = buffer.data();
- const char* const eof = input + buffer.size();
- msgpack::unpacker pac;
- msgpack::object_handle oh;
- int count = 0;
- while(count < 3) {
- pac.reserve_buffer(32*1024);
- // read buffer into pac.buffer() upto
- // pac.buffer_capacity() bytes.
- size_t len = 1;
- memcpy(pac.buffer(), input, len);
- input += len;
- pac.buffer_consumed(len);
- while(pac.next(oh)) {
- msgpack::object obj = oh.get();
- switch(count++) {
- case 0:
- EXPECT_EQ(1, obj.as<int>());
- break;
- case 1:
- EXPECT_EQ(2, obj.as<int>());
- break;
- case 2:
- EXPECT_EQ(3, obj.as<int>());
- return;
- }
- }
- EXPECT_TRUE(input < eof);
- }
- }
- // obsolete
- #if MSGPACK_DEFAULT_API_VERSION == 1
- TEST(streaming, basic_pointer)
- {
- msgpack::sbuffer buffer;
- msgpack::packer<msgpack::sbuffer> pk(&buffer);
- pk.pack(1);
- pk.pack(2);
- pk.pack(3);
- const char* input = buffer.data();
- const char* const eof = input + buffer.size();
- msgpack::unpacker pac;
- msgpack::object_handle oh;
- int count = 0;
- while(count < 3) {
- pac.reserve_buffer(32*1024);
- // read buffer into pac.buffer() upto
- // pac.buffer_capacity() bytes.
- size_t len = 1;
- memcpy(pac.buffer(), input, len);
- input += len;
- pac.buffer_consumed(len);
- #if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 2))
- #pragma GCC diagnostic push
- #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
- #endif // (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 2))
- while(pac.next(&oh)) {
- #if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 2))
- #pragma GCC diagnostic pop
- #endif // (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 2))
- msgpack::object obj = oh.get();
- switch(count++) {
- case 0:
- EXPECT_EQ(1, obj.as<int>());
- break;
- case 1:
- EXPECT_EQ(2, obj.as<int>());
- break;
- case 2:
- EXPECT_EQ(3, obj.as<int>());
- return;
- }
- }
- EXPECT_TRUE(input < eof);
- }
- }
- #endif // MSGPACK_DEFAULT_API_VERSION == 1
- #if !defined(MSGPACK_USE_CPP03)
- TEST(streaming, move)
- {
- msgpack::sbuffer buffer;
- msgpack::packer<msgpack::sbuffer> pk(&buffer);
- pk.pack(1);
- pk.pack(2);
- pk.pack(3);
- const char* input = buffer.data();
- const char* const eof = input + buffer.size();
- msgpack::unpacker pac;
- msgpack::object_handle oh;
- int count = 0;
- while(count < 3) {
- msgpack::unpacker pac_in(std::move(pac));
- pac_in.reserve_buffer(32*1024);
- // read buffer into pac_in.buffer() upto
- // pac_in.buffer_capac_inity() bytes.
- size_t len = 1;
- memcpy(pac_in.buffer(), input, len);
- input += len;
- pac_in.buffer_consumed(len);
- while(pac_in.next(oh)) {
- msgpack::object obj = oh.get();
- switch(count++) {
- case 0:
- EXPECT_EQ(1, obj.as<int>());
- break;
- case 1:
- EXPECT_EQ(2, obj.as<int>());
- break;
- case 2:
- EXPECT_EQ(3, obj.as<int>());
- return;
- }
- }
- EXPECT_TRUE(input < eof);
- pac = std::move(pac_in);
- }
- }
- #endif // !defined(MSGPACK_USE_CPP03)
- class event_handler {
- public:
- event_handler(std::istream& input) : input(input) { }
- ~event_handler() { }
- void on_read()
- {
- while(true) {
- pac.reserve_buffer(32*1024);
- size_t len = static_cast<size_t>(input.readsome(pac.buffer(), pac.buffer_capacity()));
- if(len == 0) {
- return;
- }
- pac.buffer_consumed(len);
- msgpack::object_handle oh;
- while(pac.next(oh)) {
- on_message(oh.get(), msgpack::move(oh.zone()));
- }
- if(pac.message_size() > 10*1024*1024) {
- throw std::runtime_error("message is too large");
- }
- }
- }
- void on_message(msgpack::object obj, msgpack::unique_ptr<msgpack::zone>)
- {
- EXPECT_EQ(expect, obj.as<int>());
- }
- int expect;
- private:
- std::istream& input;
- msgpack::unpacker pac;
- };
- TEST(streaming, event)
- {
- std::stringstream stream;
- msgpack::packer<std::ostream> pk(&stream);
- event_handler handler(stream);
- pk.pack(1);
- handler.expect = 1;
- handler.on_read();
- pk.pack(2);
- handler.expect = 2;
- handler.on_read();
- pk.pack(3);
- handler.expect = 3;
- handler.on_read();
- }
- // obsolete
- #if MSGPACK_DEFAULT_API_VERSION == 1
- // backward compatibility
- TEST(streaming, basic_compat)
- {
- std::ostringstream stream;
- msgpack::packer<std::ostream> pk(&stream);
- pk.pack(1);
- pk.pack(2);
- pk.pack(3);
- std::istringstream input(stream.str());
- msgpack::unpacker pac;
- int count = 0;
- while(count < 3) {
- pac.reserve_buffer(32*1024);
- size_t len = static_cast<size_t>(input.readsome(pac.buffer(), pac.buffer_capacity()));
- pac.buffer_consumed(len);
- while(pac.execute()) {
- msgpack::unique_ptr<msgpack::zone> z(pac.release_zone());
- msgpack::object obj = pac.data();
- pac.reset();
- switch(count++) {
- case 0:
- EXPECT_EQ(1, obj.as<int>());
- break;
- case 1:
- EXPECT_EQ(2, obj.as<int>());
- break;
- case 2:
- EXPECT_EQ(3, obj.as<int>());
- return;
- }
- }
- }
- }
- // backward compatibility
- class event_handler_compat {
- public:
- event_handler_compat(std::istream& input) : input(input) { }
- ~event_handler_compat() { }
- void on_read()
- {
- while(true) {
- pac.reserve_buffer(32*1024);
- size_t len = static_cast<size_t>(input.readsome(pac.buffer(), pac.buffer_capacity()));
- if(len == 0) {
- return;
- }
- pac.buffer_consumed(len);
- while(pac.execute()) {
- msgpack::unique_ptr<msgpack::zone> z(pac.release_zone());
- msgpack::object obj = pac.data();
- pac.reset();
- on_message(obj, msgpack::move(z));
- }
- if(pac.message_size() > 10*1024*1024) {
- throw std::runtime_error("message is too large");
- }
- }
- }
- void on_message(msgpack::object obj, msgpack::unique_ptr<msgpack::zone>)
- {
- EXPECT_EQ(expect, obj.as<int>());
- }
- int expect;
- private:
- std::istream& input;
- msgpack::unpacker pac;
- };
- TEST(streaming, event_compat)
- {
- std::stringstream stream;
- msgpack::packer<std::ostream> pk(&stream);
- event_handler_compat handler(stream);
- pk.pack(1);
- handler.expect = 1;
- handler.on_read();
- pk.pack(2);
- handler.expect = 2;
- handler.on_read();
- pk.pack(3);
- handler.expect = 3;
- handler.on_read();
- }
- #endif // !defined(MSGPACK_USE_CPP03)
|