From 175dfa6b60dfb5f40bbe2838042a4b9818a81c46 Mon Sep 17 00:00:00 2001 From: "michal.szwaj" Date: Thu, 15 Mar 2018 18:28:02 +0100 Subject: [PATCH] Use Transport base class in TCPTransport --- include/f1x/aasdk/Transport/TCPTransport.hpp | 30 +----- include/f1x/aasdk/Transport/USBTransport.hpp | 4 - src/Transport/TCPTransport.cpp | 103 +++++-------------- src/Transport/Transport.cpp | 13 +++ src/Transport/USBTransport.cpp | 13 --- 5 files changed, 43 insertions(+), 120 deletions(-) diff --git a/include/f1x/aasdk/Transport/TCPTransport.hpp b/include/f1x/aasdk/Transport/TCPTransport.hpp index c4eeefd..0159176 100644 --- a/include/f1x/aasdk/Transport/TCPTransport.hpp +++ b/include/f1x/aasdk/Transport/TCPTransport.hpp @@ -18,10 +18,8 @@ #pragma once -#include #include -#include -#include +#include namespace f1x { @@ -30,37 +28,19 @@ namespace aasdk namespace transport { -class TCPTransport: public ITransport, public std::enable_shared_from_this, boost::noncopyable +class TCPTransport: public Transport { public: TCPTransport(boost::asio::io_service& ioService, tcp::ITCPEndpoint::Pointer tcpEndpoint); - void receive(size_t size, ReceivePromise::Pointer promise) override; - void send(common::Data data, SendPromise::Pointer promise) override; void stop() override; private: - typedef std::list> ReceiveQueue; - typedef std::list> SendQueue; + void enqueueReceive(common::DataBuffer buffer) override; + void enqueueSend(SendQueue::iterator queueElement) override; + void sendHandler(SendQueue::iterator queueElement, const error::Error& e); - using std::enable_shared_from_this::shared_from_this; - void distributeReceivedData(); - void receiveHandler(size_t bytesTransferred); - void receiveFailureHandler(const aasdk::error::Error& e); - - void doSend(); - void sendHandler(size_t bytesTransferred, SendQueue::iterator queueElement); - void sendFailureHandler(const aasdk::error::Error& e, SendQueue::iterator queueElement); - - boost::asio::io_service& ioService_; tcp::ITCPEndpoint::Pointer tcpEndpoint_; - - boost::asio::io_service::strand receiveStrand_; - ReceiveQueue receiveQueue_; - DataSink tcpReceivedDataSink_; - - boost::asio::io_service::strand sendStrand_; - SendQueue sendQueue_; }; } diff --git a/include/f1x/aasdk/Transport/USBTransport.hpp b/include/f1x/aasdk/Transport/USBTransport.hpp index ffffb79..3464130 100644 --- a/include/f1x/aasdk/Transport/USBTransport.hpp +++ b/include/f1x/aasdk/Transport/USBTransport.hpp @@ -18,11 +18,8 @@ #pragma once -#include -#include #include #include -#include #include namespace f1x @@ -42,7 +39,6 @@ public: private: void enqueueReceive(common::DataBuffer buffer) override; void enqueueSend(SendQueue::iterator queueElement) override; - void receiveHandler(size_t bytesTransferred); void doSend(SendQueue::iterator queueElement, common::Data::size_type offset); void sendHandler(SendQueue::iterator queueElement, common::Data::size_type offset, size_t bytesTransferred); diff --git a/src/Transport/TCPTransport.cpp b/src/Transport/TCPTransport.cpp index 21f0466..5c9c624 100644 --- a/src/Transport/TCPTransport.cpp +++ b/src/Transport/TCPTransport.cpp @@ -26,36 +26,37 @@ namespace transport { TCPTransport::TCPTransport(boost::asio::io_service& ioService, tcp::ITCPEndpoint::Pointer tcpEndpoint) - : ioService_(ioService) + : Transport(ioService) , tcpEndpoint_(std::move(tcpEndpoint)) - , receiveStrand_(ioService_) - , sendStrand_(ioService_) { } -void TCPTransport::receive(size_t size, ReceivePromise::Pointer promise) +void TCPTransport::enqueueReceive(common::DataBuffer buffer) { - receiveStrand_.dispatch([this, self = this->shared_from_this(), size, promise = std::move(promise)]() mutable { - receiveQueue_.emplace_back(std::make_pair(size, std::move(promise))); + auto receivePromise = tcp::ITCPEndpoint::Promise::defer(sendStrand_); + receivePromise->then([this, self = this->shared_from_this()](auto bytesTransferred) { + this->receiveHandler(bytesTransferred); + }, + [this, self = this->shared_from_this()](auto e) { + this->rejectReceivePromises(e); + }); - if(receiveQueue_.size() == 1) - { - this->distributeReceivedData(); - } - }); + tcpEndpoint_->receive(buffer, std::move(receivePromise)); } -void TCPTransport::send(common::Data data, SendPromise::Pointer promise) +void TCPTransport::enqueueSend(SendQueue::iterator queueElement) { - sendStrand_.dispatch([this, self = this->shared_from_this(), data = std::move(data), promise = std::move(promise)]() mutable { - sendQueue_.emplace_back(std::make_pair(std::move(data), std::move(promise))); + auto sendPromise = tcp::ITCPEndpoint::Promise::defer(sendStrand_); - if(sendQueue_.size() == 1) - { - this->doSend(); - } + sendPromise->then([this, self = this->shared_from_this(), queueElement](auto) { + this->sendHandler(queueElement, error::Error()); + }, + [this, self = this->shared_from_this(), queueElement](auto e) { + this->sendHandler(queueElement, e); }); + + tcpEndpoint_->send(common::DataConstBuffer(queueElement->first), std::move(sendPromise)); } void TCPTransport::stop() @@ -63,76 +64,22 @@ void TCPTransport::stop() tcpEndpoint_->stop(); } -void TCPTransport::receiveHandler(size_t bytesTransferred) +void TCPTransport::sendHandler(SendQueue::iterator queueElement, const error::Error& e) { - try + if(!e) { - tcpReceivedDataSink_.commit(bytesTransferred); - this->distributeReceivedData(); + queueElement->second->resolve(); } - catch(const error::Error& e) + else { - //this->rejectReceivePromises(e); + queueElement->second->reject(e); } -} -void TCPTransport::receiveFailureHandler(const aasdk::error::Error& e) -{ -} - -void TCPTransport::doSend() -{ - auto queueElement = sendQueue_.begin(); - - auto sendPromise = tcp::ITCPEndpoint::Promise::defer(sendStrand_); - sendPromise->then(std::bind(&TCPTransport::sendHandler, this->shared_from_this(), std::placeholders::_1, queueElement), - std::bind(&TCPTransport::sendFailureHandler, this->shared_from_this(), std::placeholders::_1, queueElement)); - - tcpEndpoint_->send(common::DataConstBuffer(queueElement->first), std::move(sendPromise)); -} - -void TCPTransport::sendHandler(size_t, SendQueue::iterator queueElement) -{ - queueElement->second->resolve(); sendQueue_.erase(queueElement); if(!sendQueue_.empty()) { - this->doSend(); - } -} - -void TCPTransport::sendFailureHandler(const aasdk::error::Error& e, SendQueue::iterator queueElement) -{ - queueElement->second->reject(e); - sendQueue_.erase(queueElement); - - if(!sendQueue_.empty()) - { - this->doSend(); - } -} - -void TCPTransport::distributeReceivedData() -{ - for(auto queueElement = receiveQueue_.begin(); queueElement != receiveQueue_.end();) - { - if(tcpReceivedDataSink_.getAvailableSize() < queueElement->first) - { - auto buffer = tcpReceivedDataSink_.fill(); - - auto receivePromise = tcp::ITCPEndpoint::Promise::defer(receiveStrand_); - receivePromise->then(std::bind(&TCPTransport::receiveHandler, this->shared_from_this(), std::placeholders::_1), - std::bind(&TCPTransport::receiveFailureHandler, this->shared_from_this(), std::placeholders::_1)); - tcpEndpoint_->receive(buffer, std::move(receivePromise)); - break; - } - else - { - auto data(tcpReceivedDataSink_.consume(queueElement->first)); - queueElement->second->resolve(std::move(data)); - queueElement = receiveQueue_.erase(queueElement); - } + this->enqueueSend(sendQueue_.begin()); } } diff --git a/src/Transport/Transport.cpp b/src/Transport/Transport.cpp index 60c4935..9f64242 100644 --- a/src/Transport/Transport.cpp +++ b/src/Transport/Transport.cpp @@ -49,6 +49,19 @@ void Transport::receive(size_t size, ReceivePromise::Pointer promise) }); } +void Transport::receiveHandler(size_t bytesTransferred) +{ + try + { + receivedDataSink_.commit(bytesTransferred); + this->distributeReceivedData(); + } + catch(const error::Error& e) + { + this->rejectReceivePromises(e); + } +} + void Transport::distributeReceivedData() { for(auto queueElement = receiveQueue_.begin(); queueElement != receiveQueue_.end();) diff --git a/src/Transport/USBTransport.cpp b/src/Transport/USBTransport.cpp index 3eec8df..7a8a1cc 100644 --- a/src/Transport/USBTransport.cpp +++ b/src/Transport/USBTransport.cpp @@ -48,19 +48,6 @@ void USBTransport::enqueueSend(SendQueue::iterator queueElement) this->doSend(queueElement, 0); } -void USBTransport::receiveHandler(size_t bytesTransferred) -{ - try - { - receivedDataSink_.commit(bytesTransferred); - this->distributeReceivedData(); - } - catch(const error::Error& e) - { - this->rejectReceivePromises(e); - } -} - void USBTransport::doSend(SendQueue::iterator queueElement, common::Data::size_type offset) { auto usbEndpointPromise = usb::IUSBEndpoint::Promise::defer(sendStrand_);