Proxy websocket prototype.

Try to read/write avoiding a websocket.

Change-Id: I382039fa88f1030a63df1e47f687df2ee5a6055b
Reviewed-on: https://gerrit.libreoffice.org/c/online/+/92805
Tested-by: Jenkins CollaboraOffice <jenkinscollaboraoffice@gmail.com>
Reviewed-by: Jan Holesovsky <kendy@collabora.com>
distro/collabora/co-4-2-3
Michael Meeks 2020-03-04 13:54:04 +00:00 committed by Jan Holesovsky
parent 87eac2079b
commit bf0662bda1
9 changed files with 399 additions and 17 deletions

View File

@ -111,6 +111,7 @@ loolwsd_sources = common/Crypto.cpp \
wsd/AdminModel.cpp \
wsd/Auth.cpp \
wsd/DocumentBroker.cpp \
wsd/ProxyProtocol.cpp \
wsd/LOOLWSD.cpp \
wsd/ClientSession.cpp \
wsd/FileServer.cpp \
@ -217,6 +218,7 @@ wsd_headers = wsd/Admin.hpp \
wsd/Auth.hpp \
wsd/ClientSession.hpp \
wsd/DocumentBroker.hpp \
wsd/ProxyProtocol.hpp \
wsd/Exceptions.hpp \
wsd/FileServer.hpp \
wsd/LOOLWSD.hpp \

View File

