Split outbound callback processing from incoming message queueing.

Now we always send callbacks as soon as possible back to wsd from the
kit. This has several implications:

1. even when InputProcessing is disabled we will send outbound
   progress updates.
2. we should send callbacks much more quickly without waiting for
   other queue events to be processed eg. tilecombine:

We also drastically simplify storage of callbacks, avoiding lots of
re-parsing of the same strings, and allow much more efficient
comparison and merging at a small space cost in queue size.

Signed-off-by: Michael Meeks <michael.meeks@collabora.com>
Change-Id: Ia1ede5406767f895616a52775316ee6ab1c5db09
pull/8165/merge
Michael Meeks 2024-05-09 09:19:44 +01:00 committed by Caolán McNamara
parent 320606c225
commit 6f49f9398e
6 changed files with 284 additions and 274 deletions

View File

@ -1082,8 +1082,8 @@ void Document::trimAfterInactivity()
assert(descriptor && "Null callback data.");
assert(descriptor->getDoc() && "Null Document instance.");
std::shared_ptr<KitQueue> tileQueue = descriptor->getDoc()->_queue;
assert(tileQueue && "Null KitQueue.");
std::shared_ptr<KitQueue> queue = descriptor->getDoc()->_queue;
assert(queue && "Null KitQueue.");
const std::string payload = p ? p : "(nil)";
LOG_TRC("Document::ViewCallback [" << descriptor->getViewId() <<
@ -1104,7 +1104,7 @@ void Document::trimAfterInactivity()
int cursorWidth = std::stoi(tokens[2]);
int cursorHeight = std::stoi(tokens[3]);
tileQueue->updateCursorPosition(0, 0, cursorX, cursorY, cursorWidth, cursorHeight);
queue->updateCursorPosition(0, 0, cursorX, cursorY, cursorWidth, cursorHeight);
}
}
else if (type == LOK_CALLBACK_INVALIDATE_VISIBLE_CURSOR)
@ -1122,7 +1122,7 @@ void Document::trimAfterInactivity()
int cursorWidth = std::stoi(tokens[2]);
int cursorHeight = std::stoi(tokens[3]);
tileQueue->updateCursorPosition(0, 0, cursorX, cursorY, cursorWidth, cursorHeight);
queue->updateCursorPosition(0, 0, cursorX, cursorY, cursorWidth, cursorHeight);
}
}
else if (type == LOK_CALLBACK_INVALIDATE_VIEW_CURSOR ||
@ -1143,7 +1143,7 @@ void Document::trimAfterInactivity()
int cursorWidth = std::stoi(tokens[2]);
int cursorHeight = std::stoi(tokens[3]);
tileQueue->updateCursorPosition(std::stoi(targetViewId), std::stoi(part), cursorX, cursorY, cursorWidth, cursorHeight);
queue->updateCursorPosition(std::stoi(targetViewId), std::stoi(part), cursorX, cursorY, cursorWidth, cursorHeight);
}
}
else if (type == LOK_CALLBACK_DOCUMENT_PASSWORD_RESET)
@ -1189,11 +1189,11 @@ void Document::trimAfterInactivity()
// merge various callback types together if possible
if (type == LOK_CALLBACK_INVALIDATE_TILES)
{
// all views have to be in sync
tileQueue->put("callback all " + std::to_string(type) + ' ' + payload);
// all views have to be in sync; FIXME: calc an issue here ?
queue->putCallback(-1, type, payload);
}
else
tileQueue->put("callback " + std::to_string(descriptor->getViewId()) + ' ' + std::to_string(type) + ' ' + payload);
queue->putCallback(descriptor->getViewId(), type, payload);
LOG_TRC("Document::ViewCallback end.");
}
@ -2121,16 +2121,78 @@ bool Document::processInputEnabled() const
return enabled;
}
void Document::drainCallbacks()
{
KitQueue::Callback cb;
LOG_TRC("drainCallbacks with " << _queue->callbackSize() << " items");
while (_queue && _queue->getCallback(cb))
{
if (_stop || SigUtil::getTerminationFlag())
{
LOG_INF("_stop or TerminationFlag is set, breaking Document::drainCallbacks");
break;
}
LOG_TRC("Kit handling callback " << cb);
int viewId = cb._view;
bool broadcast = cb._view == -1;
const int type = cb._type;
const std::string &payload = cb._payload;
// Forward the callback to the same view, demultiplexing is done by the LibreOffice core.
bool isFound = false;
for (const auto& it : _sessions)
{
ChildSession& session = *it.second;
if (broadcast || (!broadcast && (session.getViewId() == viewId)))
{
if (!session.isCloseFrame())
{
isFound = true;
session.loKitCallback(type, payload);
}
else
{
LOG_ERR("Session-thread of session ["
<< session.getId() << "] for view [" << viewId
<< "] is not running. Dropping ["
<< lokCallbackTypeToString(type) << "] payload ["
<< payload << ']');
}
if (!broadcast)
break;
}
if (!isFound)
LOG_ERR("Document::ViewCallback. Session [" << viewId <<
"] is no longer active to process [" << lokCallbackTypeToString(type) <<
"] [" << payload << "] message to Master Session.");
}
}
if (_websocketHandler)
_websocketHandler->flush();
}
void Document::drainQueue()
{
try
{
std::vector<TileCombined> tileRequests;
if (hasCallbacks())
drainCallbacks();
if (hasQueueItems())
LOG_TRC("drainQueue with " << _queue->size() <<
" items: " << (processInputEnabled() ? "processing" : "blocked") );
// FIXME: do we really want to process all of these items ?
while (processInputEnabled() && hasQueueItems())
{
if (_stop || SigUtil::getTerminationFlag())
@ -2152,8 +2214,7 @@ void Document::drainQueue()
LOG_INF("Received EOF. Finishing.");
break;
}
if (tokens.equals(0, "tile"))
else if (tokens.equals(0, "tile"))
{
tileRequests.emplace_back(TileDesc::parse(tokens));
}
@ -2174,67 +2235,7 @@ void Document::drainQueue()
}
else if (tokens.equals(0, "callback"))
{
if (tokens.size() >= 3)
{
bool broadcast = false;
int viewId = -1;
const std::string& target = tokens[1];
if (target == "all")
{
broadcast = true;
}
else
{
viewId = std::stoi(target);
}
const int type = std::stoi(tokens[2]);
// payload is the rest of the message
const std::size_t offset = tokens[0].length() + tokens[1].length()
+ tokens[2].length() + 3; // + delims
const std::string payload(input.data() + offset, input.size() - offset);
// Forward the callback to the same view, demultiplexing is done by the LibreOffice core.
bool isFound = false;
for (const auto& it : _sessions)
{
ChildSession& session = *it.second;
if (broadcast || (!broadcast && (session.getViewId() == viewId)))
{
if (!session.isCloseFrame())
{
isFound = true;
session.loKitCallback(type, payload);
}
else
{
LOG_ERR("Session-thread of session ["
<< session.getId() << "] for view [" << viewId
<< "] is not running. Dropping ["
<< lokCallbackTypeToString(type) << "] payload ["
<< payload << ']');
}
if (!broadcast)
{
break;
}
}
}
if (!isFound)
{
LOG_ERR("Document::ViewCallback. Session [" << viewId <<
"] is no longer active to process [" << lokCallbackTypeToString(type) <<
"] [" << payload << "] message to Master Session.");
}
}
else
{
LOG_ERR("Invalid callback message: [" << COOLProtocol::getAbbreviatedMessage(input) << "].");
}
assert(false && "callbacks cannot now appear on the incoming queue");
}
else
{
@ -2681,7 +2682,9 @@ int KitSocketPoll::kitPoll(int timeoutMicroS)
do
{
int realTimeout = timeoutMicroS;
if (_document && _document->hasQueueItems() && _document->processInputEnabled())
if (_document &&
(_document->hasCallbacks() ||
(_document->hasQueueItems() && _document->processInputEnabled())))
realTimeout = 0;
if (poll(std::chrono::microseconds(realTimeout)) <= 0)

View File

@ -259,7 +259,7 @@ private:
/// Helper method to broadcast callback and its payload to all clients
void broadcastCallbackToClients(const int type, const std::string& payload)
{
_queue->put("callback all " + std::to_string(type) + ' ' + payload);
_queue->putCallback(-1, type, payload);
}
public:
@ -326,10 +326,12 @@ public:
/// A new message from wsd for the queue
void queueMessage(const std::string &msg) { _queue->put(msg); }
bool hasQueueItems() const { return _queue && !_queue->isEmpty(); }
bool hasCallbacks() const { return _queue && _queue->callbackSize() > 0; }
// poll is idle, are we ?
void checkIdle();
void drainQueue();
void drainCallbacks();
void dumpState(std::ostream& oss);

View File

@ -24,6 +24,14 @@
#include "Log.hpp"
#include <TileDesc.hpp>
/* static */ std::string KitQueue::Callback::toString(int view, int type, const std::string payload)
{
std::ostringstream str;
str << view << " " << lokCallbackTypeToString(type) << " "
<< COOLProtocol::getAbbreviatedMessage(payload);
return str.str();
}
void KitQueue::put(const Payload& value)
{
if (value.empty())
@ -55,14 +63,8 @@ void KitQueue::put(const Payload& value)
_queue.push_back(value);
}
else if (firstToken == "callback")
{
const std::string newMsg = removeCallbackDuplicate(std::string(value.data(), value.size()));
assert(false && "callbacks should not come from the client");
if (newMsg.empty())
_queue.push_back(value);
else
_queue.emplace_back(newMsg.data(), newMsg.data() + newMsg.size());
}
else if (tokens.equals(1, "textinput"))
{
const std::string newMsg = combineTextInput(tokens);
@ -112,14 +114,11 @@ void KitQueue::removeTileDuplicate(const std::string& tileMsg)
namespace {
/// Read the viewId from the tokens.
std::string extractViewId(const std::string& origMsg, const StringVector& tokens)
/// Read the viewId from the payload.
std::string extractViewId(const std::string& payload)
{
size_t nonJson = tokens[0].size() + tokens[1].size() + tokens[2].size() + 3; // including spaces
std::string jsonString(origMsg.data() + nonJson, origMsg.size() - nonJson);
Poco::JSON::Parser parser;
const Poco::Dynamic::Var result = parser.parse(jsonString);
const Poco::Dynamic::Var result = parser.parse(payload);
const auto& json = result.extract<Poco::JSON::Object::Ptr>();
return json->get("viewId").toString();
}
@ -137,19 +136,7 @@ std::string extractUnoCommand(const std::string& command)
return command;
}
bool containsUnoCommand(const std::string_view token, const std::string_view command)
{
if (!COOLProtocol::matchPrefix(".uno:", token))
return false;
size_t equalPos = token.find('=');
if (equalPos != std::string::npos)
return token.substr(0, equalPos) == command;
return token == command;
}
/// Extract rectangle from the invalidation callback
/// Extract rectangle from the invalidation callback payload
bool extractRectangle(const StringVector& tokens, int& x, int& y, int& w, int& h, int& part, int& mode)
{
x = 0;
@ -159,124 +146,70 @@ bool extractRectangle(const StringVector& tokens, int& x, int& y, int& w, int& h
part = 0;
mode = 0;
if (tokens.size() < 5)
if (tokens.size() < 2)
return false;
if (tokens.equals(3, "EMPTY,"))
if (tokens.equals(0, "EMPTY,"))
{
part = std::atoi(tokens[4].c_str());
part = std::atoi(tokens[0].c_str());
return true;
}
if (tokens.size() < 8)
if (tokens.size() < 5)
return false;
x = std::atoi(tokens[3].c_str());
y = std::atoi(tokens[4].c_str());
w = std::atoi(tokens[5].c_str());
h = std::atoi(tokens[6].c_str());
part = std::atoi(tokens[7].c_str());
x = std::atoi(tokens[0].c_str());
y = std::atoi(tokens[1].c_str());
w = std::atoi(tokens[2].c_str());
h = std::atoi(tokens[3].c_str());
part = std::atoi(tokens[4].c_str());
if (tokens.size() == 9)
mode = std::atoi(tokens[8].c_str());
if (tokens.size() == 6)
mode = std::atoi(tokens[5].c_str());
return true;
}
class isDuplicateCommand
{
private:
const std::string& m_unoCommand;
const StringVector& m_tokens;
bool m_is_duplicate_command;
public:
isDuplicateCommand(const std::string& unoCommand, const StringVector& tokens)
: m_unoCommand(unoCommand)
, m_tokens(tokens)
, m_is_duplicate_command(false)
{
}
bool get_is_duplicate_command() const
{
return m_is_duplicate_command;
}
void reset()
{
m_is_duplicate_command = false;
}
bool operator()(size_t nIndex, std::string_view token)
{
switch (nIndex)
{
case 0:
case 1:
case 2:
// returns true to end tokenization as one of first 3 token doesn't match
return token != m_tokens[nIndex];
case 3:
// callback, the same target, state changed; now check it's
// the same .uno: command
m_is_duplicate_command = containsUnoCommand(token, m_unoCommand);
// returns true to end tokenization as 4 is all we need
return true;
break;
}
return false;
};
};
}
std::string KitQueue::removeCallbackDuplicate(const std::string& callbackMsg)
void KitQueue::putCallback(int view, int type, const std::string &payload)
{
assert(COOLProtocol::matchPrefix("callback", callbackMsg, /*ignoreWhitespace*/ true));
if (!elideDuplicateCallback(view, type, payload))
_callbacks.emplace_back(view, type, payload);
}
StringVector tokens = StringVector::tokenize(callbackMsg);
bool KitQueue::elideDuplicateCallback(int view, int type, const std::string &payload)
{
const auto callbackType = static_cast<LibreOfficeKitCallbackType>(type);
if (tokens.size() < 3)
return std::string();
// the message is "callback <view> <id> ..."
const auto pair = Util::i32FromString(tokens[2]);
if (!pair.second)
return std::string();
const auto callbackType = static_cast<LibreOfficeKitCallbackType>(pair.first);
// Nothing to combine in this case:
if (_callbacks.size() == 0)
return false;
switch (callbackType)
{
case LOK_CALLBACK_INVALIDATE_TILES: // invalidation
{
int msgX, msgY, msgW, msgH, msgPart, msgMode;
StringVector tokens = StringVector::tokenize(payload);
int msgX, msgY, msgW, msgH, msgPart, msgMode;
if (!extractRectangle(tokens, msgX, msgY, msgW, msgH, msgPart, msgMode))
return std::string();
return false;
bool performedMerge = false;
// we always travel the entire queue
std::size_t i = 0;
while (i < _queue.size())
while (i < _callbacks.size())
{
auto& it = _queue[i];
auto& it = _callbacks[i];
StringVector queuedTokens = StringVector::tokenize(it.data(), it.size());
if (queuedTokens.size() < 3)
{
++i;
continue;
}
// not a invalidation callback
if (queuedTokens[0] != tokens[0] || queuedTokens[1] != tokens[1]
|| queuedTokens[2] != tokens[2])
if (it._type != type || it._view != view)
{
++i;
continue;
}
StringVector queuedTokens = StringVector::tokenize(it._payload);
int queuedX, queuedY, queuedW, queuedH, queuedPart, queuedMode;
@ -298,18 +231,17 @@ std::string KitQueue::removeCallbackDuplicate(const std::string& callbackMsg)
continue;
}
// the invalidation in the queue is fully covered by the message,
// the invalidation in the queue is fully covered by the payload,
// just remove it
if (msgX <= queuedX && queuedX + queuedW <= msgX + msgW && msgY <= queuedY
&& queuedY + queuedH <= msgY + msgH)
{
LOG_TRC("Removing smaller invalidation: "
<< std::string(it.data(), it.size()) << " -> " << tokens[0] << ' '
<< tokens[1] << ' ' << tokens[2] << ' ' << msgX << ' ' << msgY << ' '
<< it._payload << " -> " << ' ' << msgX << ' ' << msgY << ' '
<< msgW << ' ' << msgH << ' ' << msgPart << ' ' << msgMode);
// remove from the queue
_queue.erase(_queue.begin() + i);
_callbacks.erase(_callbacks.begin() + i);
continue;
}
@ -332,7 +264,7 @@ std::string KitQueue::removeCallbackDuplicate(const std::string& callbackMsg)
}
LOG_TRC("Merging invalidations: "
<< std::string(it.data(), it.size()) << " and "
<< Callback::toString(view, type, payload) << " and "
<< tokens[0] << ' ' << tokens[1] << ' ' << tokens[2] << ' '
<< msgX << ' ' << msgY << ' ' << msgW << ' ' << msgH << ' '
<< msgPart << ' ' << msgMode << " -> "
@ -347,7 +279,7 @@ std::string KitQueue::removeCallbackDuplicate(const std::string& callbackMsg)
performedMerge = true;
// remove from the queue
_queue.erase(_queue.begin() + i);
_callbacks.erase(_callbacks.begin() + i);
continue;
}
@ -356,55 +288,48 @@ std::string KitQueue::removeCallbackDuplicate(const std::string& callbackMsg)
if (performedMerge)
{
std::size_t pre = tokens[0].size() + tokens[1].size() + tokens[2].size() + 3;
std::size_t post = pre + tokens[3].size() + tokens[4].size() + tokens[5].size()
+ tokens[6].size() + 4;
std::string newPayload =
std::to_string(msgX) + ", " + std::to_string(msgY) + ", " +
std::to_string(msgW) + ", " + std::to_string(msgH) + ", " +
tokens.cat(' ', 4); // part etc. ...
std::string result = callbackMsg.substr(0, pre) + std::to_string(msgX) + ", "
+ std::to_string(msgY) + ", " + std::to_string(msgW) + ", "
+ std::to_string(msgH) + ", " + callbackMsg.substr(post);
LOG_TRC("Merge result: " << newPayload);
LOG_TRC("Merge result: " << result);
return result;
_callbacks.emplace_back(view, type, newPayload);
return true; // elide the original - use this instead
}
}
break;
case LOK_CALLBACK_STATE_CHANGED: // state changed
{
if (tokens.size() < 4)
return std::string();
std::string unoCommand = extractUnoCommand(tokens[3]);
std::string unoCommand = extractUnoCommand(payload);
if (unoCommand.empty())
return std::string();
return false;
// This is needed because otherwise it creates some problems when
// a save occurs while a cell is still edited in Calc.
if (unoCommand == ".uno:ModifiedStatus")
return std::string();
if (_queue.empty())
return std::string();
return false;
// remove obsolete states of the same .uno: command
isDuplicateCommand functor(unoCommand, tokens);
for (std::size_t i = 0; i < _queue.size(); ++i)
size_t unoCommandLen = unoCommand.size();
for (size_t i = 0; i < _callbacks.size(); ++i)
{
auto& it = _queue[i];
Callback& it = _callbacks[i];
if (it._type != type || it._view != view)
continue;
StringVector::tokenize_foreach(functor, it.data(), it.size());
size_t payloadLen = it._payload.size();
if (payloadLen < unoCommandLen + 1 ||
unoCommand.compare(0, unoCommandLen, it._payload) != 0 ||
it._payload[unoCommandLen] != '=')
continue;
if (functor.get_is_duplicate_command())
{
LOG_TRC("Remove obsolete uno command: "
<< std::string(it.data(), it.size()) << " -> "
<< COOLProtocol::getAbbreviatedMessage(callbackMsg));
_queue.erase(_queue.begin() + i);
break;
}
functor.reset();
LOG_TRC("Remove obsolete uno command: " << it << " -> "
<< Callback::toString(view, type, payload));
_callbacks.erase(_callbacks.begin() + i);
break;
}
}
break;
@ -418,50 +343,38 @@ std::string KitQueue::removeCallbackDuplicate(const std::string& callbackMsg)
case LOK_CALLBACK_CELL_VIEW_CURSOR: // the view cell cursor has moved
case LOK_CALLBACK_VIEW_CURSOR_VISIBLE: // the view cursor visibility has changed
{
const bool isViewCallback = (callbackType == LOK_CALLBACK_INVALIDATE_VIEW_CURSOR
|| callbackType == LOK_CALLBACK_CELL_VIEW_CURSOR
|| callbackType == LOK_CALLBACK_VIEW_CURSOR_VISIBLE);
const bool isViewCallback = (callbackType == LOK_CALLBACK_INVALIDATE_VIEW_CURSOR ||
callbackType == LOK_CALLBACK_CELL_VIEW_CURSOR ||
callbackType == LOK_CALLBACK_VIEW_CURSOR_VISIBLE);
const std::string viewId
= (isViewCallback ? extractViewId(callbackMsg, tokens) : std::string());
= (isViewCallback ? extractViewId(payload) : std::string());
for (std::size_t i = 0; i < _queue.size(); ++i)
for (std::size_t i = 0; i < _callbacks.size(); ++i)
{
const auto& it = _queue[i];
const auto& it = _callbacks[i];
// skip non-callbacks quickly
if (!COOLProtocol::matchPrefix("callback", it))
if (it._type != type || it._view != view)
continue;
StringVector queuedTokens = StringVector::tokenize(it.data(), it.size());
if (queuedTokens.size() < 3)
continue;
if (!isViewCallback
&& (queuedTokens.equals(1, tokens, 1) && queuedTokens.equals(2, tokens, 2)))
if (!isViewCallback)
{
LOG_TRC("Remove obsolete callback: "
<< std::string(it.data(), it.size()) << " -> "
<< COOLProtocol::getAbbreviatedMessage(callbackMsg));
_queue.erase(_queue.begin() + i);
LOG_TRC("Remove obsolete callback: " << it << " -> "
<< Callback::toString(view, type, payload));
_callbacks.erase(_callbacks.begin() + i);
break;
}
else if (isViewCallback
&& (queuedTokens.equals(1, tokens, 1)
&& queuedTokens.equals(2, tokens, 2)))
else if (isViewCallback)
{
// we additionally need to ensure that the payload is about
// the same viewid (otherwise we'd merge them all views into
// one)
const std::string queuedViewId
= extractViewId(std::string(it.data(), it.size()), queuedTokens);
const std::string queuedViewId = extractViewId(it._payload);
if (viewId == queuedViewId)
{
LOG_TRC("Remove obsolete view callback: "
<< std::string(it.data(), it.size()) << " -> "
<< COOLProtocol::getAbbreviatedMessage(callbackMsg));
_queue.erase(_queue.begin() + i);
LOG_TRC("Remove obsolete view callback: " << it << " -> "
<< Callback::toString(view, type, payload));
_callbacks.erase(_callbacks.begin() + i);
break;
}
}
@ -474,7 +387,8 @@ std::string KitQueue::removeCallbackDuplicate(const std::string& callbackMsg)
} // switch
return std::string();
// Append the new command to the callbacks list
return false;
}
int KitQueue::priority(const std::string& tileMsg)
@ -819,6 +733,11 @@ void KitQueue::dumpState(std::ostream& oss)
size_t i = 0;
for (Payload &it : _queue)
oss << "\t\t" << i++ << ": " << COOLProtocol::getFirstLine(it) << "\n";
oss << "\tCallbacks size: " << _callbacks.size() << "\n";
i = 0;
for (auto &it : _callbacks)
oss << "\t\t" << i++ << ": " << it << "\n";
}
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */

View File

@ -42,11 +42,47 @@ public:
put(Payload(value.data(), value.data() + value.size()));
}
struct Callback {
int _view; // -1 for all
int _type;
std::string _payload;
Callback() : _view(-1), _type(-1) { }
Callback(int view, int type, const std::string payload) :
_view(view), _type(type), _payload(payload) { }
static std::string toString(int view, int type, const std::string payload);
};
/// Queue a LibreOfficeKit callback for later emission
void putCallback(int view, int type, const std::string &message);
/// Work back over the queue to simplify & return false if we should not queue.
bool elideDuplicateCallback(int view, int type, const std::string &message);
/// Obtain the next message.
/// timeoutMs can be 0 to signify infinity.
/// Returns an empty payload on timeout.
Payload get();
/// Obtain the next callback
Callback getCallback()
{
assert(_callbacks.size() > 0);
const Callback front = _callbacks.front();
_callbacks.erase(_callbacks.begin());
return front;
}
bool getCallback(Callback &callback)
{
if (_callbacks.size() == 0)
return false;
callback = std::move(_callbacks.front());
_callbacks.erase(_callbacks.begin());
return true;
}
/// Get a message without waiting
Payload pop()
{
@ -66,10 +102,16 @@ public:
return _queue.size();
}
size_t callbackSize() const
{
return _callbacks.size();
}
/// Removal of all the pending messages.
void clear()
{
_queue.clear();
_callbacks.clear();
}
void dumpState(std::ostream& oss);
@ -148,6 +190,9 @@ private:
/// The incoming underlying queue
std::vector<Payload> _queue;
/// Outgoing queued callbacks
std::vector<Callback> _callbacks;
std::map<int, CursorPosition> _cursorPositions;
/// Check the views in the order of how the editing (cursor movement) has
@ -155,4 +200,10 @@ private:
std::vector<int> _viewOrder;
};
inline std::ostream& operator<<(std::ostream& os, const KitQueue::Callback &c)
{
os << KitQueue::Callback::toString(c._view, c._type, c._payload);
return os;
}
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */

View File

@ -700,6 +700,13 @@ public:
return true;
}
void flush()
{
std::shared_ptr<StreamSocket> socket = _socket.lock();
if (socket)
socket->flush();
}
protected:
#if !MOBILEAPP

View File

@ -470,33 +470,50 @@ void KitQueueTests::testInvalidateViewCursorDeduplication()
LOK_ASSERT_EQUAL(static_cast<size_t>(0), queue.size());
}
// back-compatible method from before putCallback implementation
void putCallback(KitQueue &queue, const std::string &str)
{
StringVector tokens = StringVector::tokenize(str);
assert(tokens[0] == "callback");
int view = std::atoi(tokens[1].c_str());
if (tokens[1] == "all")
view = -1;
int type = std::atoi(tokens[2].c_str());
queue.putCallback(view, type, tokens.cat(' ', 3));
}
void KitQueueTests::testCallbackInvalidation()
{
constexpr auto testname = __func__;
KitQueue queue;
KitQueue::Callback item;
// join tiles
queue.put("callback all 0 284, 1418, 11105, 275, 0");
queue.put("callback all 0 4299, 1418, 7090, 275, 0");
putCallback(queue, "callback all 0 284, 1418, 11105, 275, 0");
putCallback(queue, "callback all 0 4299, 1418, 7090, 275, 0");
LOK_ASSERT_EQUAL(1, static_cast<int>(queue.size()));
LOK_ASSERT_EQUAL(1, static_cast<int>(queue.callbackSize()));
LOK_ASSERT_EQUAL_STR("callback all 0 284, 1418, 11105, 275, 0", queue.get());
item = queue.getCallback();
LOK_ASSERT_EQUAL_STR("284, 1418, 11105, 275, 0", item._payload);
// invalidate everything with EMPTY, but keep the different part intact
queue.put("callback all 0 284, 1418, 11105, 275, 0");
queue.put("callback all 0 4299, 1418, 7090, 275, 1");
queue.put("callback all 0 4299, 10418, 7090, 275, 0");
queue.put("callback all 0 4299, 20418, 7090, 275, 0");
putCallback(queue, "callback all 0 284, 1418, 11105, 275, 0");
putCallback(queue, "callback all 0 4299, 1418, 7090, 275, 1");
putCallback(queue, "callback all 0 4299, 10418, 7090, 275, 0");
putCallback(queue, "callback all 0 4299, 20418, 7090, 275, 0");
LOK_ASSERT_EQUAL(4, static_cast<int>(queue.size()));
LOK_ASSERT_EQUAL(4, static_cast<int>(queue.callbackSize()));
queue.put("callback all 0 EMPTY, 0");
putCallback(queue, "callback all 0 EMPTY, 0");
LOK_ASSERT_EQUAL(2, static_cast<int>(queue.size()));
LOK_ASSERT_EQUAL_STR("callback all 0 4299, 1418, 7090, 275, 1", queue.get());
LOK_ASSERT_EQUAL_STR("callback all 0 EMPTY, 0", queue.get());
LOK_ASSERT_EQUAL(2, static_cast<int>(queue.callbackSize()));
item = queue.getCallback();
LOK_ASSERT_EQUAL_STR("4299, 1418, 7090, 275, 1", item._payload);
item = queue.getCallback();
LOK_ASSERT_EQUAL_STR("EMPTY, 0", item._payload);
}
void KitQueueTests::testCallbackIndicatorValue()
@ -504,13 +521,17 @@ void KitQueueTests::testCallbackIndicatorValue()
constexpr auto testname = __func__;
KitQueue queue;
KitQueue::Callback item;
// join tiles
queue.put("callback all 10 25");
queue.put("callback all 10 50");
putCallback(queue, "callback all 10 25");
putCallback(queue, "callback all 10 50");
LOK_ASSERT_EQUAL(1, static_cast<int>(queue.size()));
LOK_ASSERT_EQUAL_STR("callback all 10 50", queue.get());
LOK_ASSERT_EQUAL(1, static_cast<int>(queue.callbackSize()));
item = queue.getCallback();
LOK_ASSERT_EQUAL(item._view, -1);
LOK_ASSERT_EQUAL(item._type, 10);
LOK_ASSERT_EQUAL_STR("50", item._payload);
}
void KitQueueTests::testCallbackPageSize()
@ -518,13 +539,17 @@ void KitQueueTests::testCallbackPageSize()
constexpr auto testname = __func__;
KitQueue queue;
KitQueue::Callback item;
// join tiles
queue.put("callback all 13 12474, 188626");
queue.put("callback all 13 12474, 205748");
putCallback(queue, "callback all 13 12474, 188626");
putCallback(queue, "callback all 13 12474, 205748");
LOK_ASSERT_EQUAL(1, static_cast<int>(queue.size()));
LOK_ASSERT_EQUAL_STR("callback all 13 12474, 205748", queue.get());
LOK_ASSERT_EQUAL(1, static_cast<int>(queue.callbackSize()));
item = queue.getCallback();
LOK_ASSERT_EQUAL(item._view, -1);
LOK_ASSERT_EQUAL(item._type, 13);
LOK_ASSERT_EQUAL_STR("12474, 205748", item._payload);
}
void KitQueueTests::testCallbackModifiedStatusIsSkipped()
@ -532,6 +557,8 @@ void KitQueueTests::testCallbackModifiedStatusIsSkipped()
constexpr auto testname = __func__;
KitQueue queue;
KitQueue::Callback item;
std::stringstream ss;
ss << "callback all " << LOK_CALLBACK_STATE_CHANGED;
@ -545,15 +572,16 @@ void KitQueueTests::testCallbackModifiedStatusIsSkipped()
for (const auto& msg : messages)
{
queue.put(msg);
putCallback(queue, msg);
}
LOK_ASSERT_EQUAL(static_cast<size_t>(4), queue.size());
LOK_ASSERT_EQUAL(static_cast<size_t>(4), queue.callbackSize());
LOK_ASSERT_EQUAL_STR(messages[0], queue.get());
LOK_ASSERT_EQUAL_STR(messages[1], queue.get());
LOK_ASSERT_EQUAL_STR(messages[2], queue.get());
LOK_ASSERT_EQUAL_STR(messages[3], queue.get());
for (size_t i = 0; i < std::size(messages); i++)
{
item = queue.getCallback();
LOK_ASSERT_EQUAL_STR(messages[i].substr(ss.str().size() + 1), item._payload);
}
}
CPPUNIT_TEST_SUITE_REGISTRATION(KitQueueTests);