PrisonerPoll - leave unused Kit sockets in the poll.

This lets us detect when Kit processes die without waiting for
the poll() timeout and feebly spinning the PrisonerPoll loop.

Instead we get notified immediately; but to do this we then need
to be able to safely transfer the socket between SocketPolls.

SocketPolls should own Sockets - so by switching ChildProcess to
use a weak_ptr and also the NewChildren list - we can have standard
ownership and a sensible transfer between SocketPolls. A Socket is
owned either by PrisonerPoll or a DocumentBroker in the normal way.

Clean the NewChildren list as/when children are unexpectedly killed.
Apparently there are still some ownership issues, probably around
the strong ChildProcess _ws pointer.

Change-Id: Ie541a9d03e36aee53fd57c45953e0de21ebe1828
Signed-off-by: Michael Meeks <michael.meeks@collabora.com>
pull/8515/head
Michael Meeks 2024-03-20 17:52:04 +00:00
parent 31f1ce360a
commit 005ba1567e
6 changed files with 182 additions and 109 deletions

View File

@ -502,7 +502,12 @@ int SocketPoll::poll(int64_t timeoutMaxMicroS)
size_t i = _pollStartIndex;
for (std::size_t j = 0; j < size; ++j)
{
if (_pollFds[i].fd == _pollSockets[i]->getFD())
if (!_pollSockets[i])
{
// removed in a callback
toErase.push_back(i);
}
else if (_pollFds[i].fd == _pollSockets[i]->getFD())
{
SocketDisposition disposition(_pollSockets[i]);
try
@ -577,6 +582,55 @@ void SocketPoll::closeAllSockets()
assert(_newSockets.size() == 0);
}
/// Moves @inSocket out of @fromPoll and into this poll.
///
/// The removal is queued as a callback on @fromPoll; the caller then waits
/// (in 50ms slices) until that callback reports completion or until this
/// poll's continuePolling() turns false. A null @inSocket is logged and
/// ignored.
void SocketPoll::takeSocket(const std::shared_ptr<SocketPoll> &fromPoll,
                            const std::shared_ptr<Socket> &inSocket)
{
    // Important we're not blocking the fromPoll thread.
    ASSERT_CORRECT_THREAD();

    // The caller may hand us the result of a weak_ptr::lock(), which is
    // empty once the socket is gone; there is nothing to transfer then.
    if (!inSocket)
    {
        LOG_WRN("Trying to move a null socket across polls");
        return;
    }

    // Hand-over state shared between this function and the queued callback.
    // It lives on the heap and is captured by value because the wait below
    // can give up (exit during transfer) before the callback has run; stack
    // locals captured by reference would dangle in that case.
    struct TransferState
    {
        std::mutex mut;
        std::condition_variable cond;
        bool transferred = false;
    };
    const auto state = std::make_shared<TransferState>();

    // hold a reference during transfer
    std::shared_ptr<Socket> socket = inSocket;

    // NOTE(review): raw pointer to this poll; presumably the destination poll
    // outlives any pending callback - confirm for the exit-during-transfer path.
    SocketPoll *toPoll = this;
    fromPoll->addCallback([fromPoll,socket,state,toPoll](){
        auto it = std::find(fromPoll->_pollSockets.begin(),
                            fromPoll->_pollSockets.end(), socket);
        if (it != fromPoll->_pollSockets.end())
        {
            // Erasing messes up the tracking of poll results in 'poll'
            // leave to be added to toErase and cleaned later.
            *it = nullptr;
        }
        else
            LOG_WRN("Trying to move socket out of the wrong poll");

        // sockets in transit are un-owned
        socket->resetThreadOwner();

        toPoll->insertNewSocket(socket);

        LOG_TRC("Socket #" << socket->getFD() << " moved across polls");

        // Let the caller know we've done our job.
        std::unique_lock<std::mutex> lock(state->mut);
        state->transferred = true;
        state->cond.notify_all();
    });

    LOG_TRC("Waiting to transfer Socket #" << socket->getFD() <<
            " from: " << fromPoll->name() << " to new poll: " << name());

    std::unique_lock<std::mutex> lock(state->mut);
    while (!state->transferred && continuePolling()) // in case of exit during transfer.
        state->cond.wait_for(lock, std::chrono::milliseconds(50));

    LOG_TRC("Transfer of Socket #" << socket->getFD() <<
            " from: " << fromPoll->name() << " to new poll: " << name() << " complete");
}
void SocketPoll::removeSockets()
{
LOG_DBG("Removing all " << _pollSockets.size() + _newSockets.size()

View File

@ -738,6 +738,11 @@ public:
}
}
/// Takes socket from @fromPoll and moves it to this current
/// poll. Blocks until the transfer is complete, or until this
/// poll's continuePolling() returns false, whichever comes first.
/// The removal from @fromPoll is queued there as a callback.
/// NOTE(review): the implementation starts with ASSERT_CORRECT_THREAD(),
/// so this presumably has to be called from this poll's own thread - confirm.
void takeSocket(const std::shared_ptr<SocketPoll> &fromPoll,
const std::shared_ptr<Socket> &socket);
#if !MOBILEAPP
/// Inserts a new remote websocket to be polled.
/// NOTE: The DNS lookup is synchronous.

View File

@ -621,97 +621,6 @@ static size_t addNewChild(std::shared_ptr<ChildProcess> child)
return count;
}
// Definition of COOLWSD's static lokit_main_mutex; only compiled for mobile
// builds other than iOS. getNewChild_Blocks() locks it inside the detached
// thread that runs lokit_main().
#if MOBILEAPP
#ifndef IOS
std::mutex COOLWSD::lokit_main_mutex;
#endif
#endif
/// Waits for a spare child process and returns it.
///
/// Non-mobile builds first rebalance the pre-spawned children (one extra, to
/// replace the child about to be handed out) and wait at most half of
/// ChildSpawnTimeoutMs. Mobile builds start lokit_main() on a detached thread
/// and wait up to 100 hours.
///
/// @param mobileAppDocId  document id passed to lokit_main() in mobile
///                        builds; asserted to be 0 otherwise.
/// @return the popped child if it is alive; nullptr if rebalancing failed,
///         the wait timed out, or the popped child was not alive.
std::shared_ptr<ChildProcess> getNewChild_Blocks(unsigned mobileAppDocId)
{
    (void)mobileAppDocId;
    const auto startTime = std::chrono::steady_clock::now();

    std::unique_lock<std::mutex> lock(NewChildrenMutex);

#if !MOBILEAPP
    assert(mobileAppDocId == 0 && "Unexpected to have mobileAppDocId in the non-mobile build");

    int numPreSpawn = COOLWSD::NumPreSpawnedChildren;
    ++numPreSpawn; // Replace the one we'll dispatch just now.
    LOG_DBG("getNewChild: Rebalancing children to " << numPreSpawn);
    if (rebalanceChildren(numPreSpawn) < 0)
    {
        LOG_DBG("getNewChild: rebalancing of children failed. Scheduling housekeeping to recover.");

        COOLWSD::doHousekeeping();

        // Let the caller retry after a while.
        return nullptr;
    }

    const auto timeout = std::chrono::milliseconds(ChildSpawnTimeoutMs / 2);
    LOG_TRC("Waiting for a new child for a max of " << timeout);
#else
    const auto timeout = std::chrono::hours(100);

#ifdef IOS
    assert(mobileAppDocId > 0 && "Unexpected to have no mobileAppDocId in the iOS build");
#endif

    // Capture the id by value: the thread is detached and may still be
    // starting up after this function has returned, at which point a
    // by-reference capture of the parameter would dangle.
    std::thread([mobileAppDocId]
                {
#ifndef IOS
                    std::lock_guard<std::mutex> lock(COOLWSD::lokit_main_mutex);
                    Util::setThreadName("lokit_main");
#else
                    Util::setThreadName("lokit_main_" + Util::encodeId(mobileAppDocId, 3));
#endif
                    // Ugly to have that static global COOLWSD::prisonerServerSocketFD, Otoh we know
                    // there is just one COOLWSD object. (Even in real Online.)
                    lokit_main(COOLWSD::prisonerServerSocketFD, COOLWSD::UserInterface, mobileAppDocId);
                }).detach();
#endif

    // FIXME: blocks ...
    // Unfortunately we need to wait after spawning children to avoid bombing the system.
    // If we fail fast and return, the next document will spawn more children without knowing
    // there are some on the way already. And if the system is slow already, that wouldn't help.
    LOG_TRC("Waiting for NewChildrenCV");
    if (NewChildrenCV.wait_for(lock, timeout, []()
                               {
                                   LOG_TRC("Predicate for NewChildrenCV wait: NewChildren.size()=" << NewChildren.size());
                                   return !NewChildren.empty();
                               }))
    {
        LOG_TRC("NewChildrenCV wait successful");
        std::shared_ptr<ChildProcess> child = NewChildren.back();
        NewChildren.pop_back();
        const size_t available = NewChildren.size();

        // Validate before returning.
        if (child && child->isAlive())
        {
            LOG_DBG("getNewChild: Have "
                    << available << " spare " << (available == 1 ? "child" : "children")
                    << " after popping [" << child->getPid() << "] to return in "
                    << std::chrono::duration_cast<std::chrono::milliseconds>(
                           std::chrono::steady_clock::now() - startTime));
            return child;
        }

        LOG_WRN("getNewChild: popped dead child, need to find another.");
    }
    else
    {
        LOG_TRC("NewChildrenCV wait failed");
        LOG_WRN("getNewChild: No child available. Sending spawn request to forkit and failing.");
    }

    // Reached both on a real timeout and after popping a dead child.
    LOG_DBG("getNewChild: Timed out while waiting for new child.");
    return nullptr;
}
#if !MOBILEAPP
namespace
@ -895,7 +804,105 @@ private:
/// This thread listens for and accepts prisoner kit processes.
/// And also cleans up and balances the correct number of children.
static std::unique_ptr<PrisonPoll> PrisonerPoll;
static std::shared_ptr<PrisonPoll> PrisonerPoll;
// Definition of COOLWSD's static lokit_main_mutex; only compiled for mobile
// builds other than iOS. getNewChild_Blocks() locks it inside the detached
// thread that runs lokit_main().
#if MOBILEAPP
#ifndef IOS
std::mutex COOLWSD::lokit_main_mutex;
#endif
#endif
/// Waits for a spare child process, moves its socket from PrisonerPoll into
/// @destPoll and returns it.
///
/// Non-mobile builds first rebalance the pre-spawned children (one extra, to
/// replace the child about to be handed out) and wait at most half of
/// ChildSpawnTimeoutMs. Mobile builds start lokit_main() on a detached thread
/// and wait up to 100 hours.
///
/// @param destPoll        poll that takes over the child's socket.
/// @param mobileAppDocId  document id passed to lokit_main() in mobile
///                        builds; asserted to be 0 otherwise.
/// @return the popped child if it is alive; nullptr if rebalancing failed,
///         the wait timed out, or the popped child was not alive.
std::shared_ptr<ChildProcess> getNewChild_Blocks(SocketPoll &destPoll, unsigned mobileAppDocId)
{
    (void)mobileAppDocId;
    const auto startTime = std::chrono::steady_clock::now();

    std::unique_lock<std::mutex> lock(NewChildrenMutex);

#if !MOBILEAPP
    assert(mobileAppDocId == 0 && "Unexpected to have mobileAppDocId in the non-mobile build");

    int numPreSpawn = COOLWSD::NumPreSpawnedChildren;
    ++numPreSpawn; // Replace the one we'll dispatch just now.
    LOG_DBG("getNewChild: Rebalancing children to " << numPreSpawn);
    if (rebalanceChildren(numPreSpawn) < 0)
    {
        LOG_DBG("getNewChild: rebalancing of children failed. Scheduling housekeeping to recover.");

        COOLWSD::doHousekeeping();

        // Let the caller retry after a while.
        return nullptr;
    }

    const auto timeout = std::chrono::milliseconds(ChildSpawnTimeoutMs / 2);
    LOG_TRC("Waiting for a new child for a max of " << timeout);
#else // MOBILEAPP
    const auto timeout = std::chrono::hours(100);

#ifdef IOS
    assert(mobileAppDocId > 0 && "Unexpected to have no mobileAppDocId in the iOS build");
#endif

    // Capture the id by value: the thread is detached and may still be
    // starting up after this function has returned, at which point a
    // by-reference capture of the parameter would dangle.
    std::thread([mobileAppDocId]
                {
#ifndef IOS
                    std::lock_guard<std::mutex> lock(COOLWSD::lokit_main_mutex);
                    Util::setThreadName("lokit_main");
#else
                    Util::setThreadName("lokit_main_" + Util::encodeId(mobileAppDocId, 3));
#endif
                    // Ugly to have that static global COOLWSD::prisonerServerSocketFD, Otoh we know
                    // there is just one COOLWSD object. (Even in real Online.)
                    lokit_main(COOLWSD::prisonerServerSocketFD, COOLWSD::UserInterface, mobileAppDocId);
                }).detach();
#endif // MOBILEAPP

    // FIXME: blocks ...
    // Unfortunately we need to wait after spawning children to avoid bombing the system.
    // If we fail fast and return, the next document will spawn more children without knowing
    // there are some on the way already. And if the system is slow already, that wouldn't help.
    LOG_TRC("Waiting for NewChildrenCV");
    if (NewChildrenCV.wait_for(lock, timeout, []()
                               {
                                   LOG_TRC("Predicate for NewChildrenCV wait: NewChildren.size()=" << NewChildren.size());
                                   return !NewChildren.empty();
                               }))
    {
        LOG_TRC("NewChildrenCV wait successful");
        std::shared_ptr<ChildProcess> child = NewChildren.back();
        NewChildren.pop_back();
        const size_t available = NewChildren.size();

        // Release early before moving sockets.
        lock.unlock();

        // Validate before returning.
        if (child && child->isAlive())
        {
            LOG_DBG("getNewChild: Have "
                    << available << " spare " << (available == 1 ? "child" : "children")
                    << " after popping [" << child->getPid() << "] to return in "
                    << std::chrono::duration_cast<std::chrono::milliseconds>(
                           std::chrono::steady_clock::now() - startTime));

            // Change ownership now.
            child->moveSocketFromTo(PrisonerPoll, destPoll);

            return child;
        }

        LOG_WRN("getNewChild: popped dead child, need to find another.");
    }
    else
    {
        LOG_TRC("NewChildrenCV wait failed");
        LOG_WRN("getNewChild: No child available. Sending spawn request to forkit and failing.");
    }

    // Reached both on a real timeout and after popping a dead child.
    LOG_DBG("getNewChild: Timed out while waiting for new child.");
    return nullptr;
}
#ifdef __linux__
#if !MOBILEAPP
@ -3541,6 +3548,16 @@ private:
else if (!_associatedWithDoc && !SigUtil::getShutdownRequestFlag())
{
LOG_WRN("Unassociated Kit (" << _pid << ") disconnected unexpectedly");
std::unique_lock<std::mutex> lock(NewChildrenMutex);
auto it = std::find(NewChildren.begin(), NewChildren.end(), child);
if (it != NewChildren.end())
NewChildren.erase(it);
else
LOG_WRN("Unknown Kit process closed with pid " << child->getPid());
#if !MOBILEAPP
rebalanceChildren(COOLWSD::NumPreSpawnedChildren);
#endif
}
}
@ -3673,15 +3690,7 @@ private:
child->setSMapsFD(socket->getIncomingFD(SMAPS));
_childProcess = child; // weak
// Remove from prisoner poll since there is no activity
// until we attach the childProcess (with this socket)
// to a docBroker, which will do the polling.
disposition.setMove(
[this, child](const std::shared_ptr<Socket>&)
{
LOG_TRC("Calling addNewChild in disposition's move thing to add to NewChildren");
addNewChild(child);
});
addNewChild(child);
}
catch (const std::bad_weak_ptr&)
{

View File

@ -40,7 +40,7 @@ class DocumentBroker;
class ClipboardCache;
class FileServerRequestHandler;
std::shared_ptr<ChildProcess> getNewChild_Blocks(unsigned mobileAppDocId);
std::shared_ptr<ChildProcess> getNewChild_Blocks(SocketPoll &destPoll, unsigned mobileAppDocId);
// A WSProcess object in the WSD process represents a descendant process, either the direct child
// process ForKit or a grandchild Kit process, with which the WSD process communicates through a
@ -198,13 +198,13 @@ public:
protected:
/// Accessor for the WebSocketHandler held in _ws.
std::shared_ptr<WebSocketHandler> getWSHandler() const
{
    return _ws;
}
/// Accessor for the StreamSocket held in _socket.
std::shared_ptr<StreamSocket> getSocket() const
{
    return _socket;
}
/// Yields the result of _socket.lock() for the tracked StreamSocket.
std::shared_ptr<StreamSocket> getSocket() const
{
    return _socket.lock();
}
private:
std::string _name;
std::atomic<pid_t> _pid; //< The process-id, which can be access from different threads.
std::shared_ptr<WebSocketHandler> _ws;
std::shared_ptr<StreamSocket> _socket;
std::shared_ptr<WebSocketHandler> _ws; // FIXME: should be weak ? ...
std::weak_ptr<StreamSocket> _socket;
};
#if !MOBILEAPP