@ -185,16 +185,97 @@
};
this.onopen = function() {
};
this.close = function() {
};
};
global.FakeWebSocket.prototype.close = function() {
};
global.FakeWebSocket.prototype.send = function(data) {
this.sendCounter++;
window.postMobileMessage(data);
};
global.proxySocketCounter = 0;
global.ProxySocket = function (uri) {
this.uri = uri;
this.binaryType = 'arraybuffer';
this.bufferedAmount = 0;
this.extensions = '';
this.protocol = '';
this.connected = true;
this.readyState = 0; // connecting
this.sessionId = 'fetchsession';
this.id = window.proxySocketCounter++;
this.sendCounter = 0;
this.readWaiting = false;
this.onclose = function() {
};
this.onerror = function() {
};
this.onmessage = function() {
};
this.send = function(msg) {
console.debug('send msg "' + msg + '"');
var req = new XMLHttpRequest();
req.open('POST', this.getEndPoint('write'));
req.setRequestHeader('SessionId', this.sessionId);
if (this.sessionId === 'fetchsession')
req.addEventListener('load', function() {
console.debug('got session: ' + this.responseText);
that.sessionId = this.responseText;
that.readyState = 1;
that.onopen();
});
req.send(msg);
},
this.close = function() {
console.debug('close socket');
this.readyState = 3;
this.onclose();
};
this.getEndPoint = function(type) {
var base = this.uri;
return base.replace(/^ws/, 'http') + '/' + type;
};
console.debug('New proxy socket ' + this.id + ' ' + this.uri);
// FIXME: perhaps a little risky.
this.send('fetchsession');
var that = this;
// horrors ...
this.readInterval = setInterval(function() {
if (this.readWaiting) // one at a time for now
return;
if (this.sessionId == 'fetchsession')
return; // waiting for our session id.
var req = new XMLHttpRequest();
// fetch session id:
req.addEventListener('load', function() {
console.debug('read: ' + this.responseText);
if (this.status == 200)
{
that.onmessage({ data: this.response });
}
else
{
console.debug('Handle error ' + this.status);
}
that.readWaiting = false;
});
req.open('GET', that.getEndPoint('read'));
req.setRequestHeader('SessionId', this.sessionId);
req.send(that.sessionId);
that.readWaiting = true;
}, 250);
};
global.createWebSocket = function(uri) {
if (global.socketProxy) {
return new global.ProxySocket(uri);
} else {
return new WebSocket(uri);
}
};
// If not debug, don't print anything on the console
// except in tile debug mode (Ctrl-Shift-Alt-d)
console.log2 = console.log;
@ -219,7 +300,8 @@
window.postMobileError(log);
} else if (global.socket && (global.socket instanceof WebSocket) && global.socket.readyState === 1) {
global.socket.send(log);
} else if (global.socket && (global.socket instanceof global.L.Socket) && global.socket.connected()) {
} else if (global.socket && global.L && global.L.Socket &&
(global.socket instanceof global.L.Socket) && global.socket.connected()) {
global.socket.sendMessage(log);
} else {
var req = new XMLHttpRequest();
@ -296,7 +378,7 @@
var websocketURI = global.host + global.serviceRoot + '/lool/' + encodeURIComponent(global.docURL + (docParams ? '?' + docParams : '')) + '/ws' + wopiSrc;
try {
global.socket = new WebSocket(websocketURI);
global.socket = global.createWebSocket(websocketURI);
} catch (err) {
console.log(err);
}

View File

@ -49,7 +49,7 @@ L.Socket = L.Class.extend({
}
try {
this.socket = new WebSocket(this.getWebSocketBaseURI(map) + wopiSrc);
this.socket = window.createWebSocket(this.getWebSocketBaseURI(map) + wopiSrc);
} catch (e) {
// On IE 11 there is a limitation on the number of WebSockets open to a single domain (6 by default and can go to 128).
// Detect this and hint the user.

View File

@ -445,6 +445,11 @@ public:
}
}
std::shared_ptr<ProtocolHandlerInterface> getProtocol() const
{
return _protocol;
}
/// Do we have something to send ?
virtual bool hasQueuedMessages() const = 0;
/// Please send them to me then.

View File

@ -37,9 +37,6 @@ public:
void construct();
virtual ~ClientSession();
/// Lookup any session by id.
static std::shared_ptr<ClientSession> getById(const std::string &id);
void setReadOnly() override;
enum SessionState {

View File

@ -140,6 +140,15 @@ public:
const bool isReadOnly,
const std::string& hostNoTrust);
/// Find or create a new client session for the PHP proxy
void handleProxyRequest(
const std::string& sessionId,
const std::string& id,
const Poco::URI& uriPublic,
const bool isReadOnly,
const std::string& hostNoTrust,
const std::shared_ptr<Socket> &moveSocket);
/// Thread safe termination of this broker if it has a lingering thread
void joinThread();

View File

@ -236,6 +236,9 @@ namespace
#if ENABLE_SUPPORT_KEY
inline void shutdownLimitReached(const std::shared_ptr<ProtocolHandlerInterface>& proto)
{
if (!proto)
return;
const std::string error = Poco::format(PAYLOAD_UNAVAILABLE_LIMIT_REACHED, LOOLWSD::MaxDocuments, LOOLWSD::MaxConnections);
LOG_INF("Sending client 'hardlimitreached' message: " << error);
@ -1833,9 +1836,12 @@ static std::shared_ptr<DocumentBroker>
if (docBroker->isMarkedToDestroy())
{
LOG_WRN("DocBroker with docKey [" << docKey << "] that is marked to be destroyed. Rejecting client request.");
std::string msg("error: cmd=load kind=docunloading");
proto->sendTextMessage(msg.data(), msg.size());
proto->shutdown(true, "error: cmd=load kind=docunloading");
if (proto)
{
std::string msg("error: cmd=load kind=docunloading");
proto->sendTextMessage(msg.data(), msg.size());
proto->shutdown(true, "error: cmd=load kind=docunloading");
}
return nullptr;
}
}
@ -1851,9 +1857,12 @@ static std::shared_ptr<DocumentBroker>
}
// Indicate to the client that we're connecting to the docbroker.
const std::string statusConnect = "statusindicator: connect";
LOG_TRC("Sending to Client [" << statusConnect << "].");
proto->sendTextMessage(statusConnect.data(), statusConnect.size());
if (proto)
{
const std::string statusConnect = "statusindicator: connect";
LOG_TRC("Sending to Client [" << statusConnect << "].");
proto->sendTextMessage(statusConnect.data(), statusConnect.size());
}
if (!docBroker)
{
@ -2325,6 +2334,13 @@ private:
// Util::dumpHex(std::cerr, "clipboard:\n", "", socket->getInBuffer()); // lots of data ...
handleClipboardRequest(request, message, disposition);
}
else if (request.has("ProxyPrefix") && reqPathTokens.count() > 2 &&
(reqPathTokens[reqPathTokens.count()-2] == "ws"))
{
std::string decodedUri; // WOPISrc
Poco::URI::decode(reqPathTokens[1], decodedUri);
handleClientProxyRequest(request, decodedUri, message, disposition);
}
else if (!(request.find("Upgrade") != request.end() && Poco::icompare(request["Upgrade"], "websocket") == 0) &&
reqPathTokens.count() > 0 && reqPathTokens[0] == "lool")
{
@ -2858,6 +2874,99 @@ private:
}
#endif
void handleClientProxyRequest(const Poco::Net::HTTPRequest& request,
std::string url,
Poco::MemoryInputStream& message,
SocketDisposition &disposition)
{
if (!request.has("SessionId"))
throw BadRequestException("No session id header on proxied request");
std::string sessionId = request.get("SessionId");
LOG_INF("URL [" << url << "].");
const auto uriPublic = DocumentBroker::sanitizeURI(url);
LOG_INF("URI [" << uriPublic.getPath() << "].");
const auto docKey = DocumentBroker::getDocKey(uriPublic);
LOG_INF("DocKey [" << docKey << "].");
const std::string fileId = Util::getFilenameFromURL(docKey);
Util::mapAnonymized(fileId, fileId); // Identity mapping, since fileId is already obfuscated
LOG_INF("Starting Proxy request handler for session [" << _id << "] on url [" << LOOLWSD::anonymizeUrl(url) << "].");
// Check if readonly session is required
bool isReadOnly = false;
for (const auto& param : uriPublic.getQueryParameters())
{
LOG_DBG("Query param: " << param.first << ", value: " << param.second);
if (param.first == "permission" && param.second == "readonly")
{
isReadOnly = true;
}
}
const std::string hostNoTrust = (LOOLWSD::ServerName.empty() ? request.getHost() : LOOLWSD::ServerName);
LOG_INF("URL [" << LOOLWSD::anonymizeUrl(url) << "] is " << (isReadOnly ? "readonly" : "writable") << ".");
(void)request; (void)message; (void)disposition;
std::shared_ptr<ProtocolHandlerInterface> none;
// Request a kit process for this doc.
std::shared_ptr<DocumentBroker> docBroker = findOrCreateDocBroker(
none, url, docKey, _id, uriPublic);
if (docBroker)
{
// need to move into the DocumentBroker context before doing session lookup / creation etc.
std::string id = _id;
disposition.setMove([docBroker, id, uriPublic, isReadOnly, hostNoTrust, sessionId]
(const std::shared_ptr<Socket> &moveSocket)
{
LOG_TRC("Setting up docbroker thread for " << docBroker->getDocKey());
// Make sure the thread is running before adding callback.
docBroker->startThread();
// We no longer own this socket.
moveSocket->setThreadOwner(std::thread::id());
docBroker->addCallback([docBroker, id, uriPublic, isReadOnly, hostNoTrust, sessionId, moveSocket]()
{
// Now inside the document broker thread ...
LOG_TRC("In the docbroker thread for " << docBroker->getDocKey());
auto streamSocket = std::static_pointer_cast<StreamSocket>(moveSocket);
try
{
docBroker->handleProxyRequest(
sessionId, id, uriPublic, isReadOnly,
hostNoTrust, moveSocket);
return;
}
catch (const UnauthorizedRequestException& exc)
{
LOG_ERR("Unauthorized Request while loading session for " << docBroker->getDocKey() << ": " << exc.what());
}
catch (const StorageConnectionException& exc)
{
LOG_ERR("Error while loading : " << exc.what());
}
catch (const std::exception& exc)
{
LOG_ERR("Error while loading : " << exc.what());
}
// badness occured:
std::ostringstream oss;
oss << "HTTP/1.1 400\r\n"
<< "Date: " << Util::getHttpTimeNow() << "\r\n"
<< "User-Agent: LOOLWSD WOPI Agent\r\n"
<< "Content-Length: 0\r\n"
<< "\r\n";
streamSocket->send(oss.str());
streamSocket->shutdown();
});
});
}
}
void handleClientWsUpgrade(const Poco::Net::HTTPRequest& request, const std::string& url,
SocketDisposition &disposition)
{
@ -2920,7 +3029,7 @@ private:
// Request a kit process for this doc.
std::shared_ptr<DocumentBroker> docBroker = findOrCreateDocBroker(
std::static_pointer_cast<ProtocolHandlerInterface>(ws), url, docKey, _id, uriPublic);
if (docBroker)
if (docBroker)
{
#if MOBILEAPP
const std::string hostNoTrust;

View File

@ -0,0 +1,81 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */
/*
* This file is part of the LibreOffice project.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
#include <config.h>
#include "DocumentBroker.hpp"
#include "ClientSession.hpp"
#include "ProxyProtocol.hpp"
#include "Exceptions.hpp"
#include "LOOLWSD.hpp"
#include <Socket.hpp>
#include <atomic>
#include <cassert>
void DocumentBroker::handleProxyRequest(
const std::string& sessionId,
const std::string& id,
const Poco::URI& uriPublic,
const bool isReadOnly,
const std::string& hostNoTrust,
const std::shared_ptr<Socket> &socket)
{
std::shared_ptr<ClientSession> clientSession;
if (sessionId == "fetchsession")
{
LOG_TRC("Create session for " << _docKey);
clientSession = createNewClientSession(
std::make_shared<ProxyProtocolHandler>(),
id, uriPublic, isReadOnly, hostNoTrust);
addSession(clientSession);
LOOLWSD::checkDiskSpaceAndWarnClients(true);
LOOLWSD::checkSessionLimitsAndWarnClients();
}
else
{
LOG_TRC("Find session for " << _docKey << " with id " << sessionId);
for (const auto &it : _sessions)
{
if (it.second->getId() == sessionId)
{
clientSession = it.second;
break;
}
}
if (!clientSession)
{
LOG_ERR("Invalid session id used " << sessionId);
throw BadRequestException("invalid session id");
}
}
auto protocol = clientSession->getProtocol();
auto streamSocket = std::static_pointer_cast<StreamSocket>(socket);
streamSocket->setHandler(protocol);
// this DocumentBroker's poll handles reading & writing
addSocketToPoll(socket);
auto proxy = std::static_pointer_cast<ProxyProtocolHandler>(
protocol);
proxy->handleRequest(uriPublic.toString(), socket);
}
void ProxyProtocolHandler::handleRequest(const std::string &uriPublic,
const std::shared_ptr<Socket> &socket)
{
bool bRead = uriPublic.find("/write") == std::string::npos;
LOG_INF("Proxy handle request " << uriPublic << " type: " <<
(bRead ? "read" : "write"));
(void)socket;
}
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */

View File

@ -0,0 +1,97 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */
/*
* This file is part of the LibreOffice project.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
#pragma once
#include <net/Socket.hpp>
/// Interface for building a websocket from this ...
class ProxyProtocolHandler : public ProtocolHandlerInterface
{
public:
ProxyProtocolHandler()
{
}
virtual ~ProxyProtocolHandler()
{
}
/// Will be called exactly once by setHandler
void onConnect(const std::shared_ptr<StreamSocket>& /* socket */) override
{
}
/// Called after successful socket reads.
void handleIncomingMessage(SocketDisposition &/* disposition */) override
{
assert("we get our data a different way" && false);
}
int getPollEvents(std::chrono::steady_clock::time_point /* now */,
int64_t &/* timeoutMaxMs */) override
{
// underlying buffer based polling is fine.
return POLLIN;
}
void checkTimeout(std::chrono::steady_clock::time_point /* now */) override
{
}
void performWrites() override
{
}
void onDisconnect() override
{
// connections & sockets come and go a lot.
}
public:
/// Clear all external references
void dispose() override { _msgHandler.reset(); }
int sendTextMessage(const char *msg, const size_t len, bool flush = false) const override
{
LOG_TRC("ProxyHack - send text msg " + std::string(msg, len));
(void) flush;
return len;
}
int sendBinaryMessage(const char *data, const size_t len, bool flush = false) const override
{
(void) data; (void) flush;
LOG_TRC("ProxyHack - send binary msg len " << len);
return len;
}
void shutdown(bool goingAway = false, const std::string &statusMessage = "") override
{
LOG_TRC("ProxyHack - shutdown " << goingAway << ": " << statusMessage);
}
void getIOStats(uint64_t &sent, uint64_t &recv) override
{
sent = recv = 0;
}
void dumpState(std::ostream& os) override
{
os << "proxy protocol\n";
}
void handleRequest(const std::string &uriPublic,
const std::shared_ptr<Socket> &socket);
private:
std::vector<std::weak_ptr<StreamSocket>> _sockets;
};
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */