MessageQueue cleanup, and avoid accidental use in tests.

The MessageQueue is not a generic message queue, it has much more
un-anticipated functionality; so don't use it where we don't need to.

In particular unexpected re-writing and merging of messages
during tests is probably not a great idea.

Signed-off-by: Michael Meeks <michael.meeks@collabora.com>
Change-Id: I657738307e611be18f5f83e11c055bf8a88826da
private/caolan/emplace_back
Michael Meeks 2024-05-08 15:56:56 +01:00 committed by Caolán McNamara
parent c1724983f7
commit 466c31d59a
6 changed files with 30 additions and 18 deletions

View File

@ -366,4 +366,7 @@ private:
std::vector<int> _viewOrder;
};
// Really this queue is used only in the Kit process
typedef TileQueue KitQueue;
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */

View File

@ -23,7 +23,6 @@
#include "Protocol.hpp"
#include "Log.hpp"
#include "MessageQueue.hpp"
#include "Message.hpp"
#include "TileCache.hpp"
#include "WebSocketHandler.hpp"

View File

@ -19,6 +19,8 @@
#include <common/StateEnum.hpp>
#include <common/Session.hpp>
#include <common/ThreadPool.hpp>
#include <common/MessageQueue.hpp>
#include <wsd/TileDesc.hpp>
#include "Socket.hpp"

View File

@ -18,6 +18,7 @@
#include <Log.hpp>
#include <Unit.hpp>
#include <UnitHTTP.hpp>
#include <MessageQueue.hpp>
#include <WebSocketSession.hpp>
#include <helpers.hpp>
#include <wsd/TileDesc.hpp>
@ -131,13 +132,12 @@ public:
TestResult testMessageQueueMerging()
{
MessageQueue queue;
KitQueue queue;
queue.put("child-foo textinput id=0 text=a");
queue.put("child-foo textinput id=0 text=b");
MessageQueue::Payload v;
v = queue.get();
auto v = queue.get();
if (!queue.isEmpty())
{
@ -201,7 +201,7 @@ public:
v = queue.get();
if (!queue.isEmpty())
{
LOG_ERR("MessageQueue contains more than was put into it");
LOG_ERR("KitQueue contains more than was put into it");
return TestResult::Failed;
}

View File

@ -19,7 +19,6 @@
#include <mutex>
#include <string>
#include <common/MessageQueue.hpp>
#include "NetUtil.hpp"
#include "SigUtil.hpp"
#include <net/Socket.hpp>
@ -36,6 +35,7 @@
namespace http
{
/// A client socket for asynchronous Web-Socket protocol.
class WebSocketSession final : public WebSocketHandler
{
@ -47,6 +47,15 @@ public:
};
private:
typedef std::vector<std::vector<char>> BufferQueue;
std::vector<char> pop(BufferQueue &queue)
{
auto result = queue.front();
queue.erase(queue.begin());
return result;
}
WebSocketSession(const std::string& hostname, Protocol protocolType, int portNumber)
: WebSocketHandler(/* isClient = */ true, /* isMasking = */ true)
, _host(hostname)
@ -181,9 +190,9 @@ public:
for (;;)
{
// Drain the queue, first.
while (!_inQueue.isEmpty())
while (!_inQueue.empty())
{
std::vector<char> message = _inQueue.pop();
std::vector<char> message = pop(_inQueue);
if (cb(message))
return message;
}
@ -199,7 +208,7 @@ public:
const std::chrono::milliseconds remaining = timeout - elapsed;
_inCv.wait_for(lock, remaining / 20,
[this]()
{ return !_inQueue.isEmpty() || SigUtil::getShutdownRequestFlag(); });
{ return !_inQueue.empty() || SigUtil::getShutdownRequestFlag(); });
}
LOG_DBG(context << "Giving up polling after " << sw.elapsed());
@ -247,7 +256,7 @@ public:
{
{
std::unique_lock<std::mutex> lock(_outMutex);
_outQueue.put(std::vector<char>(msg.data(), msg.data() + msg.size()));
_outQueue.emplace_back(std::vector<char>(msg.data(), msg.data() + msg.size()));
}
const auto pollPtr = _socketPoll.lock();
@ -325,7 +334,7 @@ private:
LOG_TRC("Got message: " << COOLProtocol::getAbbreviatedMessage(data));
{
std::unique_lock<std::mutex> lock(_inMutex);
_inQueue.put(data);
_inQueue.emplace_back(data);
}
_inCv.notify_one();
@ -345,7 +354,7 @@ private:
int64_t& /*timeoutMaxMicroS*/) override
{
std::unique_lock<std::mutex> lock(_outMutex);
if (!_outQueue.isEmpty() || _shutdown) // Graceful disconnection needs to send a frame.
if (!_outQueue.empty() || _shutdown) // Graceful disconnection needs to send a frame.
return POLLIN | POLLOUT;
return POLLIN;
}
@ -360,9 +369,9 @@ private:
try
{
// Drain the queue, for efficient communication.
while (capacity > wrote && !_outQueue.isEmpty())
while (capacity > wrote && !_outQueue.empty())
{
std::vector<char> item = _outQueue.get();
std::vector<char> item = pop(_outQueue);
const auto size = item.size();
assert(size && "Zero-sized messages must never be queued for sending.");
@ -372,7 +381,7 @@ private:
LOG_TRC("WebSocketSession: wrote " << size << ", total " << wrote << " bytes.");
}
if (_shutdown && _outQueue.isEmpty())
if (_shutdown && _outQueue.empty())
{
sendCloseFrame();
}
@ -411,10 +420,10 @@ private:
const std::string _port;
const Protocol _protocol;
Request _request;
MessageQueue _inQueue; //< The incoming message queue.
BufferQueue _inQueue; //< The incoming message queue.
std::condition_variable _inCv; //< The incoming queue cond_var.
std::mutex _inMutex; //< The incoming queue lock.
MessageQueue _outQueue; //< The outgoing message queue.
BufferQueue _outQueue; //< The outgoing message queue.
std::mutex _outMutex; //< The outgoing queue lock.
std::condition_variable _disconnectCv; //< Traps disconnections.
std::mutex _disconnectMutex; //< The disconnection event lock.

View File

@ -20,7 +20,6 @@
#include <Common.hpp>
#include <FileUtil.hpp>
#include <Kit.hpp>
#include <MessageQueue.hpp>
#include <Protocol.hpp>
#include <TileDesc.hpp>
#include <Util.hpp>