Rework to message handling.

Promises are being used as Observables and when we hit an interleaved frame, we're resolving a promise on a message for a different channel. Although previous way worked, it was somewhat wrong.

We have tried to ensure here we resolve the original message only and anything else we send back via an interleaved Frame Handler.

Ideally we should then work to ensure we get the interleaved message to the appropriate channel. We'll have a look to setup an interleaved messaged handler that we can send messages up to the chain, or possibly better yet, turn the Messenger/MessageInStream to be responsible for sending to the appropriate service through observables.
pull/14/head
Simon.Dean 2022-01-31 14:09:47 +00:00
parent efcc4fd7ca
commit db60c041b0
5 changed files with 116 additions and 36 deletions

View File

@ -37,6 +37,7 @@ public:
virtual ~IMessageInStream() = default;
virtual void startReceive(ReceivePromise::Pointer promise) = 0;
virtual void setInterleavedHandler(ReceivePromise::Pointer promise) = 0;
};
}

View File

@ -37,6 +37,7 @@ public:
MessageInStream(boost::asio::io_service& ioService, transport::ITransport::Pointer transport, ICryptor::Pointer cryptor);
void startReceive(ReceivePromise::Pointer promise) override;
void setInterleavedHandler(ReceivePromise::Pointer promise) override;
private:
using std::enable_shared_from_this<MessageInStream>::shared_from_this;
@ -48,13 +49,21 @@ private:
boost::asio::io_service::strand strand_;
transport::ITransport::Pointer transport_;
ICryptor::Pointer cryptor_;
FrameType recentFrameType_;
FrameType thisFrameType_;
ReceivePromise::Pointer promise_;
ReceivePromise::Pointer interleavedPromise_;
Message::Pointer message_;
int frameSize_;
std::map<messenger::ChannelId, Message::Pointer> channel_assembly_buffers;
ChannelId currentChannelId_;
ChannelId originalMessageChannelId_;
std::map<messenger::ChannelId, Message::Pointer> messageBuffer_;
bool isInterleaved_;
bool haveOriginalChannel_;
bool isNewMessage_;
};
}

View File

@ -46,9 +46,11 @@ private:
typedef std::list<std::pair<Message::Pointer, SendPromise::Pointer>> ChannelSendQueue;
void doSend();
void inStreamMessageHandler(Message::Pointer message);
void randomInStreamMessageHandler(Message::Pointer message);
void outStreamMessageHandler(ChannelSendQueue::iterator queueElement);
void rejectReceivePromiseQueue(const error::Error& e);
void rejectSendPromiseQueue(const error::Error& e);
void randomRejectReceivePromiseQueue(const error::Error& e);
void parseMessage(Message::Pointer message, ReceivePromise::Pointer promise);
boost::asio::io_service::strand receiveStrand_;

View File

