From bd181651819379632b8907f9621e0d2b5c29431e Mon Sep 17 00:00:00 2001 From: "Simon.Dean" Date: Sun, 13 Feb 2022 21:06:07 +0000 Subject: [PATCH] Reworked Message Handling - Have adjusted the buffer mechanism so instead of using the buffer to store a message in the event of an interleaved frame, we basically perform all work on the buffer (albeit we take the existing message for the channel id from the buffer for the frame we're working on and write the message to the buffer if the message isn't resolved). - Moved writing to buffer to receiveFramePayloadHandler() function if the message is BULK or LAST and therefore not resolved. - Simplified receiveFrameHeaderHandler() so that we automatically try to find a message from the buffer and then createa message as necessary depending on the frame type. - Excellent stability with no noticeable artefacts in audio after 1 hour. - Removed majority of debugging after achieving stability. --- include/aasdk/Messenger/IMessageInStream.hpp | 3 +- include/aasdk/Messenger/MessageInStream.hpp | 5 +- include/aasdk/Messenger/Messenger.hpp | 4 - src/Messenger/MessageInStream.cpp | 81 +++++++------------- src/Messenger/Messenger.cpp | 50 +----------- 5 files changed, 33 insertions(+), 110 deletions(-) diff --git a/include/aasdk/Messenger/IMessageInStream.hpp b/include/aasdk/Messenger/IMessageInStream.hpp index b64e4b0..0d9e798 100644 --- a/include/aasdk/Messenger/IMessageInStream.hpp +++ b/include/aasdk/Messenger/IMessageInStream.hpp @@ -35,8 +35,7 @@ public: IMessageInStream() = default; virtual ~IMessageInStream() = default; - virtual void startReceive(ReceivePromise::Pointer promise, ChannelId channelId, int promiseIndex, int messageIndex) = 0; - virtual void setInterleavedHandler(ReceivePromise::Pointer promise) = 0; + virtual void startReceive(ReceivePromise::Pointer promise) = 0; }; } diff --git a/include/aasdk/Messenger/MessageInStream.hpp b/include/aasdk/Messenger/MessageInStream.hpp index ad44a15..489c5d7 100644 --- a/include/aasdk/Messenger/MessageInStream.hpp +++ b/include/aasdk/Messenger/MessageInStream.hpp @@ -37,8 +37,7 @@ class MessageInStream: public IMessageInStream, public std::enable_shared_from_t public: MessageInStream(boost::asio::io_service& ioService, transport::ITransport::Pointer transport, ICryptor::Pointer cryptor); - void startReceive(ReceivePromise::Pointer promise, ChannelId channelId, int promiseIndex, int messageIndex) override; - void setInterleavedHandler(ReceivePromise::Pointer promise) override; + void startReceive(ReceivePromise::Pointer promise) override; private: using std::enable_shared_from_this::shared_from_this; @@ -59,9 +58,7 @@ private: std::map messageBuffer_; int frameSize_; - bool isInterleaved_; bool isValidFrame_; - int currentMessageIndex_; }; } diff --git a/include/aasdk/Messenger/Messenger.hpp b/include/aasdk/Messenger/Messenger.hpp index c1f4c3c..b5996cf 100644 --- a/include/aasdk/Messenger/Messenger.hpp +++ b/include/aasdk/Messenger/Messenger.hpp @@ -47,9 +47,7 @@ private: void inStreamMessageHandler(Message::Pointer message); void outStreamMessageHandler(ChannelSendQueue::iterator queueElement); void rejectReceivePromiseQueue(const error::Error& e); - void interleavedMessageHandler(Message::Pointer message); void rejectSendPromiseQueue(const error::Error& e); - void rejectInterleavedMessageHandler(const error::Error& e); boost::asio::io_service::strand receiveStrand_; boost::asio::io_service::strand sendStrand_; @@ -60,8 +58,6 @@ private: ChannelReceiveMessageQueue channelReceiveMessageQueue_; ChannelSendQueue channelSendPromiseQueue_; - int currentPromiseIndex_; - int currentMessageIndex_; }; } diff --git a/src/Messenger/MessageInStream.cpp b/src/Messenger/MessageInStream.cpp index 46838dd..f20992d 100644 --- a/src/Messenger/MessageInStream.cpp +++ b/src/Messenger/MessageInStream.cpp @@ -31,13 +31,11 @@ MessageInStream::MessageInStream(boost::asio::io_service& ioService, transport:: , transport_(std::move(transport)) , cryptor_(std::move(cryptor)) { - currentMessageIndex_ = 0; + } -void MessageInStream::startReceive(ReceivePromise::Pointer promise, ChannelId channelId, int promiseIndex, int messageIndex) +void MessageInStream::startReceive(ReceivePromise::Pointer promise) { - AASDK_LOG(debug) << "[MessageInStream] 1. Start Receive called with channel " << channelIdToString(channelId) << " PI " << std::to_string(promiseIndex) << " MI " << std::to_string(messageIndex); - strand_.dispatch([this, self = this->shared_from_this(), promise = std::move(promise)]() mutable { if (promise_ == nullptr) { promise_ = std::move(promise); @@ -47,68 +45,47 @@ void MessageInStream::startReceive(ReceivePromise::Pointer promise, ChannelId ch this->receiveFrameHeaderHandler(common::DataConstBuffer(data)); }, [this, self = this->shared_from_this()](const error::Error &e) mutable { - AASDK_LOG(debug) << "[MessageInStream] 2. Error Here?"; promise_->reject(e); promise_.reset(); }); transport_->receive(FrameHeader::getSizeOf(), std::move(transportPromise)); } else { - AASDK_LOG(debug) << "[MessageInStream] 3. Operation in Progress."; promise->reject(error::Error(error::ErrorCode::OPERATION_IN_PROGRESS)); } }); } -void MessageInStream::setInterleavedHandler(ReceivePromise::Pointer promise) -{ - interleavedPromise_ = std::move(promise); -} - void MessageInStream::receiveFrameHeaderHandler(const common::DataConstBuffer& buffer) { FrameHeader frameHeader(buffer); - AASDK_LOG(debug) << "[MessageInStream] 5. Processing Frame Header: Ch " << channelIdToString(frameHeader.getChannelId()) << " Fr " << frameTypeToString(frameHeader.getType()); + AASDK_LOG(debug) << "[MessageInStream] Processing Frame Header: Ch " << channelIdToString(frameHeader.getChannelId()) << " Fr " << frameTypeToString(frameHeader.getType()); isValidFrame_ = true; - isInterleaved_ = false; - // New Promise or Interleaved - if(message_ != nullptr && message_->getChannelId() != frameHeader.getChannelId()) { - // We have an existing message but the channels don't match... - AASDK_LOG(debug) << "[MessageInStream] 6. Interleaved ChannelId MisMatch - F: " << channelIdToString(frameHeader.getChannelId()) << " M: " << channelIdToString(message_->getChannelId()); + auto bufferedMessage = messageBuffer_.find(frameHeader.getChannelId()); + if (bufferedMessage != messageBuffer_.end()) { + // We have found a message... + message_ = std::move(bufferedMessage->second); + messageBuffer_.erase(bufferedMessage); - isInterleaved_ = true; + AASDK_LOG(debug) << "[MessageInStream] Found existing message."; - // Store message in buffer; - messageBuffer_[message_->getChannelId()] = message_; - message_.reset(); - } - - // Look for Buffered Message - if ((message_ == nullptr) && (frameHeader.getType() == FrameType::MIDDLE || frameHeader.getType() == FrameType::LAST)) { - AASDK_LOG(debug) << "[MessageInStream] 7. Null Message but Middle or Last Frame."; - auto bufferedMessage = messageBuffer_.find(frameHeader.getChannelId()); - if (bufferedMessage != messageBuffer_.end()) { - AASDK_LOG(debug) << "[MessageInStream] 8. Found Existing Message on Channel."; - - message_ = bufferedMessage->second; - messageBuffer_.erase(bufferedMessage); - - isInterleaved_ = false; - } - } - - if (message_ == nullptr) { if (frameHeader.getType() == FrameType::FIRST || frameHeader.getType() == FrameType::BULK) { - AASDK_LOG(debug) << "[MessageInStream] 11. New message created with Index " << std::to_string(currentMessageIndex_); - currentMessageIndex_++; - } else { - // This will be an invalid message, but we still need to read from the buffer. + // If it's first or bulk, we need to override the message anyhow, so we will start again. + // Need to start a new message anyhow + message_ = std::make_shared(frameHeader.getChannelId(), frameHeader.getEncryptionType(), frameHeader.getMessageType()); + } + } else { + AASDK_LOG(debug) << "[MessageInStream] Could not find existing message."; + // No Message Found in Buffers and this is a middle or last frame, this an error. + // Still need to process the frame, but we will not resolve at the end. + message_ = std::make_shared(frameHeader.getChannelId(), frameHeader.getEncryptionType(), frameHeader.getMessageType()); + if (frameHeader.getType() == FrameType::MIDDLE || frameHeader.getType() == FrameType::LAST) { + // This is an error isValidFrame_ = false; } - message_ = std::make_shared(frameHeader.getChannelId(), frameHeader.getEncryptionType(), frameHeader.getMessageType()); } thisFrameType_ = frameHeader.getType(); @@ -172,17 +149,15 @@ void MessageInStream::receiveFramePayloadHandler(const common::DataConstBuffer& // If this is the LAST frame or a BULK frame... if((thisFrameType_ == FrameType::BULK || thisFrameType_ == FrameType::LAST) && isValidFrame_) { - if (!isInterleaved_) { - AASDK_LOG(debug) << "[MessageInStream] 12. Resolving Normal message. " << std::to_string(currentMessageIndex_); - promise_->resolve(std::move(message_)); - promise_.reset(); - isResolved = true; - } else { - AASDK_LOG(debug) << "[MessageInStream] 13. Resolving Interleaved Message. " << std::to_string(currentMessageIndex_); - interleavedPromise_->resolve(std::move(message_)); - } + AASDK_LOG(debug) << "[MessageInStream] Resolving message."; + promise_->resolve(std::move(message_)); + promise_.reset(); + isResolved = true; + currentMessageIndex_--; - message_.reset(); + } else { + // First or Middle message, we'll store in our buffer... + messageBuffer_[message_->getChannelId()] = std::move(message_); } // If the main promise isn't resolved, then carry on retrieving frame headers. diff --git a/src/Messenger/Messenger.cpp b/src/Messenger/Messenger.cpp index 2dfd515..367923f 100644 --- a/src/Messenger/Messenger.cpp +++ b/src/Messenger/Messenger.cpp @@ -32,43 +32,28 @@ Messenger::Messenger(boost::asio::io_service& ioService, IMessageInStream::Point , messageInStream_(std::move(messageInStream)) , messageOutStream_(std::move(messageOutStream)) { - currentPromiseIndex_ = 0; - currentMessageIndex_ = 0; + } void Messenger::enqueueReceive(ChannelId channelId, ReceivePromise::Pointer promise) { // enqueueReceive is called from the service channel. - AASDK_LOG(debug) << "[Messenger] 1. enqueueReceived called on Channel " << channelIdToString(channelId); - receiveStrand_.dispatch([this, self = this->shared_from_this(), channelId, promise = std::move(promise)]() mutable { - auto interleavedPromise = ReceivePromise::defer(receiveStrand_); - interleavedPromise->then(std::bind(&Messenger::interleavedMessageHandler, this->shared_from_this(), std::placeholders::_1), - std::bind(&Messenger::rejectInterleavedMessageHandler, this->shared_from_this(), std::placeholders::_1)); - messageInStream_->setInterleavedHandler(std::move(interleavedPromise)); - //If there's any messages on the channel, resolve. The channel will call enqueueReceive again. if(!channelReceiveMessageQueue_.empty(channelId)) { promise->resolve(std::move(channelReceiveMessageQueue_.pop(channelId))); - AASDK_LOG(debug) << "[Messenger] 2. Message Queue not Empty. Resolving promise with Existing Message."; } else { - currentPromiseIndex_++; - AASDK_LOG(debug) << "[Messenger] 3. Message Queue is Empty. Pushing Promise on Index " << std::to_string(currentPromiseIndex_); channelReceivePromiseQueue_.push(channelId, std::move(promise)); if(channelReceivePromiseQueue_.size() == 1) { - currentMessageIndex_++; - - AASDK_LOG(debug) << "[Messenger] 4. Calling startReceive on MessageIndex " << std::to_string(currentMessageIndex_); - auto inStreamPromise = ReceivePromise::defer(receiveStrand_); inStreamPromise->then(std::bind(&Messenger::inStreamMessageHandler, this->shared_from_this(), std::placeholders::_1), std::bind(&Messenger::rejectReceivePromiseQueue, this->shared_from_this(), std::placeholders::_1)); - messageInStream_->startReceive(std::move(inStreamPromise), channelId, currentPromiseIndex_, currentMessageIndex_); + messageInStream_->startReceive(std::move(inStreamPromise)); } } }); @@ -89,32 +74,24 @@ void Messenger::enqueueSend(Message::Pointer message, SendPromise::Pointer promi void Messenger::inStreamMessageHandler(Message::Pointer message) { auto channelId = message->getChannelId(); - AASDK_LOG(debug) << "[Messenger] 5. inStreamMessageHandler from Channel " << channelIdToString(channelId); - - currentMessageIndex_--; // If there's a promise on the queue, we resolve the promise with this message.... if(channelReceivePromiseQueue_.isPending(channelId)) { channelReceivePromiseQueue_.pop(channelId)->resolve(std::move(message)); - currentPromiseIndex_--; } else { // Or we push the message to the Message Queue for when we do get a promise - AASDK_LOG(debug) << "[Messenger] 7. Pushing Message to Queue as we have no Pending Promises." << std::to_string(currentPromiseIndex_); channelReceiveMessageQueue_.push(std::move(message)); } if(!channelReceivePromiseQueue_.empty()) { - currentMessageIndex_++; - AASDK_LOG(debug) << "[Messenger] 8. Calling startReceive on PromiseIndex " << std::to_string(currentPromiseIndex_) << " and MessageIndex " << std::to_string(currentMessageIndex_); - auto inStreamPromise = ReceivePromise::defer(receiveStrand_); inStreamPromise->then(std::bind(&Messenger::inStreamMessageHandler, this->shared_from_this(), std::placeholders::_1), std::bind(&Messenger::rejectReceivePromiseQueue, this->shared_from_this(), std::placeholders::_1)); - messageInStream_->startReceive(std::move(inStreamPromise), channelId, currentPromiseIndex_,currentMessageIndex_); + messageInStream_->startReceive(std::move(inStreamPromise)); } } @@ -128,20 +105,6 @@ void Messenger::doSend() messageOutStream_->stream(std::move(queueElementIter->first), std::move(outStreamPromise)); } -void Messenger::interleavedMessageHandler(Message::Pointer message) -{ - auto channelId = message->getChannelId(); - - AASDK_LOG(debug) << "[Messenger] 9. interleavedMessageHandler from Channel " << channelIdToString(channelId); - - channelReceiveMessageQueue_.push(std::move(message)); - - auto interleavedPromise = ReceivePromise::defer(receiveStrand_); - interleavedPromise->then(std::bind(&Messenger::interleavedMessageHandler, this->shared_from_this(), std::placeholders::_1), - std::bind(&Messenger::rejectInterleavedMessageHandler, this->shared_from_this(), std::placeholders::_1)); - messageInStream_->setInterleavedHandler(std::move(interleavedPromise)); -} - void Messenger::outStreamMessageHandler(ChannelSendQueue::iterator queueElement) { queueElement->second->resolve(); @@ -158,15 +121,9 @@ void Messenger::rejectReceivePromiseQueue(const error::Error& e) while(!channelReceivePromiseQueue_.empty()) { channelReceivePromiseQueue_.pop()->reject(e); - currentPromiseIndex_--; } } -void Messenger::rejectInterleavedMessageHandler(const error::Error& e) -{ - // Dummy -} - void Messenger::rejectSendPromiseQueue(const error::Error& e) { while(!channelSendPromiseQueue_.empty()) @@ -179,7 +136,6 @@ void Messenger::rejectSendPromiseQueue(const error::Error& e) void Messenger::stop() { - currentPromiseIndex_ = 0; receiveStrand_.dispatch([this, self = this->shared_from_this()]() { channelReceiveMessageQueue_.clear(); });