Akonadi

server/connection.cpp
1/***************************************************************************
2 * SPDX-FileCopyrightText: 2006 Till Adam <adam@kde.org> *
3 * SPDX-FileCopyrightText: 2013 Volker Krause <vkrause@kde.org> *
4 * *
5 * SPDX-License-Identifier: LGPL-2.0-or-later *
6 ***************************************************************************/
7#include "connection.h"
8#include "akonadiserver_debug.h"
9
10#include <QEventLoop>
11#include <QSettings>
12#include <QThreadStorage>
13
14#include "handler.h"
15#include "notificationmanager.h"
16#include "storage/datastore.h"
17#include "storage/dbdeadlockcatcher.h"
18
19#include <cassert>
20
21#ifndef Q_OS_WIN
22#include <cxxabi.h>
23#endif
24
25#include "private/standarddirs_p.h"
26
27using namespace Akonadi;
28using namespace Akonadi::Server;
29
// Interval used to (re)arm the connection's idle timer: 180000 ms = 3 min.
// A typed constant instead of a preprocessor macro, so it obeys scoping rules.
static constexpr int IDLE_TIMER_TIMEOUT = 180000; // 3 min
31
32static QString connectionIdentifier(Connection *c)
33{
34 const QString id = QString::asprintf("%p", static_cast<void *>(c));
35 return id;
36}
37
38Connection::Connection(AkonadiServer &akonadi)
39 : AkThread(connectionIdentifier(this), QThread::InheritPriority)
40 , m_akonadi(akonadi)
41{
42}
43
44Connection::Connection(quintptr socketDescriptor, AkonadiServer &akonadi)
45 : AkThread(connectionIdentifier(this), QThread::InheritPriority)
46 , m_akonadi(akonadi)
47{
48 m_socketDescriptor = socketDescriptor;
49 m_identifier = connectionIdentifier(this); // same as objectName()
50
51 const QSettings settings(Akonadi::StandardDirs::serverConfigFile(), QSettings::IniFormat);
52 m_verifyCacheOnRetrieval = settings.value(QStringLiteral("Cache/VerifyOnRetrieval"), m_verifyCacheOnRetrieval).toBool();
53}
54
55void Connection::init()
56{
57 AkThread::init();
58
59 auto socket = std::make_unique<QLocalSocket>();
60 if (!socket->setSocketDescriptor(m_socketDescriptor)) {
61 qCWarning(AKONADISERVER_LOG) << "Connection(" << m_identifier << ")::run: failed to set socket descriptor: " << socket->error() << "("
62 << socket->errorString() << ")";
63 return;
64 }
65
66 m_socket = std::move(socket);
67 connect(m_socket.get(), &QLocalSocket::disconnected, this, &Connection::slotSocketDisconnected);
68
69 m_idleTimer = std::make_unique<QTimer>();
70 connect(m_idleTimer.get(), &QTimer::timeout, this, &Connection::slotConnectionIdle);
71
72 storageBackend()->notificationCollector()->setConnection(this);
73
74 if (m_socket->state() == QLocalSocket::ConnectedState) {
75 QTimer::singleShot(0, this, &Connection::handleIncomingData);
76 } else {
77 connect(m_socket.get(), &QLocalSocket::connected, this, &Connection::handleIncomingData, Qt::QueuedConnection);
78 }
79
80 try {
81 slotSendHello();
82 } catch (const ProtocolException &e) {
83 qCWarning(AKONADISERVER_LOG) << "Protocol Exception sending \"hello\" on connection" << m_identifier << ":" << e.what();
84 m_socket->disconnectFromServer();
85 }
86}
87
88void Connection::quit()
89{
90 if (QThread::currentThread()->loopLevel() > 1) {
91 m_connectionClosing = true;
92 Q_EMIT connectionClosing();
93 return;
94 }
95
96 m_akonadi.tracer().endConnection(m_identifier, QString());
97
98 m_socket.reset();
99 m_idleTimer.reset();
100
101 AkThread::quit();
102}
103
104void Connection::slotSendHello()
105{
106 SchemaVersion version = SchemaVersion::retrieveAll().at(0);
107
108 Protocol::HelloResponse hello;
109 hello.setServerName(QStringLiteral("Akonadi"));
110 hello.setMessage(QStringLiteral("Not Really IMAP server"));
111 hello.setProtocolVersion(Protocol::version());
112 hello.setGeneration(version.generation());
113 sendResponse(0, std::move(hello));
114}
115
116DataStore *Connection::storageBackend()
117{
118 if (!m_backend) {
119 m_backend = DataStore::self();
120 }
121 return m_backend;
122}
123
124Connection::~Connection()
125{
126 quitThread();
127
128 if (m_reportTime) {
129 reportTime();
130 }
131}
132
133void Connection::slotConnectionIdle()
134{
135 Q_ASSERT(m_currentHandler == nullptr);
136 if (m_backend && m_backend->isOpened()) {
137 if (m_backend->inTransaction()) {
138 // This is a programming error, the timer should not have fired.
139 // But it is safer to abort and leave the connection open, until
140 // a later operation causes the idle timer to fire (than crash
141 // the akonadi server).
142 qCInfo(AKONADISERVER_LOG) << m_sessionId << "NOT Closing idle db connection; we are in transaction";
143 return;
144 }
145 m_backend->close();
146 }
147}
148
149void Connection::slotSocketDisconnected()
150{
151 // If we have active handler, wait for it to finish, then we emit the signal
152 // from slotNewDate()
153 if (m_currentHandler) {
154 return;
155 }
156
157 Q_EMIT disconnected();
158}
159
160void Connection::parseStream(const Protocol::CommandPtr &cmd)
161{
162 if (!m_currentHandler->parseStream()) {
163 try {
164 m_currentHandler->failureResponse("Error while handling a command");
165 } catch (...) {
166 m_connectionClosing = true;
167 }
168 qCWarning(AKONADISERVER_LOG) << "Error while handling command" << cmd->type() << "on connection" << m_identifier;
169 }
170}
171
// Main receive loop of the connection.
//
// Repeats until the connection is flagged as closing or the socket is gone or
// no longer in ConnectedState: waits for at least one qint64 worth of data,
// stops the idle timer, makes sure the storage backend is open, then reads
// (tag, command) pairs from the socket and runs the matching handler for each.
// After the pending commands are processed the idle timer is re-armed.
// Deserialization errors, invalid commands and commands without a handler
// switch the connection to LoggingOut and return immediately.
//
// NOTE(review): `cmd` and `currentCommand` are used below, but their
// declarations are not visible in this listing (embedded listing numbers 206
// and 213 are absent) — confirm against the original file.
172void Connection::handleIncomingData()
173{
174 for (;;) {
175 if (m_connectionClosing || !m_socket || m_socket->state() != QLocalSocket::ConnectedState) {
176 break;
177 }
178
179 // Blocks with event loop until some data arrive, allows us to still use QTimers
180 // and similar while waiting for some data to arrive
181 if (m_socket->bytesAvailable() < int(sizeof(qint64))) {
 // The nested loop ends on new data, on any socket state change, or when
 // connectionClosing() is emitted (see quit()).
182 QEventLoop loop;
183 connect(m_socket.get(), &QLocalSocket::readyRead, &loop, &QEventLoop::quit);
184 connect(m_socket.get(), &QLocalSocket::stateChanged, &loop, &QEventLoop::quit);
185 connect(this, &Connection::connectionClosing, &loop, &QEventLoop::quit);
186 loop.exec();
187
188 // RAII fails to clean QT's slot/signal connections above, leaking memory. Manually disconnect.
189 disconnect(m_socket.get(), &QLocalSocket::readyRead, &loop, &QEventLoop::quit);
190 disconnect(m_socket.get(), &QLocalSocket::stateChanged, &loop, &QEventLoop::quit);
191 disconnect(this, &Connection::connectionClosing, &loop, &QEventLoop::quit);
192 }
193
 // Re-check: the state may have changed while the nested loop was running.
194 if (m_connectionClosing || !m_socket || m_socket->state() != QLocalSocket::ConnectedState) {
195 break;
196 }
197
198 m_idleTimer->stop();
199
200 // will only open() a previously idle backend.
201 // Otherwise, a new backend could lazily be constructed by later calls.
202 if (!storageBackend()->isOpened()) {
203 m_backend->open();
204 }
205
 // Process every command whose tag is already available on the socket.
207 while (m_socket->bytesAvailable() >= int(sizeof(qint64))) {
208 Protocol::DataStream stream(m_socket.get());
209 qint64 tag = -1;
210 stream >> tag;
211 // TODO: Check tag is incremental sequence
212
 // Any failure to deserialize the command ends the session (LoggingOut).
214 try {
215 cmd = Protocol::deserialize(m_socket.get());
216 } catch (const Akonadi::ProtocolException &e) {
217 qCWarning(AKONADISERVER_LOG) << "ProtocolException while deserializing incoming data on connection" << m_identifier << ":" << e.what();
218 setState(Server::LoggingOut);
219 return;
220 } catch (const std::exception &e) {
221 qCWarning(AKONADISERVER_LOG) << "Unknown exception while deserializing incoming data on connection" << m_identifier << ":" << e.what();
222 setState(Server::LoggingOut);
223 return;
224 }
225 if (cmd->type() == Protocol::Command::Invalid) {
226 qCWarning(AKONADISERVER_LOG) << "Received an invalid command on connection" << m_identifier << ": resetting connection";
227 setState(Server::LoggingOut);
228 return;
229 }
230
231 // Tag context and collection context is not persistent.
232 m_context.setTag(std::nullopt);
233 m_context.setCollection({});
 // Tracing is skipped when the active tracer is named "null".
234 if (m_akonadi.tracer().currentTracer() != QLatin1StringView("null")) {
235 m_akonadi.tracer().connectionInput(m_identifier, tag, cmd);
236 }
237
 // Pick the handler allowed for this command in the current connection state.
238 m_currentHandler = findHandlerForCommand(cmd->type());
239 if (!m_currentHandler) {
240 qCWarning(AKONADISERVER_LOG) << "Invalid command: no such handler for" << cmd->type() << "on connection" << m_identifier;
241 setState(Server::LoggingOut);
242 return;
243 }
244 if (m_reportTime) {
245 startTime();
246 }
247
248 m_currentHandler->setConnection(this);
249 m_currentHandler->setTag(tag);
250 m_currentHandler->setCommand(cmd);
 // Run the handler. Each catch clause below tries to send a failure response
 // to the client; if that itself throws, the connection is marked as closing.
251 try {
252 DbDeadlockCatcher catcher([this, &cmd]() {
253 parseStream(cmd);
254 });
255 } catch (const Akonadi::Server::HandlerException &e) {
256 if (m_currentHandler) {
257 try {
258 m_currentHandler->failureResponse(e.what());
259 } catch (...) {
260 m_connectionClosing = true;
261 }
262 qCWarning(AKONADISERVER_LOG) << "Handler exception when handling command" << cmd->type() << "on connection" << m_identifier << ":"
263 << e.what();
264 }
265 } catch (const Akonadi::Server::Exception &e) {
266 if (m_currentHandler) {
267 try {
268 m_currentHandler->failureResponse(QString::fromUtf8(e.type()) + QLatin1StringView(": ") + QString::fromUtf8(e.what()));
269 } catch (...) {
270 m_connectionClosing = true;
271 }
272 qCWarning(AKONADISERVER_LOG) << "General exception when handling command" << cmd->type() << "on connection" << m_identifier << ":"
273 << e.what();
274 }
275 } catch (const Akonadi::ProtocolException &e) {
276 // No point trying to send anything back to client, the connection is
277 // already messed up
278 qCWarning(AKONADISERVER_LOG) << "Protocol exception when handling command" << cmd->type() << "on connection" << m_identifier << ":" << e.what();
279 m_connectionClosing = true;
280#if defined(Q_OS_LINUX) && !defined(_LIBCPP_VERSION)
281 } catch (abi::__forced_unwind &) {
282 // HACK: NPTL throws __forced_unwind during thread cancellation and
283 // we *must* rethrow it otherwise the program aborts. Due to the issue
284 // described in #376385 we might end up destroying (cancelling) the
285 // thread from a nested loop executed inside parseStream() above,
286 // so the exception raised in there gets caught by this try..catch
287 // statement and it must be rethrown at all cost. Remove this hack
288 // once the root problem is fixed.
289 throw;
290#endif
291 } catch (...) {
292 qCCritical(AKONADISERVER_LOG) << "Unknown exception while handling command" << cmd->type() << "on connection" << m_identifier;
293 if (m_currentHandler) {
294 try {
295 m_currentHandler->failureResponse("Unknown exception caught");
296 } catch (...) {
297 m_connectionClosing = true;
298 }
299 }
300 }
301 if (m_reportTime) {
302 stopTime(currentCommand);
303 }
304 m_currentHandler.reset();
305
 // The socket went away while the handler ran: emit the disconnected()
 // signal that slotSocketDisconnected() skips while a handler is active.
306 if (!m_socket || m_socket->state() != QLocalSocket::ConnectedState) {
307 Q_EMIT disconnected();
308 return;
309 }
310
311 if (m_connectionClosing) {
312 break;
313 }
314 }
315
316 // reset, arm the timer
317 m_idleTimer->start(IDLE_TIMER_TIMEOUT);
318
319 if (m_connectionClosing) {
320 break;
321 }
322 }
323
 // Closing: stop delivering socket signals to this object, close the socket
 // and schedule quit() through the event loop.
324 if (m_connectionClosing) {
325 m_socket->disconnect(this);
326 m_socket->close();
327 QTimer::singleShot(0, this, &Connection::quit);
328 }
329}
330
331const CommandContext &Connection::context() const
332{
333 return m_context;
334}
335
336void Connection::setContext(const CommandContext &context)
337{
338 m_context = context;
339}
340
341std::unique_ptr<Handler> Connection::findHandlerForCommand(Protocol::Command::Type command)
342{
343 auto handler = Handler::findHandlerForCommandAlwaysAllowed(command, m_akonadi);
344 if (handler) {
345 return handler;
346 }
347
348 switch (m_connectionState) {
349 case NonAuthenticated:
350 handler = Handler::findHandlerForCommandNonAuthenticated(command, m_akonadi);
351 break;
352 case Authenticated:
353 handler = Handler::findHandlerForCommandAuthenticated(command, m_akonadi);
354 break;
355 case LoggingOut:
356 break;
357 }
358
359 return handler;
360}
361
362qint64 Connection::currentTag() const
363{
364 return m_currentHandler->tag();
365}
366
367void Connection::setState(ConnectionState state)
368{
369 if (state == m_connectionState) {
370 return;
371 }
372 m_connectionState = state;
373 switch (m_connectionState) {
374 case NonAuthenticated:
375 assert(0); // can't happen, it's only the initial state, we can't go back to it
376 break;
377 case Authenticated:
378 break;
379 case LoggingOut:
380 m_socket->disconnectFromServer();
381 break;
382 }
383}
384
// Assigns the session id to this connection.
//
// Rebuilds m_identifier as "<id> (<address of this>)", tells the tracer that a
// connection with that identifier began, stores the id in m_sessionId and
// passes it on to the storage backend.
//
// NOTE(review): embedded listing number 392 is absent from this excerpt, so a
// statement may be missing between the m_sessionId assignment and the comment
// about objectName() — confirm against the original file.
385void Connection::setSessionId(const QByteArray &id)
386{
387 m_identifier = QString::asprintf("%s (%p)", id.data(), static_cast<void *>(this));
388 m_akonadi.tracer().beginConnection(m_identifier, QString());
389 // m_streamParser->setTracerIdentifier(m_identifier);
390
391 m_sessionId = id;
393 // this races with the use of objectName() in QThreadPrivate::start
394 // thread()->setObjectName(objectName() + QStringLiteral("-Thread"));
395 storageBackend()->setSessionId(id);
396}
397
398QByteArray Connection::sessionId() const
399{
400 return m_sessionId;
401}
402
403bool Connection::isOwnerResource(const PimItem &item) const
404{
405 if (context().resource().isValid() && item.collection().resourceId() == context().resource().id()) {
406 return true;
407 }
408 // fallback for older resources
409 if (sessionId() == item.collection().resource().name().toUtf8()) {
410 return true;
411 }
412 return false;
413}
414
415bool Connection::isOwnerResource(const Collection &collection) const
416{
417 if (context().resource().isValid() && collection.resourceId() == context().resource().id()) {
418 return true;
419 }
420 if (sessionId() == collection.resource().name().toUtf8()) {
421 return true;
422 }
423 return false;
424}
425
427{
428 return m_verifyCacheOnRetrieval;
429}
430
431void Connection::startTime()
432{
433 m_time.start();
434}
435
436void Connection::stopTime(const QString &identifier)
437{
438 int elapsed = m_time.elapsed();
439 m_totalTime += elapsed;
440 m_totalTimeByHandler[identifier] += elapsed;
441 m_executionsByHandler[identifier]++;
442 qCDebug(AKONADISERVER_LOG) << identifier << " time : " << elapsed << " total: " << m_totalTime;
443}
444
445void Connection::reportTime() const
446{
447 qCDebug(AKONADISERVER_LOG) << "===== Time report for " << m_identifier << " =====";
448 qCDebug(AKONADISERVER_LOG) << " total: " << m_totalTime;
449 for (auto it = m_totalTimeByHandler.cbegin(), end = m_totalTimeByHandler.cend(); it != end; ++it) {
450 const QString &handler = it.key();
451 qCDebug(AKONADISERVER_LOG) << "handler : " << handler << " time: " << m_totalTimeByHandler.value(handler) << " executions "
452 << m_executionsByHandler.value(handler)
453 << " avg: " << m_totalTimeByHandler.value(handler) / m_executionsByHandler.value(handler);
454 }
455}
456
457void Connection::sendResponse(qint64 tag, const Protocol::CommandPtr &response)
458{
459 if (m_akonadi.tracer().currentTracer() != QLatin1StringView("null")) {
460 m_akonadi.tracer().connectionOutput(m_identifier, tag, response);
461 }
462 Protocol::DataStream stream(m_socket.get());
463 stream << tag;
464 Protocol::serialize(stream, response);
465 stream.flush();
466 if (!m_socket->waitForBytesWritten()) {
467 if (m_socket->state() == QLocalSocket::ConnectedState) {
468 throw ProtocolException("Server write timeout");
469 } else {
470 // The client has disconnected before we managed to send our response,
471 // which is not an error
472 }
473 }
474}
475
476Protocol::CommandPtr Connection::readCommand()
477{
478 while (m_socket->bytesAvailable() < static_cast<int>(sizeof(qint64))) {
479 Protocol::DataStream::waitForData(m_socket.get(), 30000); // 30 seconds, just in case client is busy
480 }
481
482 Protocol::DataStream stream(m_socket.get());
483 qint64 tag;
484 stream >> tag;
485
486 // TODO: compare tag with m_currentHandler->tag() ?
487 return Protocol::deserialize(m_socket.get());
488}
489
490#include "moc_connection.cpp"
Represents a collection of PIM items.
Definition collection.h:62
A Connection represents one connection of a client to the server.
Definition connection.h:39
bool verifyCacheOnRetrieval() const
Returns true if permanent cache verification is enabled.
bool isOwnerResource(const PimItem &item) const
Returns true if this connection belongs to the owning resource of item.
Connection(quintptr socketDescriptor, AkonadiServer &akonadi)
Use AkThread::create() to construct and start a new connection thread.
This class handles all the database access.
Definition datastore.h:95
void close()
Closes the database connection.
bool inTransaction() const
Returns true if there is a transaction in progress.
static DataStore * self()
Per thread singleton.
bool isOpened() const
Returns if the database is currently open.
Definition datastore.h:295
void setSessionId(const QByteArray &sessionId)
Sets the current session id.
Definition datastore.h:287
NotificationCollector * notificationCollector()
Returns the notification collector of this DataStore object.
virtual void open()
Opens the database connection.
This class catches DbDeadlockException (as emitted by QueryBuilder) and retries execution of the meth...
Base class for exception used internally by the Akonadi server.
static std::unique_ptr< Handler > findHandlerForCommandNonAuthenticated(Protocol::Command::Type cmd, AkonadiServer &akonadi)
Find a handler for a command that is allowed when the client is not yet authenticated,...
Definition handler.cpp:43
static std::unique_ptr< Handler > findHandlerForCommandAuthenticated(Protocol::Command::Type cmd, AkonadiServer &akonadi)
Find a handler for a command that is allowed when the client is authenticated, like LIST,...
Definition handler.cpp:62
static std::unique_ptr< Handler > findHandlerForCommandAlwaysAllowed(Protocol::Command::Type cmd, AkonadiServer &akonadi)
Find a handler for a command that is always allowed, like LOGOUT.
Definition handler.cpp:53
void setConnection(Connection *connection)
Sets the connection that is causing the changes.
void connectionInput(const QString &identifier, const QByteArray &msg) override
This method is called whenever the akonadi server retrieves some data from the outside.
Definition tracer.cpp:53
void beginConnection(const QString &identifier, const QString &msg) override
This method is called whenever a new data (imap) connection to the akonadi server is established.
Definition tracer.cpp:37
void endConnection(const QString &identifier, const QString &msg) override
This method is called whenever a data (imap) connection to akonadi server is closed.
Definition tracer.cpp:45
QString currentTracer() const
Returns the currently activated tracer type.
Definition tracer.cpp:138
Helper integration between Akonadi and Qt.
KDB_EXPORT KDbVersionInfo version()
qint64 elapsed() const const
int exec(ProcessEventsFlags flags)
void quit()
const_iterator cbegin() const const
const_iterator cend() const const
T value(const Key &key) const const
void readyRead()
void connected()
void disconnected()
void stateChanged(QLocalSocket::LocalSocketState socketState)
Q_EMIT
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
bool disconnect(const QMetaObject::Connection &connection)
T qobject_cast(QObject *object)
void setObjectName(QAnyStringView name)
QVariant value(QAnyStringView key) const const
QString asprintf(const char *cformat,...)
QString fromLatin1(QByteArrayView str)
QString fromUtf8(QByteArrayView str)
QByteArray toUtf8() const const
QueuedConnection
QThread * currentThread()
void timeout()
bool toBool() const const
This file is part of the KDE documentation.
Documentation copyright © 1996-2024 The KDE developers.
Generated on Tue Mar 26 2024 11:13:38 by doxygen 1.10.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.