diff --git a/include/aasdk/Messenger/IMessageInStream.hpp b/include/aasdk/Messenger/IMessageInStream.hpp index 0d9e798..b64e4b0 100644 --- a/include/aasdk/Messenger/IMessageInStream.hpp +++ b/include/aasdk/Messenger/IMessageInStream.hpp @@ -35,7 +35,8 @@ public: IMessageInStream() = default; virtual ~IMessageInStream() = default; - virtual void startReceive(ReceivePromise::Pointer promise) = 0; + virtual void startReceive(ReceivePromise::Pointer promise, ChannelId channelId, int promiseIndex, int messageIndex) = 0; + virtual void setInterleavedHandler(ReceivePromise::Pointer promise) = 0; }; } diff --git a/include/aasdk/Messenger/MessageInStream.hpp b/include/aasdk/Messenger/MessageInStream.hpp index 29e9c44..ad44a15 100644 --- a/include/aasdk/Messenger/MessageInStream.hpp +++ b/include/aasdk/Messenger/MessageInStream.hpp @@ -37,7 +37,8 @@ class MessageInStream: public IMessageInStream, public std::enable_shared_from_t public: MessageInStream(boost::asio::io_service& ioService, transport::ITransport::Pointer transport, ICryptor::Pointer cryptor); - void startReceive(ReceivePromise::Pointer promise) override; + void startReceive(ReceivePromise::Pointer promise, ChannelId channelId, int promiseIndex, int messageIndex) override; + void setInterleavedHandler(ReceivePromise::Pointer promise) override; private: using std::enable_shared_from_this::shared_from_this; @@ -49,10 +50,18 @@ private: boost::asio::io_service::strand strand_; transport::ITransport::Pointer transport_; ICryptor::Pointer cryptor_; - FrameType recentFrameType_; + + FrameType thisFrameType_; ReceivePromise::Pointer promise_; + ReceivePromise::Pointer interleavedPromise_; Message::Pointer message_; + std::map messageBuffer_; + + int frameSize_; + bool isInterleaved_; + bool isValidFrame_; + int currentMessageIndex_; }; } diff --git a/include/aasdk/Messenger/Messenger.hpp b/include/aasdk/Messenger/Messenger.hpp index 6956320..c1f4c3c 100644 --- a/include/aasdk/Messenger/Messenger.hpp +++ b/include/aasdk/Messenger/Messenger.hpp @@ -47,7 +47,9 @@ private: void inStreamMessageHandler(Message::Pointer message); void outStreamMessageHandler(ChannelSendQueue::iterator queueElement); void rejectReceivePromiseQueue(const error::Error& e); + void interleavedMessageHandler(Message::Pointer message); void rejectSendPromiseQueue(const error::Error& e); + void rejectInterleavedMessageHandler(const error::Error& e); boost::asio::io_service::strand receiveStrand_; boost::asio::io_service::strand sendStrand_; @@ -57,6 +59,9 @@ private: ChannelReceivePromiseQueue channelReceivePromiseQueue_; ChannelReceiveMessageQueue channelReceiveMessageQueue_; ChannelSendQueue channelSendPromiseQueue_; + + int currentPromiseIndex_; + int currentMessageIndex_; }; } diff --git a/src/Messenger/MessageInStream.cpp b/src/Messenger/MessageInStream.cpp index c39e0a6..46838dd 100644 --- a/src/Messenger/MessageInStream.cpp +++ b/src/Messenger/MessageInStream.cpp @@ -18,7 +18,8 @@ #include #include - +#include +#include namespace aasdk { @@ -30,65 +31,87 @@ MessageInStream::MessageInStream(boost::asio::io_service& ioService, transport:: , transport_(std::move(transport)) , cryptor_(std::move(cryptor)) { - + currentMessageIndex_ = 0; } -void MessageInStream::startReceive(ReceivePromise::Pointer promise) +void MessageInStream::startReceive(ReceivePromise::Pointer promise, ChannelId channelId, int promiseIndex, int messageIndex) { - strand_.dispatch([this, self = this->shared_from_this(), promise = std::move(promise)]() mutable { - if(promise_ == nullptr) - { - promise_ = std::move(promise); + AASDK_LOG(debug) << "[MessageInStream] 1. Start Receive called with channel " << channelIdToString(channelId) << " PI " << std::to_string(promiseIndex) << " MI " << std::to_string(messageIndex); + strand_.dispatch([this, self = this->shared_from_this(), promise = std::move(promise)]() mutable { + if (promise_ == nullptr) { + promise_ = std::move(promise); auto transportPromise = transport::ITransport::ReceivePromise::defer(strand_); transportPromise->then( - [this, self = this->shared_from_this()](common::Data data) mutable { - this->receiveFrameHeaderHandler(common::DataConstBuffer(data)); - }, - [this, self = this->shared_from_this()](const error::Error& e) mutable { - promise_->reject(e); - promise_.reset(); - }); + [this, self = this->shared_from_this()](common::Data data) mutable { + this->receiveFrameHeaderHandler(common::DataConstBuffer(data)); + }, + [this, self = this->shared_from_this()](const error::Error &e) mutable { + AASDK_LOG(debug) << "[MessageInStream] 2. Error Here?"; + promise_->reject(e); + promise_.reset(); + }); transport_->receive(FrameHeader::getSizeOf(), std::move(transportPromise)); - } - else - { + } else { + AASDK_LOG(debug) << "[MessageInStream] 3. Operation in Progress."; promise->reject(error::Error(error::ErrorCode::OPERATION_IN_PROGRESS)); } }); } +void MessageInStream::setInterleavedHandler(ReceivePromise::Pointer promise) +{ + interleavedPromise_ = std::move(promise); +} + void MessageInStream::receiveFrameHeaderHandler(const common::DataConstBuffer& buffer) { FrameHeader frameHeader(buffer); - if(message_ != nullptr && message_->getChannelId() != frameHeader.getChannelId()) - { + AASDK_LOG(debug) << "[MessageInStream] 5. Processing Frame Header: Ch " << channelIdToString(frameHeader.getChannelId()) << " Fr " << frameTypeToString(frameHeader.getType()); + + isValidFrame_ = true; + isInterleaved_ = false; + + // New Promise or Interleaved + if(message_ != nullptr && message_->getChannelId() != frameHeader.getChannelId()) { + // We have an existing message but the channels don't match... + AASDK_LOG(debug) << "[MessageInStream] 6. Interleaved ChannelId MisMatch - F: " << channelIdToString(frameHeader.getChannelId()) << " M: " << channelIdToString(message_->getChannelId()); + + isInterleaved_ = true; + + // Store message in buffer; messageBuffer_[message_->getChannelId()] = message_; - message_ = nullptr; + message_.reset(); } - auto bufferedMessage = messageBuffer_.find(frameHeader.getChannelId()); + // Look for Buffered Message + if ((message_ == nullptr) && (frameHeader.getType() == FrameType::MIDDLE || frameHeader.getType() == FrameType::LAST)) { + AASDK_LOG(debug) << "[MessageInStream] 7. Null Message but Middle or Last Frame."; + auto bufferedMessage = messageBuffer_.find(frameHeader.getChannelId()); + if (bufferedMessage != messageBuffer_.end()) { + AASDK_LOG(debug) << "[MessageInStream] 8. Found Existing Message on Channel."; - if(bufferedMessage != messageBuffer_.end()) - { - if(frameHeader.getType() != FrameType::FIRST) - { message_ = bufferedMessage->second; + messageBuffer_.erase(bufferedMessage); + + isInterleaved_ = false; } - else - { - message_ = std::make_shared(frameHeader.getChannelId(), frameHeader.getEncryptionType(), frameHeader.getMessageType()); - } - messageBuffer_.erase(bufferedMessage); } - else if(message_ == nullptr) - { + + if (message_ == nullptr) { + if (frameHeader.getType() == FrameType::FIRST || frameHeader.getType() == FrameType::BULK) { + AASDK_LOG(debug) << "[MessageInStream] 11. New message created with Index " << std::to_string(currentMessageIndex_); + currentMessageIndex_++; + } else { + // This will be an invalid message, but we still need to read from the buffer. + isValidFrame_ = false; + } message_ = std::make_shared(frameHeader.getChannelId(), frameHeader.getEncryptionType(), frameHeader.getMessageType()); } - recentFrameType_ = frameHeader.getType(); + thisFrameType_ = frameHeader.getType(); const size_t frameSize = FrameSize::getSizeOf(frameHeader.getType() == FrameType::FIRST ? FrameSizeType::EXTENDED : FrameSizeType::SHORT); auto transportPromise = transport::ITransport::ReceivePromise::defer(strand_); @@ -144,23 +167,36 @@ void MessageInStream::receiveFramePayloadHandler(const common::DataConstBuffer& message_->insertPayload(buffer); } - if(recentFrameType_ == FrameType::BULK || recentFrameType_ == FrameType::LAST) + bool isResolved = false; + + // If this is the LAST frame or a BULK frame... + if((thisFrameType_ == FrameType::BULK || thisFrameType_ == FrameType::LAST) && isValidFrame_) { - promise_->resolve(std::move(message_)); - promise_.reset(); + if (!isInterleaved_) { + AASDK_LOG(debug) << "[MessageInStream] 12. Resolving Normal message. " << std::to_string(currentMessageIndex_); + promise_->resolve(std::move(message_)); + promise_.reset(); + isResolved = true; + } else { + AASDK_LOG(debug) << "[MessageInStream] 13. Resolving Interleaved Message. " << std::to_string(currentMessageIndex_); + interleavedPromise_->resolve(std::move(message_)); + } + currentMessageIndex_--; + message_.reset(); } - else - { + + // If the main promise isn't resolved, then carry on retrieving frame headers. + if (!isResolved) { auto transportPromise = transport::ITransport::ReceivePromise::defer(strand_); transportPromise->then( - [this, self = this->shared_from_this()](common::Data data) mutable { - this->receiveFrameHeaderHandler(common::DataConstBuffer(data)); - }, - [this, self = this->shared_from_this()](const error::Error& e) mutable { - message_.reset(); - promise_->reject(e); - promise_.reset(); - }); + [this, self = this->shared_from_this()](common::Data data) mutable { + this->receiveFrameHeaderHandler(common::DataConstBuffer(data)); + }, + [this, self = this->shared_from_this()](const error::Error& e) mutable { + message_.reset(); + promise_->reject(e); + promise_.reset(); + }); transport_->receive(FrameHeader::getSizeOf(), std::move(transportPromise)); } diff --git a/src/Messenger/Messenger.cpp b/src/Messenger/Messenger.cpp index 0c8d7cb..2dfd515 100644 --- a/src/Messenger/Messenger.cpp +++ b/src/Messenger/Messenger.cpp @@ -19,7 +19,7 @@ #include #include #include - +#include namespace aasdk { @@ -32,26 +32,43 @@ Messenger::Messenger(boost::asio::io_service& ioService, IMessageInStream::Point , messageInStream_(std::move(messageInStream)) , messageOutStream_(std::move(messageOutStream)) { - + currentPromiseIndex_ = 0; + currentMessageIndex_ = 0; } void Messenger::enqueueReceive(ChannelId channelId, ReceivePromise::Pointer promise) { + // enqueueReceive is called from the service channel. + AASDK_LOG(debug) << "[Messenger] 1. enqueueReceived called on Channel " << channelIdToString(channelId); + receiveStrand_.dispatch([this, self = this->shared_from_this(), channelId, promise = std::move(promise)]() mutable { + auto interleavedPromise = ReceivePromise::defer(receiveStrand_); + interleavedPromise->then(std::bind(&Messenger::interleavedMessageHandler, this->shared_from_this(), std::placeholders::_1), + std::bind(&Messenger::rejectInterleavedMessageHandler, this->shared_from_this(), std::placeholders::_1)); + messageInStream_->setInterleavedHandler(std::move(interleavedPromise)); + + //If there's any messages on the channel, resolve. The channel will call enqueueReceive again. if(!channelReceiveMessageQueue_.empty(channelId)) { promise->resolve(std::move(channelReceiveMessageQueue_.pop(channelId))); + AASDK_LOG(debug) << "[Messenger] 2. Message Queue not Empty. Resolving promise with Existing Message."; } else { + currentPromiseIndex_++; + AASDK_LOG(debug) << "[Messenger] 3. Message Queue is Empty. Pushing Promise on Index " << std::to_string(currentPromiseIndex_); channelReceivePromiseQueue_.push(channelId, std::move(promise)); if(channelReceivePromiseQueue_.size() == 1) { + currentMessageIndex_++; + + AASDK_LOG(debug) << "[Messenger] 4. Calling startReceive on MessageIndex " << std::to_string(currentMessageIndex_); + auto inStreamPromise = ReceivePromise::defer(receiveStrand_); inStreamPromise->then(std::bind(&Messenger::inStreamMessageHandler, this->shared_from_this(), std::placeholders::_1), std::bind(&Messenger::rejectReceivePromiseQueue, this->shared_from_this(), std::placeholders::_1)); - messageInStream_->startReceive(std::move(inStreamPromise)); + messageInStream_->startReceive(std::move(inStreamPromise), channelId, currentPromiseIndex_, currentMessageIndex_); } } }); @@ -72,22 +89,32 @@ void Messenger::enqueueSend(Message::Pointer message, SendPromise::Pointer promi void Messenger::inStreamMessageHandler(Message::Pointer message) { auto channelId = message->getChannelId(); + AASDK_LOG(debug) << "[Messenger] 5. inStreamMessageHandler from Channel " << channelIdToString(channelId); + currentMessageIndex_--; + + // If there's a promise on the queue, we resolve the promise with this message.... if(channelReceivePromiseQueue_.isPending(channelId)) { channelReceivePromiseQueue_.pop(channelId)->resolve(std::move(message)); + currentPromiseIndex_--; } else { + // Or we push the message to the Message Queue for when we do get a promise + AASDK_LOG(debug) << "[Messenger] 7. Pushing Message to Queue as we have no Pending Promises." << std::to_string(currentPromiseIndex_); channelReceiveMessageQueue_.push(std::move(message)); } if(!channelReceivePromiseQueue_.empty()) { + currentMessageIndex_++; + AASDK_LOG(debug) << "[Messenger] 8. Calling startReceive on PromiseIndex " << std::to_string(currentPromiseIndex_) << " and MessageIndex " << std::to_string(currentMessageIndex_); + auto inStreamPromise = ReceivePromise::defer(receiveStrand_); inStreamPromise->then(std::bind(&Messenger::inStreamMessageHandler, this->shared_from_this(), std::placeholders::_1), std::bind(&Messenger::rejectReceivePromiseQueue, this->shared_from_this(), std::placeholders::_1)); - messageInStream_->startReceive(std::move(inStreamPromise)); + messageInStream_->startReceive(std::move(inStreamPromise), channelId, currentPromiseIndex_,currentMessageIndex_); } } @@ -101,6 +128,20 @@ void Messenger::doSend() messageOutStream_->stream(std::move(queueElementIter->first), std::move(outStreamPromise)); } +void Messenger::interleavedMessageHandler(Message::Pointer message) +{ + auto channelId = message->getChannelId(); + + AASDK_LOG(debug) << "[Messenger] 9. interleavedMessageHandler from Channel " << channelIdToString(channelId); + + channelReceiveMessageQueue_.push(std::move(message)); + + auto interleavedPromise = ReceivePromise::defer(receiveStrand_); + interleavedPromise->then(std::bind(&Messenger::interleavedMessageHandler, this->shared_from_this(), std::placeholders::_1), + std::bind(&Messenger::rejectInterleavedMessageHandler, this->shared_from_this(), std::placeholders::_1)); + messageInStream_->setInterleavedHandler(std::move(interleavedPromise)); +} + void Messenger::outStreamMessageHandler(ChannelSendQueue::iterator queueElement) { queueElement->second->resolve(); @@ -117,9 +158,15 @@ void Messenger::rejectReceivePromiseQueue(const error::Error& e) while(!channelReceivePromiseQueue_.empty()) { channelReceivePromiseQueue_.pop()->reject(e); + currentPromiseIndex_--; } } +void Messenger::rejectInterleavedMessageHandler(const error::Error& e) +{ + // Dummy +} + void Messenger::rejectSendPromiseQueue(const error::Error& e) { while(!channelSendPromiseQueue_.empty()) @@ -132,6 +179,7 @@ void Messenger::rejectSendPromiseQueue(const error::Error& e) void Messenger::stop() { + currentPromiseIndex_ = 0; receiveStrand_.dispatch([this, self = this->shared_from_this()]() { channelReceiveMessageQueue_.clear(); });