@ -33,7 +33,7 @@ MessageInStream::MessageInStream(boost::asio::io_service& ioService, transport::
, transport_(std::move(transport))
, cryptor_(std::move(cryptor))
{
isNewMessage_ = true;
}
void MessageInStream::startReceive(ReceivePromise::Pointer promise)
@ -42,7 +42,7 @@ void MessageInStream::startReceive(ReceivePromise::Pointer promise)
if(promise_ == nullptr)
{
promise_ = std::move(promise);
isNewMessage_ = true;
auto transportPromise = transport::ITransport::ReceivePromise::defer(strand_);
transportPromise->then(
[this, self = this->shared_from_this()](common::Data data) mutable {
@ -62,36 +62,68 @@ void MessageInStream::startReceive(ReceivePromise::Pointer promise)
});
}
void MessageInStream::setInterleavedHandler(ReceivePromise::Pointer promise)
{
interleavedPromise_ = std::move(promise);
}
void MessageInStream::receiveFrameHeaderHandler(const common::DataConstBuffer& buffer)
{
FrameHeader frameHeader(buffer);
if (buffer.cdata[0] != 3) {
AASDK_LOG(debug) << "Message from channel " << std::to_string(buffer.cdata[0]);
isInterleaved_ = false;
// Store the ChannelId if this is a new message.
if (isNewMessage_) {
originalMessageChannelId_ = frameHeader.getChannelId();
isNewMessage_ = false;
}
// If Frame Channel does not match Message Channel, store Existing Message in Buffer.
if(message_ != nullptr && message_->getChannelId() != frameHeader.getChannelId())
{
// we have interleaved channels, stop working on the old one and store it for later; Switch to the new one
channel_assembly_buffers[message_->getChannelId()] = message_;
AASDK_LOG(debug) << "[MessageInStream] ChannelId mismatch -- Frame " << channelIdToString(frameHeader_.getChannelId()) << " -- Message -- " << channelIdToString(message_.getChannelId());
isInterleaved_ = true;
messageBuffer_[message_->getChannelId()] = message_;
message_ = nullptr;
// message_.reset();
// promise_->reject(error::Error(error::ErrorCode::MESSENGER_INTERTWINED_CHANNELS));
// promise_.reset();
// return;
}
auto prevBuffer = channel_assembly_buffers.find(frameHeader.getChannelId());
if(prevBuffer != channel_assembly_buffers.end()){ // is there previous data in our map?
if(frameHeader.getType()!=FrameType::FIRST) //only use the data if we're not on a new frame, otherwise disregard
message_ = prevBuffer->second;
else{
message_ = std::make_shared<Message>(frameHeader.getChannelId(), frameHeader.getEncryptionType(), frameHeader.getMessageType());
if (frameHeader.getType() == FrameType::FIRST || frameHeader.getType() == FrameType::BULK) {
// If it's a First or Bulk Frame, we need to start a new message.
message_ = std::make_shared<Message>(frameHeader.getChannelId(), frameHeader.getEncryptionType(), frameHeader.getMessageType());
} else {
// This is a Middle or Last Frame. We must find an existing message.
auto bufferedMessage = messageBuffer_.find(frameHeader.getChannelId());
if (bufferedMessage != messageBuffer_.end()) {
/*
* If the original channel does not match, then this is an interleaved frame.
* It is no good just to match the channelid on the message we recovered from the queue, we need
* to go back to the original message channel as even this frame may be ANOTHER interleaved
* message frame within an existing interleaved message.
* Our promise must resolve only the channel id we've been tasked to work on.
* Everything else is incidental.
*/
// We can restore the original message, and and if the current frame matches the chnnale id
// then we're not interleaved anymore.
if (originalMessageChannelId_ == frameHeader.getChannelId()) {
isInterleaved_ = false;
AASDK_LOG(debug) << "[MessageInStream] Restored Message from Buffer";
}
message_ = bufferedMessage->second;
messageBuffer_.erase(bufferedMessage);
}
channel_assembly_buffers.erase(prevBuffer); // get rid of the previously stored data because it's now our working data.
}
else if(message_ == nullptr){
// If we have nothing at this point, start a new message.
if(message_ == nullptr)
{
message_ = std::make_shared<Message>(frameHeader.getChannelId(), frameHeader.getEncryptionType(), frameHeader.getMessageType());
}
recentFrameType_ = frameHeader.getType();
thisFrameType_ = frameHeader.getType();
const size_t frameSize = FrameSize::getSizeOf(frameHeader.getType() == FrameType::FIRST ? FrameSizeType::EXTENDED : FrameSizeType::SHORT);
auto transportPromise = transport::ITransport::ReceivePromise::defer(strand_);
@ -147,23 +179,34 @@ void MessageInStream::receiveFramePayloadHandler(const common::DataConstBuffer&
message_->insertPayload(buffer);
}
if(recentFrameType_ == FrameType::BULK || recentFrameType_ == FrameType::LAST)
bool isResolved = false;
// If this is the LAST frame or a BULK frame...
if(thisFrameType_ == FrameType::BULK || thisFrameType_ == FrameType::LAST)
{
promise_->resolve(std::move(message_));
promise_.reset();
// If this isn't an interleaved frame, then we can resolve the promise
if (!isInterleaved_) {
isResolved = true;
promise_->resolve(std::move(message_));
promise_.reset();
} else {
// Otherwise resolve through our random promise
interleavedPromise_->resolve(std::move(message_));
}
}
else
{
// If the main promise isn't resolved, then carry on retrieving frame headers.
if (!isResolved) {
auto transportPromise = transport::ITransport::ReceivePromise::defer(strand_);
transportPromise->then(
[this, self = this->shared_from_this()](common::Data data) mutable {
this->receiveFrameHeaderHandler(common::DataConstBuffer(data));
},
[this, self = this->shared_from_this()](const error::Error& e) mutable {
message_.reset();
promise_->reject(e);
promise_.reset();
});
[this, self = this->shared_from_this()](common::Data data) mutable {
this->receiveFrameHeaderHandler(common::DataConstBuffer(data));
},
[this, self = this->shared_from_this()](const error::Error& e) mutable {
message_.reset();
promise_->reject(e);
promise_.reset();
});
transport_->receive(FrameHeader::getSizeOf(), std::move(transportPromise));
}

View File

@ -42,7 +42,10 @@ void Messenger::enqueueReceive(ChannelId channelId, ReceivePromise::Pointer prom
receiveStrand_.dispatch([this, self = this->shared_from_this(), channelId, promise = std::move(promise)]() mutable {
if(!channelReceiveMessageQueue_.empty(channelId))
{
this->parseMessage(channelReceiveMessageQueue_.pop(channelId), promise);
//TODO: Use this to check on the Frame/Channel Id?
//this->parseMessage(channelReceiveMessageQueue_.pop(channelId), promise);
promise->resolve(std::move(channelReceiveMessageQueue_.pop(channelId)));
//TODO: Problem with resolving like this, is that if we resolve an interleave frame, our resolve goes to the wrong channel - eg Audio on VideoServiceChannel
}
else
{
@ -54,6 +57,11 @@ void Messenger::enqueueReceive(ChannelId channelId, ReceivePromise::Pointer prom
inStreamPromise->then(std::bind(&Messenger::inStreamMessageHandler, this->shared_from_this(), std::placeholders::_1),
std::bind(&Messenger::rejectReceivePromiseQueue, this->shared_from_this(), std::placeholders::_1));
messageInStream_->startReceive(std::move(inStreamPromise));
auto randomInStreamPromise = ReceivePromise::defer(receiveStrand_);
randomInStreamPromise->then(std::bind(&Messenger::randomInStreamMessageHandler, this->shared_from_this(), std::placeholders::_1),
std::bind(&Messenger::randomRejectReceivePromiseQueue, this->shared_from_this(), std::placeholders::_1));
messageInStream_->setInterleavedHandler(std::move(randomInStreamPromise));
}
}
});
@ -80,7 +88,8 @@ void Messenger::inStreamMessageHandler(Message::Pointer message)
if(channelReceivePromiseQueue_.isPending(channelId))
{
this->parseMessage(message, channelReceivePromiseQueue_.pop(channelId));
//this->parseMessage(message, channelReceivePromiseQueue_.pop(channelId));
channelReceivePromiseQueue_.pop(channelId)->resolve(std::move(message));
}
else
{
@ -96,6 +105,17 @@ void Messenger::inStreamMessageHandler(Message::Pointer message)
}
}
void Messenger::randomInStreamMessageHandler(Message::Pointer message)
{
//AASDK_LOG(debug) << "Interleaved Message Pushed to Queue";;
channelReceiveMessageQueue_.push(std::move(message));
auto randomInStreamPromise = ReceivePromise::defer(receiveStrand_);
randomInStreamPromise->then(std::bind(&Messenger::randomInStreamMessageHandler, this->shared_from_this(), std::placeholders::_1),
std::bind(&Messenger::randomRejectReceivePromiseQueue, this->shared_from_this(), std::placeholders::_1));
messageInStream_->setInterleavedHandler(std::move(randomInStreamPromise));
}
void Messenger::parseMessage(Message::Pointer message, ReceivePromise::Pointer promise) {
if (message->getChannelId() != ChannelId::VIDEO) {
//AASDK_LOG(debug) << channelIdToString(message->getChannelId()) << " " << MessageId(message->getPayload());
@ -132,6 +152,11 @@ void Messenger::rejectReceivePromiseQueue(const error::Error& e)
}
}
void Messenger::randomRejectReceivePromiseQueue(const error::Error& e)
{
// Dummy
}
void Messenger::rejectSendPromiseQueue(const error::Error& e)
{
while(!channelSendPromiseQueue_.empty())