pull/9/head
snirpo 2019-12-01 20:01:45 +01:00 committed by Ryan Press
parent 1d91dec240
commit 23b7f37791
3 changed files with 11 additions and 2 deletions

View File

@ -49,6 +49,7 @@ private:
void outStreamMessageHandler(ChannelSendQueue::iterator queueElement);
void rejectReceivePromiseQueue(const error::Error& e);
void rejectSendPromiseQueue(const error::Error& e);
void parseMessage(Message::Pointer message, ReceivePromise::Pointer promise);
boost::asio::io_service::strand receiveStrand_;
boost::asio::io_service::strand sendStrand_;

View File

@ -17,6 +17,7 @@
*/
#include <f1x/aasdk/Messenger/Message.hpp>
#include <f1x/aasdk/Common/Log.hpp>
namespace f1x
{
@ -83,6 +84,7 @@ void Message::insertPayload(const common::Data& payload)
void Message::insertPayload(const google::protobuf::Message& message)
{
AASDK_LOG(debug) << message.DebugString();
auto offset = payload_.size();
payload_.resize(payload_.size() + message.ByteSize());

View File

@ -19,6 +19,7 @@
#include <boost/endian/conversion.hpp>
#include <f1x/aasdk/Error/Error.hpp>
#include <f1x/aasdk/Messenger/Messenger.hpp>
#include <f1x/aasdk/Common/Log.hpp>
namespace f1x
{
@ -41,7 +42,7 @@ void Messenger::enqueueReceive(ChannelId channelId, ReceivePromise::Pointer prom
receiveStrand_.dispatch([this, self = this->shared_from_this(), channelId, promise = std::move(promise)]() mutable {
if(!channelReceiveMessageQueue_.empty(channelId))
{
promise->resolve(std::move(channelReceiveMessageQueue_.pop(channelId)));
this->parseMessage(channelReceiveMessageQueue_.pop(channelId), promise);
}
else
{
@ -76,7 +77,7 @@ void Messenger::inStreamMessageHandler(Message::Pointer message)
if(channelReceivePromiseQueue_.isPending(channelId))
{
channelReceivePromiseQueue_.pop(channelId)->resolve(std::move(message));
this->parseMessage(message, channelReceivePromiseQueue_.pop(channelId));
}
else
{
@ -92,6 +93,11 @@ void Messenger::inStreamMessageHandler(Message::Pointer message)
}
}
void Messenger::parseMessage(Message::Pointer message, ReceivePromise::Pointer promise) {
AASDK_LOG(debug) << channelIdToString(message->getChannelId()) << " " << MessageId(message->getPayload());
promise->resolve(message);
}
void Messenger::doSend()
{
auto queueElementIter = channelSendPromiseQueue_.begin();