Akonadi

server/connection.cpp
1/***************************************************************************
2 * SPDX-FileCopyrightText: 2006 Till Adam <adam@kde.org> *
3 * SPDX-FileCopyrightText: 2013 Volker Krause <vkrause@kde.org> *
4 * *
5 * SPDX-License-Identifier: LGPL-2.0-or-later *
6 ***************************************************************************/
7#include "connection.h"
8#include "akonadiserver_debug.h"
9
10#include <QEventLoop>
11#include <QSettings>
12#include <QThreadStorage>
13
14#include "handler.h"
15#include "notificationmanager.h"
16#include "storage/datastore.h"
17#include "storage/dbdeadlockcatcher.h"
18
19#include <cassert>
20
21#ifndef Q_OS_WIN
22#include <cxxabi.h>
23#endif
24
25#include "private/standarddirs_p.h"
26
27using namespace Akonadi;
28using namespace Akonadi::Server;
29
30#define IDLE_TIMER_TIMEOUT 180000 // 3 min
31
32static QString connectionIdentifier(Connection *c)
33{
34 const QString id = QString::asprintf("%p", static_cast<void *>(c));
35 return id;
36}
37
38Connection::Connection(AkonadiServer &akonadi)
39 : AkThread(connectionIdentifier(this), QThread::InheritPriority)
40 , m_akonadi(akonadi)
41{
42}
43
44Connection::Connection(quintptr socketDescriptor, AkonadiServer &akonadi)
45 : AkThread(connectionIdentifier(this), QThread::InheritPriority)
46 , m_akonadi(akonadi)
47{
48 m_socketDescriptor = socketDescriptor;
49 m_identifier = connectionIdentifier(this); // same as objectName()
50
51 const QSettings settings(Akonadi::StandardDirs::serverConfigFile(), QSettings::IniFormat);
52 m_verifyCacheOnRetrieval = settings.value(QStringLiteral("Cache/VerifyOnRetrieval"), m_verifyCacheOnRetrieval).toBool();
53}
54
55void Connection::init()
56{
57 AkThread::init();
58
59 auto socket = std::make_unique<QLocalSocket>();
60 if (!socket->setSocketDescriptor(m_socketDescriptor)) {
61 qCWarning(AKONADISERVER_LOG) << "Connection(" << m_identifier << ")::run: failed to set socket descriptor: " << socket->error() << "("
62 << socket->errorString() << ")";
63 return;
64 }
65
66 m_socket = std::move(socket);
67 connect(m_socket.get(), &QLocalSocket::disconnected, this, &Connection::slotSocketDisconnected);
68
69 m_idleTimer = std::make_unique<QTimer>();
70 connect(m_idleTimer.get(), &QTimer::timeout, this, &Connection::slotConnectionIdle);
71
72 storageBackend()->notificationCollector()->setConnection(this);
73
74 if (m_socket->state() == QLocalSocket::ConnectedState) {
75 QTimer::singleShot(0, this, &Connection::handleIncomingData);
76 } else {
77 connect(m_socket.get(), &QLocalSocket::connected, this, &Connection::handleIncomingData, Qt::QueuedConnection);
78 }
79
80 try {
81 slotSendHello();
82 } catch (const ProtocolException &e) {
83 qCWarning(AKONADISERVER_LOG) << "Protocol Exception sending \"hello\" on connection" << m_identifier << ":" << e.what();
84 m_socket->disconnectFromServer();
85 }
86}
87
88void Connection::quit()
89{
90 if (QThread::currentThread()->loopLevel() > 1) {
91 m_connectionClosing = true;
92 Q_EMIT connectionClosing();
93 return;
94 }
95
96 m_akonadi.tracer().endConnection(m_identifier, QString());
97
98 m_socket.reset();
99 m_idleTimer.reset();
100
101 AkThread::quit();
102}
103
104void Connection::slotSendHello()
105{
106 SchemaVersion version = SchemaVersion::retrieveAll().at(0);
107
108 Protocol::HelloResponse hello;
109 hello.setServerName(QStringLiteral("Akonadi"));
110 hello.setMessage(QStringLiteral("Not Really IMAP server"));
111 hello.setProtocolVersion(Protocol::version());
112 hello.setGeneration(version.generation());
113 sendResponse(0, std::move(hello));
114}
115
116DataStore *Connection::storageBackend()
117{
118 if (!m_backend) {
119 m_backend = DataStore::self();
120 }
121 return m_backend;
122}
123
124Connection::~Connection()
125{
126 quitThread();
127
128 if (m_reportTime) {
129 reportTime();
130 }
131}
132
133void Connection::slotConnectionIdle()
134{
135 Q_ASSERT(m_currentHandler == nullptr);
136 if (m_backend && m_backend->isOpened()) {
137 if (m_backend->inTransaction()) {
138 // This is a programming error, the timer should not have fired.
139 // But it is safer to abort and leave the connection open, until
140 // a later operation causes the idle timer to fire (than crash
141 // the akonadi server).
142 qCInfo(AKONADISERVER_LOG) << m_sessionId << "NOT Closing idle db connection; we are in transaction";
143 return;
144 }
145 m_backend->close();
146 }
147}
148
149void Connection::slotSocketDisconnected()
150{
151 // If we have active handler, wait for it to finish, then we emit the signal
152 // from slotNewDate()
153 if (m_currentHandler) {
154 return;
155 }
156
157 Q_EMIT disconnected();
158}
159
160void Connection::parseStream(const Protocol::CommandPtr &cmd)
161{
162 if (!m_currentHandler->parseStream()) {
163 try {
164 m_currentHandler->failureResponse("Error while handling a command");
165 } catch (...) {
166 m_connectionClosing = true;
167 }
168 qCWarning(AKONADISERVER_LOG) << "Error while handling command" << cmd->type() << "on connection" << m_identifier;
169 }
170}
171
172void Connection::handleIncomingData()
173{
174 for (;;) {
175 if (m_connectionClosing || !m_socket || m_socket->state() != QLocalSocket::ConnectedState) {
176 break;
177 }
178
179 // Blocks with event loop until some data arrive, allows us to still use QTimers
180 // and similar while waiting for some data to arrive
181 if (m_socket->bytesAvailable() < int(sizeof(qint64))) {
182 QEventLoop loop;
183 connect(m_socket.get(), &QLocalSocket::readyRead, &loop, &QEventLoop::quit);
184 connect(m_socket.get(), &QLocalSocket::stateChanged, &loop, &QEventLoop::quit);
185 connect(this, &Connection::connectionClosing, &loop, &QEventLoop::quit);
186 loop.exec();
187
188 // RAII fails to clean QT's slot/signal connections above, leaking memory. Manually disconnect.
189 disconnect(m_socket.get(), &QLocalSocket::readyRead, &loop, &QEventLoop::quit);
190 disconnect(m_socket.get(), &QLocalSocket::stateChanged, &loop, &QEventLoop::quit);
191 disconnect(this, &Connection::connectionClosing, &loop, &QEventLoop::quit);
192 }
193
194 if (m_connectionClosing || !m_socket || m_socket->state() != QLocalSocket::ConnectedState) {
195 break;
196 }
197
198 m_idleTimer->stop();
199
200 // will only open() a previously idle backend.
201 // Otherwise, a new backend could lazily be constructed by later calls.
202 if (!storageBackend()->isOpened()) {
203 m_backend->open();
204 }
205
206 QString currentCommand;
207 while (m_socket->bytesAvailable() >= int(sizeof(qint64))) {
208 Protocol::DataStream stream(m_socket.get());
209 qint64 tag = -1;
210 stream >> tag;
211 // TODO: Check tag is incremental sequence
212
213 Protocol::CommandPtr cmd;
214 try {
215 cmd = Protocol::deserialize(m_socket.get());
216 } catch (const Akonadi::ProtocolException &e) {
217 qCWarning(AKONADISERVER_LOG) << "ProtocolException while deserializing incoming data on connection" << m_identifier << ":" << e.what();
218 setState(Server::LoggingOut);
219 return;
220 } catch (const std::exception &e) {
221 qCWarning(AKONADISERVER_LOG) << "Unknown exception while deserializing incoming data on connection" << m_identifier << ":" << e.what();
222 setState(Server::LoggingOut);
223 return;
224 }
225 if (cmd->type() == Protocol::Command::Invalid) {
226 qCWarning(AKONADISERVER_LOG) << "Received an invalid command on connection" << m_identifier << ": resetting connection";
227 setState(Server::LoggingOut);
228 return;
229 }
230
231 // Tag context and collection context is not persistent.
232 m_context.setTag(std::nullopt);
233 m_context.setCollection({});
234 if (m_akonadi.tracer().currentTracer() != QLatin1StringView("null")) {
235 m_akonadi.tracer().connectionInput(m_identifier, tag, cmd);
236 }
237
238 m_currentHandler = findHandlerForCommand(cmd->type());
239 if (!m_currentHandler) {
240 qCWarning(AKONADISERVER_LOG) << "Invalid command: no such handler for" << cmd->type() << "on connection" << m_identifier;
241 setState(Server::LoggingOut);
242 return;
243 }
244 if (m_reportTime) {
245 startTime();
246 }
247
248 m_currentHandler->setConnection(this);
249 m_currentHandler->setTag(tag);
250 m_currentHandler->setCommand(cmd);
251 try {
252 DbDeadlockCatcher catcher([this, &cmd]() {
253 parseStream(cmd);
254 });
255 } catch (const Akonadi::Server::HandlerException &e) {
256 if (m_currentHandler) {
257 try {
258 m_currentHandler->failureResponse(e.what());
259 } catch (...) {
260 m_connectionClosing = true;
261 }
262 qCWarning(AKONADISERVER_LOG) << "Handler exception when handling command" << cmd->type() << "on connection" << m_identifier << ":"
263 << e.what();
264 }
265 } catch (const Akonadi::Server::Exception &e) {
266 if (m_currentHandler) {
267 try {
268 m_currentHandler->failureResponse(QString::fromUtf8(e.type()) + QLatin1StringView(": ") + QString::fromUtf8(e.what()));
269 } catch (...) {
270 m_connectionClosing = true;
271 }
272 qCWarning(AKONADISERVER_LOG) << "General exception when handling command" << cmd->type() << "on connection" << m_identifier << ":"
273 << e.what();
274 }
275 } catch (const Akonadi::ProtocolException &e) {
276 // No point trying to send anything back to client, the connection is
277 // already messed up
278 qCWarning(AKONADISERVER_LOG) << "Protocol exception when handling command" << cmd->type() << "on connection" << m_identifier << ":" << e.what();
279 m_connectionClosing = true;
280#if defined(Q_OS_LINUX) && !defined(_LIBCPP_VERSION)
281 } catch (abi::__forced_unwind &) {
282 // HACK: NPTL throws __forced_unwind during thread cancellation and
283 // we *must* rethrow it otherwise the program aborts. Due to the issue
284 // described in #376385 we might end up destroying (cancelling) the
285 // thread from a nested loop executed inside parseStream() above,
286 // so the exception raised in there gets caught by this try..catch
287 // statement and it must be rethrown at all cost. Remove this hack
288 // once the root problem is fixed.
289 throw;
290#endif
291 } catch (...) {
292 qCCritical(AKONADISERVER_LOG) << "Unknown exception while handling command" << cmd->type() << "on connection" << m_identifier;
293 if (m_currentHandler) {
294 try {
295 m_currentHandler->failureResponse("Unknown exception caught");
296 } catch (...) {
297 m_connectionClosing = true;
298 }
299 }
300 }
301 if (m_reportTime) {
302 stopTime(currentCommand);
303 }
304 m_currentHandler.reset();
305
306 if (!m_socket || m_socket->state() != QLocalSocket::ConnectedState) {
307 Q_EMIT disconnected();
308 return;
309 }
310
311 if (m_connectionClosing) {
312 break;
313 }
314 }
315
316 // reset, arm the timer
317 m_idleTimer->start(IDLE_TIMER_TIMEOUT);
318
319 if (m_connectionClosing) {
320 break;
321 }
322 }
323
324 if (m_connectionClosing) {
325 m_socket->disconnect(this);
326 m_socket->close();
327 QTimer::singleShot(0, this, &Connection::quit);
328 }
329}
330
331const CommandContext &Connection::context() const
332{
333 return m_context;
334}
335
336void Connection::setContext(const CommandContext &context)
337{
338 m_context = context;
339}
340
341std::unique_ptr<Handler> Connection::findHandlerForCommand(Protocol::Command::Type command)
342{
343 auto handler = Handler::findHandlerForCommandAlwaysAllowed(command, m_akonadi);
344 if (handler) {
345 return handler;
346 }
347
348 switch (m_connectionState) {
349 case NonAuthenticated:
350 handler = Handler::findHandlerForCommandNonAuthenticated(command, m_akonadi);
351 break;
352 case Authenticated:
353 handler = Handler::findHandlerForCommandAuthenticated(command, m_akonadi);
354 break;
355 case LoggingOut:
356 break;
357 }
358
359 return handler;
360}
361
362qint64 Connection::currentTag() const
363{
364 return m_currentHandler->tag();
365}
366
367void Connection::setState(ConnectionState state)
368{
369 if (state == m_connectionState) {
370 return;
371 }
372 m_connectionState = state;
373 switch (m_connectionState) {
374 case NonAuthenticated:
375 assert(0); // can't happen, it's only the initial state, we can't go back to it
376 break;
377 case Authenticated:
378 break;
379 case LoggingOut:
380 m_socket->disconnectFromServer();
381 break;
382 }
383}
384
385void Connection::setSessionId(const QByteArray &id)
386{
387 m_identifier = QString::asprintf("%s (%p)", id.data(), static_cast<void *>(this));
388 m_akonadi.tracer().beginConnection(m_identifier, QString());
389 // m_streamParser->setTracerIdentifier(m_identifier);
390
391 m_sessionId = id;
393 // this races with the use of objectName() in QThreadPrivate::start
394 // thread()->setObjectName(objectName() + QStringLiteral("-Thread"));
395 storageBackend()->setSessionId(id);
396}
397
398QByteArray Connection::sessionId() const
399{
400 return m_sessionId;
401}
402
403bool Connection::isOwnerResource(const PimItem &item) const
404{
405 if (context().resource().isValid() && item.collection().resourceId() == context().resource().id()) {
406 return true;
407 }
408 // fallback for older resources
409 if (sessionId() == item.collection().resource().name().toUtf8()) {
410 return true;
411 }
412 return false;
413}
414
415bool Connection::isOwnerResource(const Collection &collection) const
416{
417 if (context().resource().isValid() && collection.resourceId() == context().resource().id()) {
418 return true;
419 }
420 if (sessionId() == collection.resource().name().toUtf8()) {
421 return true;
422 }
423 return false;
424}
425
427{
428 return m_verifyCacheOnRetrieval;
429}
430
431void Connection::startTime()
432{
433 m_time.start();
434}
435
436void Connection::stopTime(const QString &identifier)
437{
438 int elapsed = m_time.elapsed();
439 m_totalTime += elapsed;
440 m_totalTimeByHandler[identifier] += elapsed;
441 m_executionsByHandler[identifier]++;
442 qCDebug(AKONADISERVER_LOG) << identifier << " time : " << elapsed << " total: " << m_totalTime;
443}
444
445void Connection::reportTime() const
446{
447 qCDebug(AKONADISERVER_LOG) << "===== Time report for " << m_identifier << " =====";
448 qCDebug(AKONADISERVER_LOG) << " total: " << m_totalTime;
449 for (auto it = m_totalTimeByHandler.cbegin(), end = m_totalTimeByHandler.cend(); it != end; ++it) {
450 const QString &handler = it.key();
451 qCDebug(AKONADISERVER_LOG) << "handler : " << handler << " time: " << m_totalTimeByHandler.value(handler) << " executions "
452 << m_executionsByHandler.value(handler)
453 << " avg: " << m_totalTimeByHandler.value(handler) / m_executionsByHandler.value(handler);
454 }
455}
456
457void Connection::sendResponse(qint64 tag, const Protocol::CommandPtr &response)
458{
459 if (m_akonadi.tracer().currentTracer() != QLatin1StringView("null")) {
460 m_akonadi.tracer().connectionOutput(m_identifier, tag, response);
461 }
462 Protocol::DataStream stream(m_socket.get());
463 stream << tag;
464 Protocol::serialize(stream, response);
465 stream.flush();
466 if (!m_socket->waitForBytesWritten()) {
467 if (m_socket->state() == QLocalSocket::ConnectedState) {
468 throw ProtocolException("Server write timeout");
469 } else {
470 // The client has disconnected before we managed to send our response,
471 // which is not an error
472 }
473 }
474}
475
476Protocol::CommandPtr Connection::readCommand()
477{
478 while (m_socket->bytesAvailable() < static_cast<int>(sizeof(qint64))) {
479 Protocol::DataStream::waitForData(m_socket.get(), 30000); // 30 seconds, just in case client is busy
480 }
481
482 Protocol::DataStream stream(m_socket.get());
483 qint64 tag;
484 stream >> tag;
485
486 // TODO: compare tag with m_currentHandler->tag() ?
487 return Protocol::deserialize(m_socket.get());
488}
489
490#include "moc_connection.cpp"
Represents a collection of PIM items.
Definition collection.h:62
An Connection represents one connection of a client to the server.
Definition connection.h:39
bool verifyCacheOnRetrieval() const
Returns true if permanent cache verification is enabled.
bool isOwnerResource(const PimItem &item) const
Returns true if this connection belongs to the owning resource of item.
Connection(quintptr socketDescriptor, AkonadiServer &akonadi)
Use AkThread::create() to construct and start a new connection thread.
This class handles all the database access.
Definition datastore.h:95
static DataStore * self()
Per thread singleton.
NotificationCollector * notificationCollector()
Returns the notification collector of this DataStore object.
static std::unique_ptr< Handler > findHandlerForCommandNonAuthenticated(Protocol::Command::Type cmd, AkonadiServer &akonadi)
Find a handler for a command that is allowed when the client is not yet authenticated,...
Definition handler.cpp:40
static std::unique_ptr< Handler > findHandlerForCommandAuthenticated(Protocol::Command::Type cmd, AkonadiServer &akonadi)
Find a handler for a command that is allowed when the client is authenticated, like LIST,...
Definition handler.cpp:59
static std::unique_ptr< Handler > findHandlerForCommandAlwaysAllowed(Protocol::Command::Type cmd, AkonadiServer &akonadi)
Find a handler for a command that is always allowed, like LOGOUT.
Definition handler.cpp:50
void setConnection(Connection *connection)
Sets the connection that is causing the changes.
Helper integration between Akonadi and Qt.
NETWORKMANAGERQT_EXPORT QString version()
qint64 elapsed() const const
int exec(ProcessEventsFlags flags)
void quit()
const_iterator cbegin() const const
const_iterator cend() const const
T value(const Key &key) const const
void readyRead()
void connected()
void disconnected()
void stateChanged(QLocalSocket::LocalSocketState socketState)
Q_EMITQ_EMIT
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
bool disconnect(const QMetaObject::Connection &connection)
void setObjectName(QAnyStringView name)
QVariant value(QAnyStringView key) const const
QString asprintf(const char *cformat,...)
const QChar at(qsizetype position) const const
QString fromLatin1(QByteArrayView str)
QString fromUtf8(QByteArrayView str)
QByteArray toUtf8() const const
QueuedConnection
QThread * currentThread()
void timeout()
bool toBool() const const
This file is part of the KDE documentation.
Documentation copyright © 1996-2025 The KDE developers.
Generated on Fri Jan 24 2025 11:49:57 by doxygen 1.13.2 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.