Reworked Message Handling

- Have adjusted the buffer mechanism so instead of using the buffer to store a message in the event of an interleaved frame, we basically perform all work on the buffer (albeit we take the existing message for the channel id from the buffer for the frame we're working on and write the message to the buffer if the message isn't resolved).
- Moved writing to buffer to receiveFramePayloadHandler() function if the message is BULK or LAST and therefore not resolved.
- Simplified receiveFrameHeaderHandler() so that we automatically try to find a message from the buffer and then createa message as necessary depending on the frame type.
- Excellent stability with no noticeable artefacts in audio after 1 hour.
- Removed majority of debugging after achieving stability.
pull/15/head
Simon.Dean 2022-02-13 21:06:07 +00:00
parent f80e72c480
commit bd18165181
5 changed files with 33 additions and 110 deletions

View File

@ -35,8 +35,7 @@ public:
IMessageInStream() = default;
virtual ~IMessageInStream() = default;
virtual void startReceive(ReceivePromise::Pointer promise, ChannelId channelId, int promiseIndex, int messageIndex) = 0;
virtual void setInterleavedHandler(ReceivePromise::Pointer promise) = 0;
virtual void startReceive(ReceivePromise::Pointer promise) = 0;
};
}

View File

@ -37,8 +37,7 @@ class MessageInStream: public IMessageInStream, public std::enable_shared_from_t
public:
MessageInStream(boost::asio::io_service& ioService, transport::ITransport::Pointer transport, ICryptor::Pointer cryptor);
void startReceive(ReceivePromise::Pointer promise, ChannelId channelId, int promiseIndex, int messageIndex) override;
void setInterleavedHandler(ReceivePromise::Pointer promise) override;
void startReceive(ReceivePromise::Pointer promise) override;
private:
using std::enable_shared_from_this<MessageInStream>::shared_from_this;
@ -59,9 +58,7 @@ private:
std::map<messenger::ChannelId, Message::Pointer> messageBuffer_;
int frameSize_;
bool isInterleaved_;
bool isValidFrame_;
int currentMessageIndex_;
};
}

View File

@ -47,9 +47,7 @@ private:
void inStreamMessageHandler(Message::Pointer message);
void outStreamMessageHandler(ChannelSendQueue::iterator queueElement);
void rejectReceivePromiseQueue(const error::Error& e);
void interleavedMessageHandler(Message::Pointer message);
void rejectSendPromiseQueue(const error::Error& e);
void rejectInterleavedMessageHandler(const error::Error& e);
boost::asio::io_service::strand receiveStrand_;
boost::asio::io_service::strand sendStrand_;
@ -60,8 +58,6 @@ private:
ChannelReceiveMessageQueue channelReceiveMessageQueue_;
ChannelSendQueue channelSendPromiseQueue_;
int currentPromiseIndex_;
int currentMessageIndex_;
};
}

View File

