From db60c041b08459b29b406276f6828d75831a62cb Mon Sep 17 00:00:00 2001 From: "Simon.Dean" Date: Mon, 31 Jan 2022 14:09:47 +0000 Subject: [PATCH] Rework to message handling. Promises are being used as Observables and when we hit an interleaved frame, we're resolving a promise on a message for a different channel. Although previous way worked, it was somewhat wrong. We have tried to ensure here we resolve the original message only and anything else we send back via an interleaved Frame Handler. Ideally we should then work to ensure we get the interleaved message to the appropriate channel. We'll have a look to setup an interleaved messaged handler that we can send messages up to the chain, or possibly better yet, turn the Messenger/MessageInStream to be responsible for sending to the appropriate service through observables. --- .../f1x/aasdk/Messenger/IMessageInStream.hpp | 1 + .../f1x/aasdk/Messenger/MessageInStream.hpp | 13 ++- include/f1x/aasdk/Messenger/Messenger.hpp | 2 + src/Messenger/MessageInStream.cpp | 107 ++++++++++++------ src/Messenger/Messenger.cpp | 29 ++++- 5 files changed, 116 insertions(+), 36 deletions(-) diff --git a/include/f1x/aasdk/Messenger/IMessageInStream.hpp b/include/f1x/aasdk/Messenger/IMessageInStream.hpp index dc49c05..4d08f25 100644 --- a/include/f1x/aasdk/Messenger/IMessageInStream.hpp +++ b/include/f1x/aasdk/Messenger/IMessageInStream.hpp @@ -37,6 +37,7 @@ public: virtual ~IMessageInStream() = default; virtual void startReceive(ReceivePromise::Pointer promise) = 0; + virtual void setInterleavedHandler(ReceivePromise::Pointer promise) = 0; }; } diff --git a/include/f1x/aasdk/Messenger/MessageInStream.hpp b/include/f1x/aasdk/Messenger/MessageInStream.hpp index a906efe..76a2ad0 100644 --- a/include/f1x/aasdk/Messenger/MessageInStream.hpp +++ b/include/f1x/aasdk/Messenger/MessageInStream.hpp @@ -37,6 +37,7 @@ public: MessageInStream(boost::asio::io_service& ioService, transport::ITransport::Pointer transport, ICryptor::Pointer cryptor); void startReceive(ReceivePromise::Pointer promise) override; + void setInterleavedHandler(ReceivePromise::Pointer promise) override; private: using std::enable_shared_from_this::shared_from_this; @@ -48,13 +49,21 @@ private: boost::asio::io_service::strand strand_; transport::ITransport::Pointer transport_; ICryptor::Pointer cryptor_; - FrameType recentFrameType_; + FrameType thisFrameType_; ReceivePromise::Pointer promise_; + ReceivePromise::Pointer interleavedPromise_; Message::Pointer message_; int frameSize_; - std::map channel_assembly_buffers; + ChannelId currentChannelId_; + ChannelId originalMessageChannelId_; + std::map messageBuffer_; + + + bool isInterleaved_; + bool haveOriginalChannel_; + bool isNewMessage_; }; } diff --git a/include/f1x/aasdk/Messenger/Messenger.hpp b/include/f1x/aasdk/Messenger/Messenger.hpp index 0dce8ad..49c5ca6 100644 --- a/include/f1x/aasdk/Messenger/Messenger.hpp +++ b/include/f1x/aasdk/Messenger/Messenger.hpp @@ -46,9 +46,11 @@ private: typedef std::list> ChannelSendQueue; void doSend(); void inStreamMessageHandler(Message::Pointer message); + void randomInStreamMessageHandler(Message::Pointer message); void outStreamMessageHandler(ChannelSendQueue::iterator queueElement); void rejectReceivePromiseQueue(const error::Error& e); void rejectSendPromiseQueue(const error::Error& e); + void randomRejectReceivePromiseQueue(const error::Error& e); void parseMessage(Message::Pointer message, ReceivePromise::Pointer promise); boost::asio::io_service::strand receiveStrand_; diff --git a/src/Messenger/MessageInStream.cpp b/src/Messenger/MessageInStream.cpp index d1a50a2..0ee6ba0 100644 --- a/src/Messenger/MessageInStream.cpp +++ b/src/Messenger/MessageInStream.cpp @@ -33,7 +33,7 @@ MessageInStream::MessageInStream(boost::asio::io_service& ioService, transport:: , transport_(std::move(transport)) , cryptor_(std::move(cryptor)) { - + isNewMessage_ = true; } void MessageInStream::startReceive(ReceivePromise::Pointer promise) @@ -42,7 +42,7 @@ void MessageInStream::startReceive(ReceivePromise::Pointer promise) if(promise_ == nullptr) { promise_ = std::move(promise); - + isNewMessage_ = true; auto transportPromise = transport::ITransport::ReceivePromise::defer(strand_); transportPromise->then( [this, self = this->shared_from_this()](common::Data data) mutable { @@ -62,36 +62,68 @@ void MessageInStream::startReceive(ReceivePromise::Pointer promise) }); } +void MessageInStream::setInterleavedHandler(ReceivePromise::Pointer promise) +{ + interleavedPromise_ = std::move(promise); +} + void MessageInStream::receiveFrameHeaderHandler(const common::DataConstBuffer& buffer) { FrameHeader frameHeader(buffer); - if (buffer.cdata[0] != 3) { - AASDK_LOG(debug) << "Message from channel " << std::to_string(buffer.cdata[0]); + + isInterleaved_ = false; + + // Store the ChannelId if this is a new message. + if (isNewMessage_) { + originalMessageChannelId_ = frameHeader.getChannelId(); + isNewMessage_ = false; } + // If Frame Channel does not match Message Channel, store Existing Message in Buffer. if(message_ != nullptr && message_->getChannelId() != frameHeader.getChannelId()) { - // we have interleaved channels, stop working on the old one and store it for later; Switch to the new one - channel_assembly_buffers[message_->getChannelId()] = message_; + AASDK_LOG(debug) << "[MessageInStream] ChannelId mismatch -- Frame " << channelIdToString(frameHeader_.getChannelId()) << " -- Message -- " << channelIdToString(message_.getChannelId()); + isInterleaved_ = true; + + messageBuffer_[message_->getChannelId()] = message_; message_ = nullptr; - // message_.reset(); - // promise_->reject(error::Error(error::ErrorCode::MESSENGER_INTERTWINED_CHANNELS)); - // promise_.reset(); - // return; } - auto prevBuffer = channel_assembly_buffers.find(frameHeader.getChannelId()); - if(prevBuffer != channel_assembly_buffers.end()){ // is there previous data in our map? - if(frameHeader.getType()!=FrameType::FIRST) //only use the data if we're not on a new frame, otherwise disregard - message_ = prevBuffer->second; - else{ - message_ = std::make_shared(frameHeader.getChannelId(), frameHeader.getEncryptionType(), frameHeader.getMessageType()); + + if (frameHeader.getType() == FrameType::FIRST || frameHeader.getType() == FrameType::BULK) { + // If it's a First or Bulk Frame, we need to start a new message. + message_ = std::make_shared(frameHeader.getChannelId(), frameHeader.getEncryptionType(), frameHeader.getMessageType()); + } else { + // This is a Middle or Last Frame. We must find an existing message. + auto bufferedMessage = messageBuffer_.find(frameHeader.getChannelId()); + if (bufferedMessage != messageBuffer_.end()) { + /* + * If the original channel does not match, then this is an interleaved frame. + * It is no good just to match the channelid on the message we recovered from the queue, we need + * to go back to the original message channel as even this frame may be ANOTHER interleaved + * message frame within an existing interleaved message. + * Our promise must resolve only the channel id we've been tasked to work on. + * Everything else is incidental. + */ + + // We can restore the original message, and and if the current frame matches the chnnale id + // then we're not interleaved anymore. + if (originalMessageChannelId_ == frameHeader.getChannelId()) { + isInterleaved_ = false; + AASDK_LOG(debug) << "[MessageInStream] Restored Message from Buffer"; + } + + message_ = bufferedMessage->second; + messageBuffer_.erase(bufferedMessage); } - channel_assembly_buffers.erase(prevBuffer); // get rid of the previously stored data because it's now our working data. } - else if(message_ == nullptr){ + + // If we have nothing at this point, start a new message. + if(message_ == nullptr) + { message_ = std::make_shared(frameHeader.getChannelId(), frameHeader.getEncryptionType(), frameHeader.getMessageType()); } - recentFrameType_ = frameHeader.getType(); + + thisFrameType_ = frameHeader.getType(); const size_t frameSize = FrameSize::getSizeOf(frameHeader.getType() == FrameType::FIRST ? FrameSizeType::EXTENDED : FrameSizeType::SHORT); auto transportPromise = transport::ITransport::ReceivePromise::defer(strand_); @@ -147,23 +179,34 @@ void MessageInStream::receiveFramePayloadHandler(const common::DataConstBuffer& message_->insertPayload(buffer); } - if(recentFrameType_ == FrameType::BULK || recentFrameType_ == FrameType::LAST) + bool isResolved = false; + + // If this is the LAST frame or a BULK frame... + if(thisFrameType_ == FrameType::BULK || thisFrameType_ == FrameType::LAST) { - promise_->resolve(std::move(message_)); - promise_.reset(); + // If this isn't an interleaved frame, then we can resolve the promise + if (!isInterleaved_) { + isResolved = true; + promise_->resolve(std::move(message_)); + promise_.reset(); + } else { + // Otherwise resolve through our random promise + interleavedPromise_->resolve(std::move(message_)); + } } - else - { + + // If the main promise isn't resolved, then carry on retrieving frame headers. + if (!isResolved) { auto transportPromise = transport::ITransport::ReceivePromise::defer(strand_); transportPromise->then( - [this, self = this->shared_from_this()](common::Data data) mutable { - this->receiveFrameHeaderHandler(common::DataConstBuffer(data)); - }, - [this, self = this->shared_from_this()](const error::Error& e) mutable { - message_.reset(); - promise_->reject(e); - promise_.reset(); - }); + [this, self = this->shared_from_this()](common::Data data) mutable { + this->receiveFrameHeaderHandler(common::DataConstBuffer(data)); + }, + [this, self = this->shared_from_this()](const error::Error& e) mutable { + message_.reset(); + promise_->reject(e); + promise_.reset(); + }); transport_->receive(FrameHeader::getSizeOf(), std::move(transportPromise)); } diff --git a/src/Messenger/Messenger.cpp b/src/Messenger/Messenger.cpp index 894ad7b..70c7af6 100644 --- a/src/Messenger/Messenger.cpp +++ b/src/Messenger/Messenger.cpp @@ -42,7 +42,10 @@ void Messenger::enqueueReceive(ChannelId channelId, ReceivePromise::Pointer prom receiveStrand_.dispatch([this, self = this->shared_from_this(), channelId, promise = std::move(promise)]() mutable { if(!channelReceiveMessageQueue_.empty(channelId)) { - this->parseMessage(channelReceiveMessageQueue_.pop(channelId), promise); + //TODO: Use this to check on the Frame/Channel Id? + //this->parseMessage(channelReceiveMessageQueue_.pop(channelId), promise); + promise->resolve(std::move(channelReceiveMessageQueue_.pop(channelId))); + //TODO: Problem with resolving like this, is that if we resolve an interleave frame, our resolve goes to the wrong channel - eg Audio on VideoServiceChannel } else { @@ -54,6 +57,11 @@ void Messenger::enqueueReceive(ChannelId channelId, ReceivePromise::Pointer prom inStreamPromise->then(std::bind(&Messenger::inStreamMessageHandler, this->shared_from_this(), std::placeholders::_1), std::bind(&Messenger::rejectReceivePromiseQueue, this->shared_from_this(), std::placeholders::_1)); messageInStream_->startReceive(std::move(inStreamPromise)); + + auto randomInStreamPromise = ReceivePromise::defer(receiveStrand_); + randomInStreamPromise->then(std::bind(&Messenger::randomInStreamMessageHandler, this->shared_from_this(), std::placeholders::_1), + std::bind(&Messenger::randomRejectReceivePromiseQueue, this->shared_from_this(), std::placeholders::_1)); + messageInStream_->setInterleavedHandler(std::move(randomInStreamPromise)); } } }); @@ -80,7 +88,8 @@ void Messenger::inStreamMessageHandler(Message::Pointer message) if(channelReceivePromiseQueue_.isPending(channelId)) { - this->parseMessage(message, channelReceivePromiseQueue_.pop(channelId)); + //this->parseMessage(message, channelReceivePromiseQueue_.pop(channelId)); + channelReceivePromiseQueue_.pop(channelId)->resolve(std::move(message)); } else { @@ -96,6 +105,17 @@ void Messenger::inStreamMessageHandler(Message::Pointer message) } } +void Messenger::randomInStreamMessageHandler(Message::Pointer message) +{ + //AASDK_LOG(debug) << "Interleaved Message Pushed to Queue";; + channelReceiveMessageQueue_.push(std::move(message)); + + auto randomInStreamPromise = ReceivePromise::defer(receiveStrand_); + randomInStreamPromise->then(std::bind(&Messenger::randomInStreamMessageHandler, this->shared_from_this(), std::placeholders::_1), + std::bind(&Messenger::randomRejectReceivePromiseQueue, this->shared_from_this(), std::placeholders::_1)); + messageInStream_->setInterleavedHandler(std::move(randomInStreamPromise)); +} + void Messenger::parseMessage(Message::Pointer message, ReceivePromise::Pointer promise) { if (message->getChannelId() != ChannelId::VIDEO) { //AASDK_LOG(debug) << channelIdToString(message->getChannelId()) << " " << MessageId(message->getPayload()); @@ -132,6 +152,11 @@ void Messenger::rejectReceivePromiseQueue(const error::Error& e) } } +void Messenger::randomRejectReceivePromiseQueue(const error::Error& e) +{ + // Dummy +} + void Messenger::rejectSendPromiseQueue(const error::Error& e) { while(!channelSendPromiseQueue_.empty())