diff --git a/include/f1x/aasdk/Messenger/IMessageInStream.hpp b/include/f1x/aasdk/Messenger/IMessageInStream.hpp index dc49c05..4d08f25 100644 --- a/include/f1x/aasdk/Messenger/IMessageInStream.hpp +++ b/include/f1x/aasdk/Messenger/IMessageInStream.hpp @@ -37,6 +37,7 @@ public: virtual ~IMessageInStream() = default; virtual void startReceive(ReceivePromise::Pointer promise) = 0; + virtual void setInterleavedHandler(ReceivePromise::Pointer promise) = 0; }; } diff --git a/include/f1x/aasdk/Messenger/MessageInStream.hpp b/include/f1x/aasdk/Messenger/MessageInStream.hpp index a906efe..76a2ad0 100644 --- a/include/f1x/aasdk/Messenger/MessageInStream.hpp +++ b/include/f1x/aasdk/Messenger/MessageInStream.hpp @@ -37,6 +37,7 @@ public: MessageInStream(boost::asio::io_service& ioService, transport::ITransport::Pointer transport, ICryptor::Pointer cryptor); void startReceive(ReceivePromise::Pointer promise) override; + void setInterleavedHandler(ReceivePromise::Pointer promise) override; private: using std::enable_shared_from_this::shared_from_this; @@ -48,13 +49,21 @@ private: boost::asio::io_service::strand strand_; transport::ITransport::Pointer transport_; ICryptor::Pointer cryptor_; - FrameType recentFrameType_; + FrameType thisFrameType_; ReceivePromise::Pointer promise_; + ReceivePromise::Pointer interleavedPromise_; Message::Pointer message_; int frameSize_; - std::map channel_assembly_buffers; + ChannelId currentChannelId_; + ChannelId originalMessageChannelId_; + std::map messageBuffer_; + + + bool isInterleaved_; + bool haveOriginalChannel_; + bool isNewMessage_; }; } diff --git a/include/f1x/aasdk/Messenger/Messenger.hpp b/include/f1x/aasdk/Messenger/Messenger.hpp index 0dce8ad..49c5ca6 100644 --- a/include/f1x/aasdk/Messenger/Messenger.hpp +++ b/include/f1x/aasdk/Messenger/Messenger.hpp @@ -46,9 +46,11 @@ private: typedef std::list> ChannelSendQueue; void doSend(); void inStreamMessageHandler(Message::Pointer message); + void randomInStreamMessageHandler(Message::Pointer message); void outStreamMessageHandler(ChannelSendQueue::iterator queueElement); void rejectReceivePromiseQueue(const error::Error& e); void rejectSendPromiseQueue(const error::Error& e); + void randomRejectReceivePromiseQueue(const error::Error& e); void parseMessage(Message::Pointer message, ReceivePromise::Pointer promise); boost::asio::io_service::strand receiveStrand_; diff --git a/src/Messenger/MessageInStream.cpp b/src/Messenger/MessageInStream.cpp index d1a50a2..0ee6ba0 100644 --- a/src/Messenger/MessageInStream.cpp +++ b/src/Messenger/MessageInStream.cpp @@ -33,7 +33,7 @@ MessageInStream::MessageInStream(boost::asio::io_service& ioService, transport:: , transport_(std::move(transport)) , cryptor_(std::move(cryptor)) { - + isNewMessage_ = true; } void MessageInStream::startReceive(ReceivePromise::Pointer promise) @@ -42,7 +42,7 @@ void MessageInStream::startReceive(ReceivePromise::Pointer promise) if(promise_ == nullptr) { promise_ = std::move(promise); - + isNewMessage_ = true; auto transportPromise = transport::ITransport::ReceivePromise::defer(strand_); transportPromise->then( [this, self = this->shared_from_this()](common::Data data) mutable { @@ -62,36 +62,68 @@ void MessageInStream::startReceive(ReceivePromise::Pointer promise) }); } +void MessageInStream::setInterleavedHandler(ReceivePromise::Pointer promise) +{ + interleavedPromise_ = std::move(promise); +} + void MessageInStream::receiveFrameHeaderHandler(const common::DataConstBuffer& buffer) { FrameHeader frameHeader(buffer); - if (buffer.cdata[0] != 3) { - AASDK_LOG(debug) << "Message from channel " << std::to_string(buffer.cdata[0]); + + isInterleaved_ = false; + + // Store the ChannelId if this is a new message. + if (isNewMessage_) { + originalMessageChannelId_ = frameHeader.getChannelId(); + isNewMessage_ = false; } + // If Frame Channel does not match Message Channel, store Existing Message in Buffer. if(message_ != nullptr && message_->getChannelId() != frameHeader.getChannelId()) { - // we have interleaved channels, stop working on the old one and store it for later; Switch to the new one - channel_assembly_buffers[message_->getChannelId()] = message_; + AASDK_LOG(debug) << "[MessageInStream] ChannelId mismatch -- Frame " << channelIdToString(frameHeader_.getChannelId()) << " -- Message -- " << channelIdToString(message_.getChannelId()); + isInterleaved_ = true; + + messageBuffer_[message_->getChannelId()] = message_; message_ = nullptr; - // message_.reset(); - // promise_->reject(error::Error(error::ErrorCode::MESSENGER_INTERTWINED_CHANNELS)); - // promise_.reset(); - // return; } - auto prevBuffer = channel_assembly_buffers.find(frameHeader.getChannelId()); - if(prevBuffer != channel_assembly_buffers.end()){ // is there previous data in our map? - if(frameHeader.getType()!=FrameType::FIRST) //only use the data if we're not on a new frame, otherwise disregard - message_ = prevBuffer->second; - else{ - message_ = std::make_shared(frameHeader.getChannelId(), frameHeader.getEncryptionType(), frameHeader.getMessageType()); + + if (frameHeader.getType() == FrameType::FIRST || frameHeader.getType() == FrameType::BULK) { + // If it's a First or Bulk Frame, we need to start a new message. + message_ = std::make_shared(frameHeader.getChannelId(), frameHeader.getEncryptionType(), frameHeader.getMessageType()); + } else { + // This is a Middle or Last Frame. We must find an existing message. + auto bufferedMessage = messageBuffer_.find(frameHeader.getChannelId()); + if (bufferedMessage != messageBuffer_.end()) { + /* + * If the original channel does not match, then this is an interleaved frame. + * It is no good just to match the channelid on the message we recovered from the queue, we need + * to go back to the original message channel as even this frame may be ANOTHER interleaved + * message frame within an existing interleaved message. + * Our promise must resolve only the channel id we've been tasked to work on. + * Everything else is incidental. + */ + + // We can restore the original message, and and if the current frame matches the chnnale id + // then we're not interleaved anymore. + if (originalMessageChannelId_ == frameHeader.getChannelId()) { + isInterleaved_ = false; + AASDK_LOG(debug) << "[MessageInStream] Restored Message from Buffer"; + } + + message_ = bufferedMessage->second; + messageBuffer_.erase(bufferedMessage); } - channel_assembly_buffers.erase(prevBuffer); // get rid of the previously stored data because it's now our working data. } - else if(message_ == nullptr){ + + // If we have nothing at this point, start a new message. + if(message_ == nullptr) + { message_ = std::make_shared(frameHeader.getChannelId(), frameHeader.getEncryptionType(), frameHeader.getMessageType()); } - recentFrameType_ = frameHeader.getType(); + + thisFrameType_ = frameHeader.getType(); const size_t frameSize = FrameSize::getSizeOf(frameHeader.getType() == FrameType::FIRST ? FrameSizeType::EXTENDED : FrameSizeType::SHORT); auto transportPromise = transport::ITransport::ReceivePromise::defer(strand_); @@ -147,23 +179,34 @@ void MessageInStream::receiveFramePayloadHandler(const common::DataConstBuffer& message_->insertPayload(buffer); } - if(recentFrameType_ == FrameType::BULK || recentFrameType_ == FrameType::LAST) + bool isResolved = false; + + // If this is the LAST frame or a BULK frame... + if(thisFrameType_ == FrameType::BULK || thisFrameType_ == FrameType::LAST) { - promise_->resolve(std::move(message_)); - promise_.reset(); + // If this isn't an interleaved frame, then we can resolve the promise + if (!isInterleaved_) { + isResolved = true; + promise_->resolve(std::move(message_)); + promise_.reset(); + } else { + // Otherwise resolve through our random promise + interleavedPromise_->resolve(std::move(message_)); + } } - else - { + + // If the main promise isn't resolved, then carry on retrieving frame headers. + if (!isResolved) { auto transportPromise = transport::ITransport::ReceivePromise::defer(strand_); transportPromise->then( - [this, self = this->shared_from_this()](common::Data data) mutable { - this->receiveFrameHeaderHandler(common::DataConstBuffer(data)); - }, - [this, self = this->shared_from_this()](const error::Error& e) mutable { - message_.reset(); - promise_->reject(e); - promise_.reset(); - }); + [this, self = this->shared_from_this()](common::Data data) mutable { + this->receiveFrameHeaderHandler(common::DataConstBuffer(data)); + }, + [this, self = this->shared_from_this()](const error::Error& e) mutable { + message_.reset(); + promise_->reject(e); + promise_.reset(); + }); transport_->receive(FrameHeader::getSizeOf(), std::move(transportPromise)); } diff --git a/src/Messenger/Messenger.cpp b/src/Messenger/Messenger.cpp index 894ad7b..70c7af6 100644 --- a/src/Messenger/Messenger.cpp +++ b/src/Messenger/Messenger.cpp @@ -42,7 +42,10 @@ void Messenger::enqueueReceive(ChannelId channelId, ReceivePromise::Pointer prom receiveStrand_.dispatch([this, self = this->shared_from_this(), channelId, promise = std::move(promise)]() mutable { if(!channelReceiveMessageQueue_.empty(channelId)) { - this->parseMessage(channelReceiveMessageQueue_.pop(channelId), promise); + //TODO: Use this to check on the Frame/Channel Id? + //this->parseMessage(channelReceiveMessageQueue_.pop(channelId), promise); + promise->resolve(std::move(channelReceiveMessageQueue_.pop(channelId))); + //TODO: Problem with resolving like this, is that if we resolve an interleave frame, our resolve goes to the wrong channel - eg Audio on VideoServiceChannel } else { @@ -54,6 +57,11 @@ void Messenger::enqueueReceive(ChannelId channelId, ReceivePromise::Pointer prom inStreamPromise->then(std::bind(&Messenger::inStreamMessageHandler, this->shared_from_this(), std::placeholders::_1), std::bind(&Messenger::rejectReceivePromiseQueue, this->shared_from_this(), std::placeholders::_1)); messageInStream_->startReceive(std::move(inStreamPromise)); + + auto randomInStreamPromise = ReceivePromise::defer(receiveStrand_); + randomInStreamPromise->then(std::bind(&Messenger::randomInStreamMessageHandler, this->shared_from_this(), std::placeholders::_1), + std::bind(&Messenger::randomRejectReceivePromiseQueue, this->shared_from_this(), std::placeholders::_1)); + messageInStream_->setInterleavedHandler(std::move(randomInStreamPromise)); } } }); @@ -80,7 +88,8 @@ void Messenger::inStreamMessageHandler(Message::Pointer message) if(channelReceivePromiseQueue_.isPending(channelId)) { - this->parseMessage(message, channelReceivePromiseQueue_.pop(channelId)); + //this->parseMessage(message, channelReceivePromiseQueue_.pop(channelId)); + channelReceivePromiseQueue_.pop(channelId)->resolve(std::move(message)); } else { @@ -96,6 +105,17 @@ void Messenger::inStreamMessageHandler(Message::Pointer message) } } +void Messenger::randomInStreamMessageHandler(Message::Pointer message) +{ + //AASDK_LOG(debug) << "Interleaved Message Pushed to Queue";; + channelReceiveMessageQueue_.push(std::move(message)); + + auto randomInStreamPromise = ReceivePromise::defer(receiveStrand_); + randomInStreamPromise->then(std::bind(&Messenger::randomInStreamMessageHandler, this->shared_from_this(), std::placeholders::_1), + std::bind(&Messenger::randomRejectReceivePromiseQueue, this->shared_from_this(), std::placeholders::_1)); + messageInStream_->setInterleavedHandler(std::move(randomInStreamPromise)); +} + void Messenger::parseMessage(Message::Pointer message, ReceivePromise::Pointer promise) { if (message->getChannelId() != ChannelId::VIDEO) { //AASDK_LOG(debug) << channelIdToString(message->getChannelId()) << " " << MessageId(message->getPayload()); @@ -132,6 +152,11 @@ void Messenger::rejectReceivePromiseQueue(const error::Error& e) } } +void Messenger::randomRejectReceivePromiseQueue(const error::Error& e) +{ + // Dummy +} + void Messenger::rejectSendPromiseQueue(const error::Error& e) { while(!channelSendPromiseQueue_.empty())