@ -31,13 +31,11 @@ MessageInStream::MessageInStream(boost::asio::io_service& ioService, transport::
, transport_(std::move(transport))
, cryptor_(std::move(cryptor))
{
currentMessageIndex_ = 0;
}
void MessageInStream::startReceive(ReceivePromise::Pointer promise, ChannelId channelId, int promiseIndex, int messageIndex)
void MessageInStream::startReceive(ReceivePromise::Pointer promise)
{
AASDK_LOG(debug) << "[MessageInStream] 1. Start Receive called with channel " << channelIdToString(channelId) << " PI " << std::to_string(promiseIndex) << " MI " << std::to_string(messageIndex);
strand_.dispatch([this, self = this->shared_from_this(), promise = std::move(promise)]() mutable {
if (promise_ == nullptr) {
promise_ = std::move(promise);
@ -47,68 +45,47 @@ void MessageInStream::startReceive(ReceivePromise::Pointer promise, ChannelId ch
this->receiveFrameHeaderHandler(common::DataConstBuffer(data));
},
[this, self = this->shared_from_this()](const error::Error &e) mutable {
AASDK_LOG(debug) << "[MessageInStream] 2. Error Here?";
promise_->reject(e);
promise_.reset();
});
transport_->receive(FrameHeader::getSizeOf(), std::move(transportPromise));
} else {
AASDK_LOG(debug) << "[MessageInStream] 3. Operation in Progress.";
promise->reject(error::Error(error::ErrorCode::OPERATION_IN_PROGRESS));
}
});
}
void MessageInStream::setInterleavedHandler(ReceivePromise::Pointer promise)
{
interleavedPromise_ = std::move(promise);
}
void MessageInStream::receiveFrameHeaderHandler(const common::DataConstBuffer& buffer)
{
FrameHeader frameHeader(buffer);
AASDK_LOG(debug) << "[MessageInStream] 5. Processing Frame Header: Ch " << channelIdToString(frameHeader.getChannelId()) << " Fr " << frameTypeToString(frameHeader.getType());
AASDK_LOG(debug) << "[MessageInStream] Processing Frame Header: Ch " << channelIdToString(frameHeader.getChannelId()) << " Fr " << frameTypeToString(frameHeader.getType());
isValidFrame_ = true;
isInterleaved_ = false;
// New Promise or Interleaved
if(message_ != nullptr && message_->getChannelId() != frameHeader.getChannelId()) {
// We have an existing message but the channels don't match...
AASDK_LOG(debug) << "[MessageInStream] 6. Interleaved ChannelId MisMatch - F: " << channelIdToString(frameHeader.getChannelId()) << " M: " << channelIdToString(message_->getChannelId());
auto bufferedMessage = messageBuffer_.find(frameHeader.getChannelId());
if (bufferedMessage != messageBuffer_.end()) {
// We have found a message...
message_ = std::move(bufferedMessage->second);
messageBuffer_.erase(bufferedMessage);
isInterleaved_ = true;
AASDK_LOG(debug) << "[MessageInStream] Found existing message.";
// Store message in buffer;
messageBuffer_[message_->getChannelId()] = message_;
message_.reset();
}
// Look for Buffered Message
if ((message_ == nullptr) && (frameHeader.getType() == FrameType::MIDDLE || frameHeader.getType() == FrameType::LAST)) {
AASDK_LOG(debug) << "[MessageInStream] 7. Null Message but Middle or Last Frame.";
auto bufferedMessage = messageBuffer_.find(frameHeader.getChannelId());
if (bufferedMessage != messageBuffer_.end()) {
AASDK_LOG(debug) << "[MessageInStream] 8. Found Existing Message on Channel.";
message_ = bufferedMessage->second;
messageBuffer_.erase(bufferedMessage);
isInterleaved_ = false;
}
}
if (message_ == nullptr) {
if (frameHeader.getType() == FrameType::FIRST || frameHeader.getType() == FrameType::BULK) {
AASDK_LOG(debug) << "[MessageInStream] 11. New message created with Index " << std::to_string(currentMessageIndex_);
currentMessageIndex_++;
} else {
// This will be an invalid message, but we still need to read from the buffer.
// If it's first or bulk, we need to override the message anyhow, so we will start again.
// Need to start a new message anyhow
message_ = std::make_shared<Message>(frameHeader.getChannelId(), frameHeader.getEncryptionType(), frameHeader.getMessageType());
}
} else {
AASDK_LOG(debug) << "[MessageInStream] Could not find existing message.";
// No Message Found in Buffers and this is a middle or last frame, this an error.
// Still need to process the frame, but we will not resolve at the end.
message_ = std::make_shared<Message>(frameHeader.getChannelId(), frameHeader.getEncryptionType(), frameHeader.getMessageType());
if (frameHeader.getType() == FrameType::MIDDLE || frameHeader.getType() == FrameType::LAST) {
// This is an error
isValidFrame_ = false;
}
message_ = std::make_shared<Message>(frameHeader.getChannelId(), frameHeader.getEncryptionType(), frameHeader.getMessageType());
}
thisFrameType_ = frameHeader.getType();
@ -172,17 +149,15 @@ void MessageInStream::receiveFramePayloadHandler(const common::DataConstBuffer&
// If this is the LAST frame or a BULK frame...
if((thisFrameType_ == FrameType::BULK || thisFrameType_ == FrameType::LAST) && isValidFrame_)
{
if (!isInterleaved_) {
AASDK_LOG(debug) << "[MessageInStream] 12. Resolving Normal message. " << std::to_string(currentMessageIndex_);
promise_->resolve(std::move(message_));
promise_.reset();
isResolved = true;
} else {
AASDK_LOG(debug) << "[MessageInStream] 13. Resolving Interleaved Message. " << std::to_string(currentMessageIndex_);
interleavedPromise_->resolve(std::move(message_));
}
AASDK_LOG(debug) << "[MessageInStream] Resolving message.";
promise_->resolve(std::move(message_));
promise_.reset();
isResolved = true;
currentMessageIndex_--;
message_.reset();
} else {
// First or Middle message, we'll store in our buffer...
messageBuffer_[message_->getChannelId()] = std::move(message_);
}
// If the main promise isn't resolved, then carry on retrieving frame headers.

View File

@ -32,43 +32,28 @@ Messenger::Messenger(boost::asio::io_service& ioService, IMessageInStream::Point
, messageInStream_(std::move(messageInStream))
, messageOutStream_(std::move(messageOutStream))
{
currentPromiseIndex_ = 0;
currentMessageIndex_ = 0;
}
void Messenger::enqueueReceive(ChannelId channelId, ReceivePromise::Pointer promise)
{
// enqueueReceive is called from the service channel.
AASDK_LOG(debug) << "[Messenger] 1. enqueueReceived called on Channel " << channelIdToString(channelId);
receiveStrand_.dispatch([this, self = this->shared_from_this(), channelId, promise = std::move(promise)]() mutable {
auto interleavedPromise = ReceivePromise::defer(receiveStrand_);
interleavedPromise->then(std::bind(&Messenger::interleavedMessageHandler, this->shared_from_this(), std::placeholders::_1),
std::bind(&Messenger::rejectInterleavedMessageHandler, this->shared_from_this(), std::placeholders::_1));
messageInStream_->setInterleavedHandler(std::move(interleavedPromise));
//If there's any messages on the channel, resolve. The channel will call enqueueReceive again.
if(!channelReceiveMessageQueue_.empty(channelId))
{
promise->resolve(std::move(channelReceiveMessageQueue_.pop(channelId)));
AASDK_LOG(debug) << "[Messenger] 2. Message Queue not Empty. Resolving promise with Existing Message.";
}
else
{
currentPromiseIndex_++;
AASDK_LOG(debug) << "[Messenger] 3. Message Queue is Empty. Pushing Promise on Index " << std::to_string(currentPromiseIndex_);
channelReceivePromiseQueue_.push(channelId, std::move(promise));
if(channelReceivePromiseQueue_.size() == 1)
{
currentMessageIndex_++;
AASDK_LOG(debug) << "[Messenger] 4. Calling startReceive on MessageIndex " << std::to_string(currentMessageIndex_);
auto inStreamPromise = ReceivePromise::defer(receiveStrand_);
inStreamPromise->then(std::bind(&Messenger::inStreamMessageHandler, this->shared_from_this(), std::placeholders::_1),
std::bind(&Messenger::rejectReceivePromiseQueue, this->shared_from_this(), std::placeholders::_1));
messageInStream_->startReceive(std::move(inStreamPromise), channelId, currentPromiseIndex_, currentMessageIndex_);
messageInStream_->startReceive(std::move(inStreamPromise));
}
}
});
@ -89,32 +74,24 @@ void Messenger::enqueueSend(Message::Pointer message, SendPromise::Pointer promi
void Messenger::inStreamMessageHandler(Message::Pointer message)
{
auto channelId = message->getChannelId();
AASDK_LOG(debug) << "[Messenger] 5. inStreamMessageHandler from Channel " << channelIdToString(channelId);
currentMessageIndex_--;
// If there's a promise on the queue, we resolve the promise with this message....
if(channelReceivePromiseQueue_.isPending(channelId))
{
channelReceivePromiseQueue_.pop(channelId)->resolve(std::move(message));
currentPromiseIndex_--;
}
else
{
// Or we push the message to the Message Queue for when we do get a promise
AASDK_LOG(debug) << "[Messenger] 7. Pushing Message to Queue as we have no Pending Promises." << std::to_string(currentPromiseIndex_);
channelReceiveMessageQueue_.push(std::move(message));
}
if(!channelReceivePromiseQueue_.empty())
{
currentMessageIndex_++;
AASDK_LOG(debug) << "[Messenger] 8. Calling startReceive on PromiseIndex " << std::to_string(currentPromiseIndex_) << " and MessageIndex " << std::to_string(currentMessageIndex_);
auto inStreamPromise = ReceivePromise::defer(receiveStrand_);
inStreamPromise->then(std::bind(&Messenger::inStreamMessageHandler, this->shared_from_this(), std::placeholders::_1),
std::bind(&Messenger::rejectReceivePromiseQueue, this->shared_from_this(), std::placeholders::_1));
messageInStream_->startReceive(std::move(inStreamPromise), channelId, currentPromiseIndex_,currentMessageIndex_);
messageInStream_->startReceive(std::move(inStreamPromise));
}
}
@ -128,20 +105,6 @@ void Messenger::doSend()
messageOutStream_->stream(std::move(queueElementIter->first), std::move(outStreamPromise));
}
void Messenger::interleavedMessageHandler(Message::Pointer message)
{
auto channelId = message->getChannelId();
AASDK_LOG(debug) << "[Messenger] 9. interleavedMessageHandler from Channel " << channelIdToString(channelId);
channelReceiveMessageQueue_.push(std::move(message));
auto interleavedPromise = ReceivePromise::defer(receiveStrand_);
interleavedPromise->then(std::bind(&Messenger::interleavedMessageHandler, this->shared_from_this(), std::placeholders::_1),
std::bind(&Messenger::rejectInterleavedMessageHandler, this->shared_from_this(), std::placeholders::_1));
messageInStream_->setInterleavedHandler(std::move(interleavedPromise));
}
void Messenger::outStreamMessageHandler(ChannelSendQueue::iterator queueElement)
{
queueElement->second->resolve();
@ -158,15 +121,9 @@ void Messenger::rejectReceivePromiseQueue(const error::Error& e)
while(!channelReceivePromiseQueue_.empty())
{
channelReceivePromiseQueue_.pop()->reject(e);
currentPromiseIndex_--;
}
}
void Messenger::rejectInterleavedMessageHandler(const error::Error& e)
{
// Dummy
}
void Messenger::rejectSendPromiseQueue(const error::Error& e)
{
while(!channelSendPromiseQueue_.empty())
@ -179,7 +136,6 @@ void Messenger::rejectSendPromiseQueue(const error::Error& e)
void Messenger::stop()
{
currentPromiseIndex_ = 0;
receiveStrand_.dispatch([this, self = this->shared_from_this()]() {
channelReceiveMessageQueue_.clear();
});