From e5e72d1eda1aa9b4b1e549189768ca28fe2afc16 Mon Sep 17 00:00:00 2001 From: "michal.szwaj" Date: Mon, 12 Mar 2018 17:53:31 +0100 Subject: [PATCH] Implementation of TCPTransport --- include/f1x/aasdk/TCP/ITCPEndpoint.hpp | 2 +- include/f1x/aasdk/Transport/TCPTransport.hpp | 49 ++++++++ src/TCP/TCPEndpoint.cpp | 4 +- src/Transport/TCPTransport.cpp | 115 +++++++++++++++++++ 4 files changed, 167 insertions(+), 3 deletions(-) create mode 100644 include/f1x/aasdk/Transport/TCPTransport.hpp create mode 100644 src/Transport/TCPTransport.cpp diff --git a/include/f1x/aasdk/TCP/ITCPEndpoint.hpp b/include/f1x/aasdk/TCP/ITCPEndpoint.hpp index 80c01c9..eb01c8b 100644 --- a/include/f1x/aasdk/TCP/ITCPEndpoint.hpp +++ b/include/f1x/aasdk/TCP/ITCPEndpoint.hpp @@ -15,7 +15,7 @@ class ITCPEndpoint { public: typedef std::shared_ptr Pointer; - typedef io::Promise Promise; + typedef io::Promise Promise; virtual ~ITCPEndpoint() = default; diff --git a/include/f1x/aasdk/Transport/TCPTransport.hpp b/include/f1x/aasdk/Transport/TCPTransport.hpp new file mode 100644 index 0000000..024b585 --- /dev/null +++ b/include/f1x/aasdk/Transport/TCPTransport.hpp @@ -0,0 +1,49 @@ +#pragma once + +#include +#include +#include + +namespace f1x +{ +namespace aasdk +{ +namespace transport +{ + +class TCPTransport: public ITransport, public std::enable_shared_from_this, boost::noncopyable +{ +public: + TCPTransport(boost::asio::io_service& ioService, tcp::ITCPEndpoint::Pointer tcpEndpoint); + + void receive(size_t size, ReceivePromise::Pointer promise) override; + void send(common::Data data, SendPromise::Pointer promise) override; + void stop() override; + +private: + typedef std::list> ReceiveQueue; + typedef std::list> SendQueue; + + using std::enable_shared_from_this::shared_from_this; + void doReceive(); + void receiveHandler(ReceiveQueue::iterator queueElement); + void receiveFailureHandler(const aasdk::error::Error& e, ReceiveQueue::iterator queueElement); + + void doSend(); + void sendHandler(SendQueue::iterator queueElement); + void sendFailureHandler(const aasdk::error::Error& e, SendQueue::iterator queueElement); + + boost::asio::io_service& ioService_; + tcp::ITCPEndpoint::Pointer tcpEndpoint_; + + boost::asio::io_service::strand receiveStrand_; + ReceiveQueue receiveQueue_; + common::Data receiveData_; + + boost::asio::io_service::strand sendStrand_; + SendQueue sendQueue_; +}; + +} +} +} diff --git a/src/TCP/TCPEndpoint.cpp b/src/TCP/TCPEndpoint.cpp index 8c534f8..0b914f1 100644 --- a/src/TCP/TCPEndpoint.cpp +++ b/src/TCP/TCPEndpoint.cpp @@ -41,11 +41,11 @@ void TCPEndpoint::stop() socket_.close(ec); } -void TCPEndpoint::asyncOperationHandler(const boost::system::error_code& ec, size_t bytesTransferred, Promise::Pointer promise) +void TCPEndpoint::asyncOperationHandler(const boost::system::error_code& ec, size_t, Promise::Pointer promise) { if(!ec) { - promise->resolve(bytesTransferred); + promise->resolve(); } else { diff --git a/src/Transport/TCPTransport.cpp b/src/Transport/TCPTransport.cpp new file mode 100644 index 0000000..72fcab4 --- /dev/null +++ b/src/Transport/TCPTransport.cpp @@ -0,0 +1,115 @@ +#include + +namespace f1x +{ +namespace aasdk +{ +namespace transport +{ + +TCPTransport::TCPTransport(boost::asio::io_service& ioService, tcp::ITCPEndpoint::Pointer tcpEndpoint) + : ioService_(ioService) + , tcpEndpoint_(std::move(tcpEndpoint)) + , receiveStrand_(ioService_) + , sendStrand_(ioService_) +{ + +} + +void TCPTransport::receive(size_t size, ReceivePromise::Pointer promise) +{ + receiveStrand_.dispatch([this, self = this->shared_from_this(), size, promise = std::move(promise)]() mutable { + receiveQueue_.emplace_back(std::make_pair(size, std::move(promise))); + + if(receiveQueue_.size() == 1) + { + this->doReceive(); + } + }); +} + +void TCPTransport::send(common::Data data, SendPromise::Pointer promise) +{ + sendStrand_.dispatch([this, self = this->shared_from_this(), data = std::move(data), promise = std::move(promise)]() mutable { + sendQueue_.emplace_back(std::make_pair(std::move(data), std::move(promise))); + + if(sendQueue_.size() == 1) + { + this->doSend(); + } + }); +} + +void TCPTransport::stop() +{ + tcpEndpoint_->stop(); +} + +void TCPTransport::doReceive() +{ + auto queueElement = receiveQueue_.begin(); + receiveData_.resize(queueElement->first); + + auto receivePromise = tcp::ITCPEndpoint::Promise::defer(receiveStrand_); + receivePromise->then(std::bind(&TCPTransport::receiveHandler, this->shared_from_this(), queueElement), + std::bind(&TCPTransport::receiveFailureHandler, this->shared_from_this(), std::placeholders::_1, queueElement)); + tcpEndpoint_->receive(common::DataBuffer(receiveData_), std::move(receivePromise)); +} + +void TCPTransport::receiveHandler(ReceiveQueue::iterator queueElement) +{ + queueElement->second->resolve(std::move(receiveData_)); + receiveQueue_.erase(queueElement); + + if(!receiveQueue_.empty()) + { + this->doReceive(); + } +} + +void TCPTransport::receiveFailureHandler(const aasdk::error::Error& e, ReceiveQueue::iterator queueElement) +{ + queueElement->second->reject(e); + receiveQueue_.erase(queueElement); + + if(!receiveQueue_.empty()) + { + this->doReceive(); + } +} + +void TCPTransport::doSend() +{ + auto queueElement = sendQueue_.begin(); + + auto sendPromise = tcp::ITCPEndpoint::Promise::defer(sendStrand_); + sendPromise->then(std::bind(&TCPTransport::sendHandler, this->shared_from_this(), queueElement), + std::bind(&TCPTransport::sendFailureHandler, this->shared_from_this(), std::placeholders::_1, queueElement)); + tcpEndpoint_->send(common::DataConstBuffer(queueElement->first), std::move(sendPromise)); +} + +void TCPTransport::sendHandler(SendQueue::iterator queueElement) +{ + queueElement->second->resolve(); + sendQueue_.erase(queueElement); + + if(!sendQueue_.empty()) + { + this->doSend(); + } +} + +void TCPTransport::sendFailureHandler(const aasdk::error::Error& e, SendQueue::iterator queueElement) +{ + queueElement->second->reject(e); + sendQueue_.erase(queueElement); + + if(!sendQueue_.empty()) + { + this->doSend(); + } +} + +} +} +}