diff --git a/include/f1x/aasdk/Messenger/MessageInStream.hpp b/include/f1x/aasdk/Messenger/MessageInStream.hpp index 262c3f5..c8e24b0 100644 --- a/include/f1x/aasdk/Messenger/MessageInStream.hpp +++ b/include/f1x/aasdk/Messenger/MessageInStream.hpp @@ -23,7 +23,7 @@ #include #include #include - +#include namespace f1x { namespace aasdk @@ -51,6 +51,8 @@ private: FrameType recentFrameType_; ReceivePromise::Pointer promise_; Message::Pointer message_; + + std::map channel_assembly_buffers; }; } diff --git a/src/Messenger/MessageInStream.cpp b/src/Messenger/MessageInStream.cpp index 72a666f..81084da 100644 --- a/src/Messenger/MessageInStream.cpp +++ b/src/Messenger/MessageInStream.cpp @@ -18,7 +18,7 @@ #include #include - +#include namespace f1x { namespace aasdk @@ -63,19 +63,28 @@ void MessageInStream::startReceive(ReceivePromise::Pointer promise) void MessageInStream::receiveFrameHeaderHandler(const common::DataConstBuffer& buffer) { FrameHeader frameHeader(buffer); - - if(message_ == nullptr) + if(message_ != nullptr && message_->getChannelId() != frameHeader.getChannelId()) { + // we have interleaved channels, stop working on the old one and store it for later; Switch to the new one + channel_assembly_buffers[message_->getChannelId()] = message_; + message_ = nullptr; + // message_.reset(); + // promise_->reject(error::Error(error::ErrorCode::MESSENGER_INTERTWINED_CHANNELS)); + // promise_.reset(); + // return; + } + auto prevBuffer = channel_assembly_buffers.find(frameHeader.getChannelId()); + if(prevBuffer != channel_assembly_buffers.end()){ // is there previous data in our map? + if(frameHeader.getType()!=FrameType::FIRST) //only use the data if we're not on a new frame, otherwise disregard + message_ = prevBuffer->second; + else{ + message_ = std::make_shared(frameHeader.getChannelId(), frameHeader.getEncryptionType(), frameHeader.getMessageType()); + } + channel_assembly_buffers.erase(prevBuffer); // get rid of the previously stored data because it's now our working data. + } + else if(message_ == nullptr){ message_ = std::make_shared(frameHeader.getChannelId(), frameHeader.getEncryptionType(), frameHeader.getMessageType()); } - else if(message_->getChannelId() != frameHeader.getChannelId()) - { - message_.reset(); - promise_->reject(error::Error(error::ErrorCode::MESSENGER_INTERTWINED_CHANNELS)); - promise_.reset(); - return; - } - recentFrameType_ = frameHeader.getType(); const size_t frameSize = FrameSize::getSizeOf(frameHeader.getType() == FrameType::FIRST ? FrameSizeType::EXTENDED : FrameSizeType::SHORT);