From 4432aba25b6ee68356e0ddfc724afb8373651945 Mon Sep 17 00:00:00 2001 From: Michael Meeks Date: Fri, 25 Nov 2016 09:48:59 +0000 Subject: [PATCH] Revert "loolwsd: support reading long messages directly" This reverts commit 84607b43a31574533471defcb4756ba855f835f1. LOOLWebSocket piece requires a much too recent Poco. --- loolwsd/IoUtil.cpp | 42 +++++++++++++++++++++++++---------- loolwsd/LOOLWebSocket.hpp | 46 +++++++++++---------------------------- loolwsd/test/helpers.hpp | 28 +++++++++++++++++++----- 3 files changed, 67 insertions(+), 49 deletions(-) diff --git a/loolwsd/IoUtil.cpp b/loolwsd/IoUtil.cpp index adf5c55f38..035dcf884e 100644 --- a/loolwsd/IoUtil.cpp +++ b/loolwsd/IoUtil.cpp @@ -50,17 +50,17 @@ void SocketProcessor(const std::shared_ptr& ws, // Timeout given is in microseconds. static const Poco::Timespan waitTime(POLL_TIMEOUT_MS * 1000); - constexpr auto bufferSize = READ_BUFFER_SIZE * 8; - + const auto bufferSize = READ_BUFFER_SIZE * 100; int flags = 0; int n = -1; bool stop = false; std::vector payload(bufferSize); - Poco::Buffer buffer(bufferSize); try { ws->setReceiveTimeout(0); + payload.resize(0); + for (;;) { stop = stopPredicate(); @@ -79,12 +79,10 @@ void SocketProcessor(const std::shared_ptr& ws, try { - payload.resize(0); - buffer.resize(0); + payload.resize(payload.capacity()); n = -1; - n = ws->receiveFrame(buffer, flags); - LOG_WRN("GOT: [" << LOOLProtocol::getAbbreviatedMessage(buffer.begin(), buffer.size()) << "]"); - payload.insert(payload.end(), buffer.begin(), buffer.end()); + n = ws->receiveFrame(payload.data(), payload.capacity(), flags); + payload.resize(n > 0 ? n : 0); } catch (const Poco::TimeoutException&) { @@ -101,7 +99,7 @@ void SocketProcessor(const std::shared_ptr& ws, assert(n > 0); - const std::string firstLine = LOOLProtocol::getFirstLine(buffer.begin(), buffer.size()); + const std::string firstLine = LOOLProtocol::getFirstLine(payload); if ((flags & WebSocket::FrameFlags::FRAME_FLAG_FIN) != WebSocket::FrameFlags::FRAME_FLAG_FIN) { // One WS message split into multiple frames. @@ -109,7 +107,8 @@ void SocketProcessor(const std::shared_ptr& ws, LOG_WRN("SocketProcessor [" << name << "]: Receiving multi-parm frame."); while (true) { - n = ws->receiveFrame(buffer, flags); + char buffer[READ_BUFFER_SIZE * 10]; + n = ws->receiveFrame(buffer, sizeof(buffer), flags); if (n <= 0 || (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE) { LOG_WRN("SocketProcessor [" << name << "]: Connection closed while reading multiframe message."); @@ -117,7 +116,7 @@ void SocketProcessor(const std::shared_ptr& ws, break; } - payload.insert(payload.end(), buffer.begin(), buffer.end()); + payload.insert(payload.end(), buffer, buffer + n); if ((flags & WebSocket::FrameFlags::FRAME_FLAG_FIN) == WebSocket::FrameFlags::FRAME_FLAG_FIN) { // No more frames. @@ -125,6 +124,27 @@ void SocketProcessor(const std::shared_ptr& ws, } } } + else + { + int size = 0; + Poco::StringTokenizer tokens(firstLine, " ", Poco::StringTokenizer::TOK_IGNORE_EMPTY | Poco::StringTokenizer::TOK_TRIM); + // Check if it is a "nextmessage:" and in that case read the large + // follow-up message separately, and handle that only. + if (tokens.count() == 2 && tokens[0] == "nextmessage:" && + LOOLProtocol::getTokenInteger(tokens[1], "size", size) && size > 0) + { + LOG_TRC("SocketProcessor [" << name << "]: Getting large message of " << size << " bytes."); + if (size > MAX_MESSAGE_SIZE) + { + LOG_ERR("SocketProcessor [" << name << "]: Large-message size (" << size << ") over limit or invalid."); + } + else + { + payload.resize(size); + continue; + } + } + } if (n <= 0 || (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE) { diff --git a/loolwsd/LOOLWebSocket.hpp b/loolwsd/LOOLWebSocket.hpp index fb1d88beb7..e10af2af11 100644 --- a/loolwsd/LOOLWebSocket.hpp +++ b/loolwsd/LOOLWebSocket.hpp @@ -108,39 +108,6 @@ public: return -1; } - - /// Wrapper for Poco::Net::WebSocket::receiveFrame() that handles PING frames - /// (by replying with a PONG frame) and PONG frames. PONG frames are ignored. - /// Should we also factor out the handling of non-final and continuation frames into this? - int receiveFrame(Poco::Buffer& buffer, int& flags) - { -#ifdef ENABLE_DEBUG - // Delay receiving the frame - std::this_thread::sleep_for(getWebSocketDelay()); -#endif - // Timeout given is in microseconds. - static const Poco::Timespan waitTime(POLL_TIMEOUT_MS * 1000); - - while (poll(waitTime, Poco::Net::Socket::SELECT_READ)) - { - const int n = Poco::Net::WebSocket::receiveFrame(buffer, flags); - if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PING) - { - sendFrame(buffer.begin(), n, WebSocket::FRAME_FLAG_FIN | WebSocket::FRAME_OP_PONG); - } - else if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PONG) - { - // In case we do send pongs in the future. - } - else - { - return n; - } - } - - return -1; - } - /// Wrapper for Poco::Net::WebSocket::sendFrame() that handles large frames. int sendFrame(const char* buffer, const int length, const int flags = FRAME_TEXT) { @@ -150,6 +117,19 @@ public: #endif std::unique_lock lock(_mutex); + // Size after which messages will be sent preceded with + // 'nextmessage' frame to let the receiver know in advance + // the size of larger coming message. All messages up to this + // size are considered small messages. + constexpr int SMALL_MESSAGE_SIZE = READ_BUFFER_SIZE / 2; + + if (length > SMALL_MESSAGE_SIZE) + { + const std::string nextmessage = "nextmessage: size=" + std::to_string(length); + Poco::Net::WebSocket::sendFrame(nextmessage.data(), nextmessage.size()); + Log::debug("Message is long, sent " + nextmessage); + } + const int result = Poco::Net::WebSocket::sendFrame(buffer, length, flags); lock.unlock(); diff --git a/loolwsd/test/helpers.hpp b/loolwsd/test/helpers.hpp index 7e5f03946e..30aeac1272 100644 --- a/loolwsd/test/helpers.hpp +++ b/loolwsd/test/helpers.hpp @@ -197,7 +197,6 @@ std::vector getResponseMessage(LOOLWebSocket& ws, const std::string& prefi int retries = timeoutMs / 500; const Poco::Timespan waitTime(retries ? timeoutMs * 1000 / retries : timeoutMs * 1000); std::vector response; - Poco::Buffer buffer(READ_BUFFER_SIZE); bool timedout = false; ws.setReceiveTimeout(0); @@ -211,10 +210,9 @@ std::vector getResponseMessage(LOOLWebSocket& ws, const std::string& prefi timedout = false; } - response.resize(0); - buffer.resize(0); - const int bytes = ws.receiveFrame(buffer, flags); - response.insert(response.end(), buffer.begin(), buffer.end()); + response.resize(READ_BUFFER_SIZE); + int bytes = ws.receiveFrame(response.data(), response.size(), flags); + response.resize(bytes >= 0 ? bytes : 0); std::cerr << name << "Got " << LOOLProtocol::getAbbreviatedFrameDump(response.data(), bytes, flags) << std::endl; const auto message = LOOLProtocol::getFirstLine(response); if (bytes > 0 && (flags & Poco::Net::WebSocket::FRAME_OP_BITMASK) != Poco::Net::WebSocket::FRAME_OP_CLOSE) @@ -223,6 +221,26 @@ std::vector getResponseMessage(LOOLWebSocket& ws, const std::string& prefi { return response; } + else if (LOOLProtocol::matchPrefix("nextmessage", message)) + { + int size = 0; + if (LOOLProtocol::getTokenIntegerFromMessage(message, "size", size) && size > 0) + { + response.resize(size); + bytes = ws.receiveFrame(response.data(), response.size(), flags); + response.resize(bytes >= 0 ? bytes : 0); + std::cerr << name << "Got " << LOOLProtocol::getAbbreviatedFrameDump(response.data(), bytes, flags) << std::endl; + if (bytes > 0 && + LOOLProtocol::matchPrefix(prefix, LOOLProtocol::getFirstLine(response))) + { + return response; + } + } + } + } + else + { + response.resize(0); } if (bytes <= 0)