From 466c31d59abb2f1d794bf0b22c296b5dd68ce7a7 Mon Sep 17 00:00:00 2001 From: Michael Meeks Date: Wed, 8 May 2024 15:56:56 +0100 Subject: [PATCH] MessageQueue cleanup, and avoid accidental use in tests. The MessageQueue is not a generic message queue, it has much more un-anticipated functionality; so don't use it where we don't need to. In particular unexpected re-writing and merging of messages during tests is probably not a great idea. Signed-off-by: Michael Meeks Change-Id: I657738307e611be18f5f83e11c055bf8a88826da --- common/MessageQueue.hpp | 3 +++ common/Session.hpp | 1 - kit/Kit.hpp | 2 ++ test/UnitTyping.cpp | 8 ++++---- test/WebSocketSession.hpp | 33 +++++++++++++++++++++------------ test/WhiteBoxTests.cpp | 1 - 6 files changed, 30 insertions(+), 18 deletions(-) diff --git a/common/MessageQueue.hpp b/common/MessageQueue.hpp index 037b435ff2..af71396961 100644 --- a/common/MessageQueue.hpp +++ b/common/MessageQueue.hpp @@ -366,4 +366,7 @@ private: std::vector _viewOrder; }; +// Really this queue is used only in the Kit process +typedef TileQueue KitQueue; + /* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/common/Session.hpp b/common/Session.hpp index ea45e5b172..eb3ea47f57 100644 --- a/common/Session.hpp +++ b/common/Session.hpp @@ -23,7 +23,6 @@ #include "Protocol.hpp" #include "Log.hpp" -#include "MessageQueue.hpp" #include "Message.hpp" #include "TileCache.hpp" #include "WebSocketHandler.hpp" diff --git a/kit/Kit.hpp b/kit/Kit.hpp index ccc674e89f..ac8cba076b 100644 --- a/kit/Kit.hpp +++ b/kit/Kit.hpp @@ -19,6 +19,8 @@ #include #include #include +#include + #include #include "Socket.hpp" diff --git a/test/UnitTyping.cpp b/test/UnitTyping.cpp index 549632c1e7..b61e309f2b 100644 --- a/test/UnitTyping.cpp +++ b/test/UnitTyping.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -131,13 +132,12 @@ public: TestResult testMessageQueueMerging() { - MessageQueue queue; + KitQueue queue; queue.put("child-foo textinput id=0 text=a"); queue.put("child-foo textinput id=0 text=b"); - MessageQueue::Payload v; - v = queue.get(); + auto v = queue.get(); if (!queue.isEmpty()) { @@ -201,7 +201,7 @@ public: v = queue.get(); if (!queue.isEmpty()) { - LOG_ERR("MessageQueue contains more than was put into it"); + LOG_ERR("KitQueue contains more than was put into it"); return TestResult::Failed; } diff --git a/test/WebSocketSession.hpp b/test/WebSocketSession.hpp index c3712fb01b..af72e86e77 100644 --- a/test/WebSocketSession.hpp +++ b/test/WebSocketSession.hpp @@ -19,7 +19,6 @@ #include #include -#include #include "NetUtil.hpp" #include "SigUtil.hpp" #include @@ -36,6 +35,7 @@ namespace http { + /// A client socket for asynchronous Web-Socket protocol. class WebSocketSession final : public WebSocketHandler { @@ -47,6 +47,15 @@ public: }; private: + typedef std::vector> BufferQueue; + + std::vector pop(BufferQueue &queue) + { + auto result = queue.front(); + queue.erase(queue.begin()); + return result; + } + WebSocketSession(const std::string& hostname, Protocol protocolType, int portNumber) : WebSocketHandler(/* isClient = */ true, /* isMasking = */ true) , _host(hostname) @@ -181,9 +190,9 @@ public: for (;;) { // Drain the queue, first. - while (!_inQueue.isEmpty()) + while (!_inQueue.empty()) { - std::vector message = _inQueue.pop(); + std::vector message = pop(_inQueue); if (cb(message)) return message; } @@ -199,7 +208,7 @@ public: const std::chrono::milliseconds remaining = timeout - elapsed; _inCv.wait_for(lock, remaining / 20, [this]() - { return !_inQueue.isEmpty() || SigUtil::getShutdownRequestFlag(); }); + { return !_inQueue.empty() || SigUtil::getShutdownRequestFlag(); }); } LOG_DBG(context << "Giving up polling after " << sw.elapsed()); @@ -247,7 +256,7 @@ public: { { std::unique_lock lock(_outMutex); - _outQueue.put(std::vector(msg.data(), msg.data() + msg.size())); + _outQueue.emplace_back(std::vector(msg.data(), msg.data() + msg.size())); } const auto pollPtr = _socketPoll.lock(); @@ -325,7 +334,7 @@ private: LOG_TRC("Got message: " << COOLProtocol::getAbbreviatedMessage(data)); { std::unique_lock lock(_inMutex); - _inQueue.put(data); + _inQueue.emplace_back(data); } _inCv.notify_one(); @@ -345,7 +354,7 @@ private: int64_t& /*timeoutMaxMicroS*/) override { std::unique_lock lock(_outMutex); - if (!_outQueue.isEmpty() || _shutdown) // Graceful disconnection needs to send a frame. + if (!_outQueue.empty() || _shutdown) // Graceful disconnection needs to send a frame. return POLLIN | POLLOUT; return POLLIN; } @@ -360,9 +369,9 @@ private: try { // Drain the queue, for efficient communication. - while (capacity > wrote && !_outQueue.isEmpty()) + while (capacity > wrote && !_outQueue.empty()) { - std::vector item = _outQueue.get(); + std::vector item = pop(_outQueue); const auto size = item.size(); assert(size && "Zero-sized messages must never be queued for sending."); @@ -372,7 +381,7 @@ private: LOG_TRC("WebSocketSession: wrote " << size << ", total " << wrote << " bytes."); } - if (_shutdown && _outQueue.isEmpty()) + if (_shutdown && _outQueue.empty()) { sendCloseFrame(); } @@ -411,10 +420,10 @@ private: const std::string _port; const Protocol _protocol; Request _request; - MessageQueue _inQueue; //< The incoming message queue. + BufferQueue _inQueue; //< The incoming message queue. std::condition_variable _inCv; //< The incoming queue cond_var. std::mutex _inMutex; //< The incoming queue lock. - MessageQueue _outQueue; //< The outgoing message queue. + BufferQueue _outQueue; //< The outgoing message queue. std::mutex _outMutex; //< The outgoing queue lock. std::condition_variable _disconnectCv; //< Traps disconnections. std::mutex _disconnectMutex; //< The disconnection event lock. diff --git a/test/WhiteBoxTests.cpp b/test/WhiteBoxTests.cpp index 7439cc9bc3..48471b3701 100644 --- a/test/WhiteBoxTests.cpp +++ b/test/WhiteBoxTests.cpp @@ -20,7 +20,6 @@ #include #include #include -#include #include #include #include