View File

@ -92,8 +92,8 @@ void ChildProcess::setDocumentBroker(const std::shared_ptr<DocumentBroker>& docB
assert(docBroker && "Invalid DocumentBroker instance.");
_docBroker = docBroker;
// Add the prisoner socket to the docBroker poll.
docBroker->addSocketToPoll(getSocket());
// The prisoner socket is added in 'takeSocket'
// if URP is enabled, also add its socket to the poll
if (_urpFromKit)
docBroker->addSocketToPoll(_urpFromKit);
@ -254,7 +254,7 @@ void DocumentBroker::pollThread()
do
{
static constexpr std::chrono::milliseconds timeoutMs(COMMAND_TIMEOUT_MS * 5);
_childProcess = getNewChild_Blocks(_mobileAppDocId);
_childProcess = getNewChild_Blocks(*_poll, _mobileAppDocId);
if (_childProcess
|| std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _threadStart)

View File

@ -159,6 +159,11 @@ public:
/// Stores @smapsFD in _smapsFD.
void setSMapsFD(int smapsFD)
{
    _smapsFD = smapsFD;
}
/// Reads back the value kept in _smapsFD.
int getSMapsFD()
{
    return _smapsFD;
}
/// Asks @to to take over this process's socket (as returned by
/// getSocket()) from the @from poll.
void moveSocketFromTo(const std::shared_ptr<SocketPoll> &from, SocketPoll &to)
{
    const auto prisonerSocket = getSocket();
    to.takeSocket(from, prisonerSocket);
}
private:
const std::string _jailId;
std::weak_ptr<DocumentBroker> _docBroker;