Akonadi

server/connection.cpp
1 /***************************************************************************
2  * SPDX-FileCopyrightText: 2006 Till Adam <[email protected]> *
3  * SPDX-FileCopyrightText: 2013 Volker Krause <[email protected]> *
4  * *
5  * SPDX-License-Identifier: LGPL-2.0-or-later *
6  ***************************************************************************/
7 #include "connection.h"
8 #include "akonadiserver_debug.h"
9 
10 #include <QEventLoop>
11 #include <QSettings>
12 #include <QThreadStorage>
13 
14 #include "handler.h"
15 #include "notificationmanager.h"
16 #include "storage/datastore.h"
17 #include "storage/dbdeadlockcatcher.h"
18 
19 #include <cassert>
20 
21 #ifndef Q_OS_WIN
22 #include <cxxabi.h>
23 #endif
24 
25 #include <private/standarddirs_p.h>
26 
27 using namespace Akonadi;
28 using namespace Akonadi::Server;
29 
// Idle timeout in milliseconds (3 min); passed to the idle timer's start()
// call once all pending commands of a connection have been processed.
static constexpr int IDLE_TIMER_TIMEOUT = 180000;
31 
32 static QString connectionIdentifier(Connection *c)
33 {
34  const QString id = QString::asprintf("%p", static_cast<void *>(c));
35  return id;
36 }
37 
38 Connection::Connection(AkonadiServer &akonadi)
39  : AkThread(connectionIdentifier(this), QThread::InheritPriority)
40  , m_akonadi(akonadi)
41 {
42 }
43 
44 Connection::Connection(quintptr socketDescriptor, AkonadiServer &akonadi)
45  : AkThread(connectionIdentifier(this), QThread::InheritPriority)
46  , m_akonadi(akonadi)
47 {
48  m_socketDescriptor = socketDescriptor;
49  m_identifier = connectionIdentifier(this); // same as objectName()
50 
51  const QSettings settings(Akonadi::StandardDirs::serverConfigFile(), QSettings::IniFormat);
52  m_verifyCacheOnRetrieval = settings.value(QStringLiteral("Cache/VerifyOnRetrieval"), m_verifyCacheOnRetrieval).toBool();
53 }
54 
55 void Connection::init()
56 {
57  AkThread::init();
58 
59  auto socket = std::make_unique<QLocalSocket>();
60  if (!socket->setSocketDescriptor(m_socketDescriptor)) {
61  qCWarning(AKONADISERVER_LOG) << "Connection(" << m_identifier << ")::run: failed to set socket descriptor: " << socket->error() << "("
62  << socket->errorString() << ")";
63  return;
64  }
65 
66  m_socket = std::move(socket);
67  connect(m_socket.get(), &QLocalSocket::disconnected, this, &Connection::slotSocketDisconnected);
68 
69  m_idleTimer = std::make_unique<QTimer>();
70  connect(m_idleTimer.get(), &QTimer::timeout, this, &Connection::slotConnectionIdle);
71 
72  storageBackend()->notificationCollector()->setConnection(this);
73 
74  if (m_socket->state() == QLocalSocket::ConnectedState) {
75  QTimer::singleShot(0, this, &Connection::handleIncomingData);
76  } else {
77  connect(m_socket.get(), &QLocalSocket::connected, this, &Connection::handleIncomingData, Qt::QueuedConnection);
78  }
79 
80  try {
81  slotSendHello();
82  } catch (const ProtocolException &e) {
83  qCWarning(AKONADISERVER_LOG) << "Protocol Exception sending \"hello\" on connection" << m_identifier << ":" << e.what();
84  m_socket->disconnectFromServer();
85  }
86 }
87 
88 void Connection::quit()
89 {
90  if (QThread::currentThread()->loopLevel() > 1) {
91  m_connectionClosing = true;
92  Q_EMIT connectionClosing();
93  return;
94  }
95 
96  m_akonadi.tracer().endConnection(m_identifier, QString());
97 
98  m_socket.reset();
99  m_idleTimer.reset();
100 
101  AkThread::quit();
102 }
103 
104 void Connection::slotSendHello()
105 {
106  SchemaVersion version = SchemaVersion::retrieveAll().at(0);
107 
108  Protocol::HelloResponse hello;
109  hello.setServerName(QStringLiteral("Akonadi"));
110  hello.setMessage(QStringLiteral("Not Really IMAP server"));
111  hello.setProtocolVersion(Protocol::version());
112  hello.setGeneration(version.generation());
113  sendResponse(0, std::move(hello));
114 }
115 
116 DataStore *Connection::storageBackend()
117 {
118  if (!m_backend) {
119  m_backend = DataStore::self();
120  }
121  return m_backend;
122 }
123 
124 Connection::~Connection()
125 {
126  quitThread();
127 
128  if (m_reportTime) {
129  reportTime();
130  }
131 }
132 
133 void Connection::slotConnectionIdle()
134 {
135  Q_ASSERT(m_currentHandler == nullptr);
136  if (m_backend && m_backend->isOpened()) {
137  if (m_backend->inTransaction()) {
138  // This is a programming error, the timer should not have fired.
139  // But it is safer to abort and leave the connection open, until
140  // a later operation causes the idle timer to fire (than crash
141  // the akonadi server).
142  qCInfo(AKONADISERVER_LOG) << m_sessionId << "NOT Closing idle db connection; we are in transaction";
143  return;
144  }
145  m_backend->close();
146  }
147 }
148 
149 void Connection::slotSocketDisconnected()
150 {
151  // If we have active handler, wait for it to finish, then we emit the signal
152  // from slotNewDate()
153  if (m_currentHandler) {
154  return;
155  }
156 
157  Q_EMIT disconnected();
158 }
159 
160 void Connection::parseStream(const Protocol::CommandPtr &cmd)
161 {
162  if (!m_currentHandler->parseStream()) {
163  try {
164  m_currentHandler->failureResponse("Error while handling a command");
165  } catch (...) {
166  m_connectionClosing = true;
167  }
168  qCWarning(AKONADISERVER_LOG) << "Error while handling command" << cmd->type() << "on connection" << m_identifier;
169  }
170 }
171 
172 void Connection::handleIncomingData()
173 {
174  for (;;) {
175  if (m_connectionClosing || !m_socket || m_socket->state() != QLocalSocket::ConnectedState) {
176  break;
177  }
178 
179  // Blocks with event loop until some data arrive, allows us to still use QTimers
180  // and similar while waiting for some data to arrive
181  if (m_socket->bytesAvailable() < int(sizeof(qint64))) {
182  QEventLoop loop;
183  connect(m_socket.get(), &QLocalSocket::readyRead, &loop, &QEventLoop::quit);
184  connect(m_socket.get(), &QLocalSocket::stateChanged, &loop, &QEventLoop::quit);
185  connect(this, &Connection::connectionClosing, &loop, &QEventLoop::quit);
186  loop.exec();
187 
188  // RAII fails to clean QT's slot/signal connections above, leaking memory. Manually disconnect.
189  disconnect(m_socket.get(), &QLocalSocket::readyRead, &loop, &QEventLoop::quit);
190  disconnect(m_socket.get(), &QLocalSocket::stateChanged, &loop, &QEventLoop::quit);
191  disconnect(this, &Connection::connectionClosing, &loop, &QEventLoop::quit);
192  }
193 
194  if (m_connectionClosing || !m_socket || m_socket->state() != QLocalSocket::ConnectedState) {
195  break;
196  }
197 
198  m_idleTimer->stop();
199 
200  // will only open() a previously idle backend.
201  // Otherwise, a new backend could lazily be constructed by later calls.
202  if (!storageBackend()->isOpened()) {
203  m_backend->open();
204  }
205 
206  QString currentCommand;
207  while (m_socket->bytesAvailable() >= int(sizeof(qint64))) {
208  Protocol::DataStream stream(m_socket.get());
209  qint64 tag = -1;
210  stream >> tag;
211  // TODO: Check tag is incremental sequence
212 
214  try {
215  cmd = Protocol::deserialize(m_socket.get());
216  } catch (const Akonadi::ProtocolException &e) {
217  qCWarning(AKONADISERVER_LOG) << "ProtocolException while deserializing incoming data on connection" << m_identifier << ":" << e.what();
218  setState(Server::LoggingOut);
219  return;
220  } catch (const std::exception &e) {
221  qCWarning(AKONADISERVER_LOG) << "Unknown exception while deserializing incoming data on connection" << m_identifier << ":" << e.what();
222  setState(Server::LoggingOut);
223  return;
224  }
225  if (cmd->type() == Protocol::Command::Invalid) {
226  qCWarning(AKONADISERVER_LOG) << "Received an invalid command on connection" << m_identifier << ": resetting connection";
227  setState(Server::LoggingOut);
228  return;
229  }
230 
231  // Tag context and collection context is not persistent.
232  m_context.setTag(std::nullopt);
233  m_context.setCollection({});
234  if (m_akonadi.tracer().currentTracer() != QLatin1String("null")) {
235  m_akonadi.tracer().connectionInput(m_identifier, tag, cmd);
236  }
237 
238  m_currentHandler = findHandlerForCommand(cmd->type());
239  if (!m_currentHandler) {
240  qCWarning(AKONADISERVER_LOG) << "Invalid command: no such handler for" << cmd->type() << "on connection" << m_identifier;
241  setState(Server::LoggingOut);
242  return;
243  }
244  if (m_reportTime) {
245  startTime();
246  }
247 
248  m_currentHandler->setConnection(this);
249  m_currentHandler->setTag(tag);
250  m_currentHandler->setCommand(cmd);
251  try {
252  DbDeadlockCatcher catcher([this, &cmd]() {
253  parseStream(cmd);
254  });
255  } catch (const Akonadi::Server::HandlerException &e) {
256  if (m_currentHandler) {
257  try {
258  m_currentHandler->failureResponse(e.what());
259  } catch (...) {
260  m_connectionClosing = true;
261  }
262  qCWarning(AKONADISERVER_LOG) << "Handler exception when handling command" << cmd->type() << "on connection" << m_identifier << ":"
263  << e.what();
264  }
265  } catch (const Akonadi::Server::Exception &e) {
266  if (m_currentHandler) {
267  try {
268  m_currentHandler->failureResponse(QString::fromUtf8(e.type()) + QLatin1String(": ") + QString::fromUtf8(e.what()));
269  } catch (...) {
270  m_connectionClosing = true;
271  }
272  qCWarning(AKONADISERVER_LOG) << "General exception when handling command" << cmd->type() << "on connection" << m_identifier << ":"
273  << e.what();
274  }
275  } catch (const Akonadi::ProtocolException &e) {
276  // No point trying to send anything back to client, the connection is
277  // already messed up
278  qCWarning(AKONADISERVER_LOG) << "Protocol exception when handling command" << cmd->type() << "on connection" << m_identifier << ":" << e.what();
279  m_connectionClosing = true;
280 #if defined(Q_OS_LINUX) && !defined(_LIBCPP_VERSION)
281  } catch (abi::__forced_unwind &) {
282  // HACK: NPTL throws __forced_unwind during thread cancellation and
283  // we *must* rethrow it otherwise the program aborts. Due to the issue
284  // described in #376385 we might end up destroying (cancelling) the
285  // thread from a nested loop executed inside parseStream() above,
286  // so the exception raised in there gets caught by this try..catch
287  // statement and it must be rethrown at all cost. Remove this hack
288  // once the root problem is fixed.
289  throw;
290 #endif
291  } catch (...) {
292  qCCritical(AKONADISERVER_LOG) << "Unknown exception while handling command" << cmd->type() << "on connection" << m_identifier;
293  if (m_currentHandler) {
294  try {
295  m_currentHandler->failureResponse("Unknown exception caught");
296  } catch (...) {
297  m_connectionClosing = true;
298  }
299  }
300  }
301  if (m_reportTime) {
302  stopTime(currentCommand);
303  }
304  m_currentHandler.reset();
305 
306  if (!m_socket || m_socket->state() != QLocalSocket::ConnectedState) {
307  Q_EMIT disconnected();
308  return;
309  }
310 
311  if (m_connectionClosing) {
312  break;
313  }
314  }
315 
316  // reset, arm the timer
317  m_idleTimer->start(IDLE_TIMER_TIMEOUT);
318 
319  if (m_connectionClosing) {
320  break;
321  }
322  }
323 
324  if (m_connectionClosing) {
325  m_socket->disconnect(this);
326  m_socket->close();
327  QTimer::singleShot(0, this, &Connection::quit);
328  }
329 }
330 
331 const CommandContext &Connection::context() const
332 {
333  return m_context;
334 }
335 
336 void Connection::setContext(const CommandContext &context)
337 {
338  m_context = context;
339 }
340 
341 std::unique_ptr<Handler> Connection::findHandlerForCommand(Protocol::Command::Type command)
342 {
343  auto handler = Handler::findHandlerForCommandAlwaysAllowed(command, m_akonadi);
344  if (handler) {
345  return handler;
346  }
347 
348  switch (m_connectionState) {
349  case NonAuthenticated:
350  handler = Handler::findHandlerForCommandNonAuthenticated(command, m_akonadi);
351  break;
352  case Authenticated:
353  handler = Handler::findHandlerForCommandAuthenticated(command, m_akonadi);
354  break;
355  case LoggingOut:
356  break;
357  }
358 
359  return handler;
360 }
361 
362 qint64 Connection::currentTag() const
363 {
364  return m_currentHandler->tag();
365 }
366 
367 void Connection::setState(ConnectionState state)
368 {
369  if (state == m_connectionState) {
370  return;
371  }
372  m_connectionState = state;
373  switch (m_connectionState) {
374  case NonAuthenticated:
375  assert(0); // can't happen, it's only the initial state, we can't go back to it
376  break;
377  case Authenticated:
378  break;
379  case LoggingOut:
380  m_socket->disconnectFromServer();
381  break;
382  }
383 }
384 
385 void Connection::setSessionId(const QByteArray &id)
386 {
387  m_identifier = QString::asprintf("%s (%p)", id.data(), static_cast<void *>(this));
388  m_akonadi.tracer().beginConnection(m_identifier, QString());
389  // m_streamParser->setTracerIdentifier(m_identifier);
390 
391  m_sessionId = id;
392  setObjectName(QString::fromLatin1(id));
393  // this races with the use of objectName() in QThreadPrivate::start
394  // thread()->setObjectName(objectName() + QStringLiteral("-Thread"));
395  storageBackend()->setSessionId(id);
396 }
397 
398 QByteArray Connection::sessionId() const
399 {
400  return m_sessionId;
401 }
402 
403 bool Connection::isOwnerResource(const PimItem &item) const
404 {
405  if (context().resource().isValid() && item.collection().resourceId() == context().resource().id()) {
406  return true;
407  }
408  // fallback for older resources
409  if (sessionId() == item.collection().resource().name().toUtf8()) {
410  return true;
411  }
412  return false;
413 }
414 
415 bool Connection::isOwnerResource(const Collection &collection) const
416 {
417  if (context().resource().isValid() && collection.resourceId() == context().resource().id()) {
418  return true;
419  }
420  if (sessionId() == collection.resource().name().toUtf8()) {
421  return true;
422  }
423  return false;
424 }
425 
426 bool Connection::verifyCacheOnRetrieval() const
427 {
428  return m_verifyCacheOnRetrieval;
429 }
430 
431 void Connection::startTime()
432 {
433  m_time.start();
434 }
435 
436 void Connection::stopTime(const QString &identifier)
437 {
438  int elapsed = m_time.elapsed();
439  m_totalTime += elapsed;
440  m_totalTimeByHandler[identifier] += elapsed;
441  m_executionsByHandler[identifier]++;
442  qCDebug(AKONADISERVER_LOG) << identifier << " time : " << elapsed << " total: " << m_totalTime;
443 }
444 
445 void Connection::reportTime() const
446 {
447  qCDebug(AKONADISERVER_LOG) << "===== Time report for " << m_identifier << " =====";
448  qCDebug(AKONADISERVER_LOG) << " total: " << m_totalTime;
449  for (auto it = m_totalTimeByHandler.cbegin(), end = m_totalTimeByHandler.cend(); it != end; ++it) {
450  const QString &handler = it.key();
451  qCDebug(AKONADISERVER_LOG) << "handler : " << handler << " time: " << m_totalTimeByHandler.value(handler) << " executions "
452  << m_executionsByHandler.value(handler)
453  << " avg: " << m_totalTimeByHandler.value(handler) / m_executionsByHandler.value(handler);
454  }
455 }
456 
457 void Connection::sendResponse(qint64 tag, const Protocol::CommandPtr &response)
458 {
459  if (m_akonadi.tracer().currentTracer() != QLatin1String("null")) {
460  m_akonadi.tracer().connectionOutput(m_identifier, tag, response);
461  }
462  Protocol::DataStream stream(m_socket.get());
463  stream << tag;
464  Protocol::serialize(stream, response);
465  stream.flush();
466  if (!m_socket->waitForBytesWritten()) {
467  if (m_socket->state() == QLocalSocket::ConnectedState) {
468  throw ProtocolException("Server write timeout");
469  } else {
470  // The client has disconnected before we managed to send our response,
471  // which is not an error
472  }
473  }
474 }
475 
476 Protocol::CommandPtr Connection::readCommand()
477 {
478  while (m_socket->bytesAvailable() < static_cast<int>(sizeof(qint64))) {
479  Protocol::DataStream::waitForData(m_socket.get(), 30000); // 30 seconds, just in case client is busy
480  }
481 
482  Protocol::DataStream stream(m_socket.get());
483  qint64 tag;
484  stream >> tag;
485 
486  // TODO: compare tag with m_currentHandler->tag() ?
487  return Protocol::deserialize(m_socket.get());
488 }
This class handles all the database access.
Definition: datastore.h:94
QString fromUtf8(const char *str, int size)
Represents a collection of PIM items.
Definition: collection.h:61
void connected()
int exec(QEventLoop::ProcessEventsFlags flags)
Base class for exception used internally by the Akonadi server.
void quit()
void timeout()
void disconnected()
QByteArray toUtf8() const const
void readyRead()
QThread * currentThread()
QueuedConnection
void stateChanged(QLocalSocket::LocalSocketState socketState)
unsigned int version()
QString fromLatin1(const char *str, int size)
QString asprintf(const char *cformat,...)
A Connection represents one connection of a client to the server.
Definition: connection.h:46
This class catches DbDeadlockException (as emitted by QueryBuilder) and retries execution of the meth...
Helper integration between Akonadi and Qt.
This file is part of the KDE documentation.
Documentation copyright © 1996-2022 The KDE developers.
Generated on Sat Jun 25 2022 06:00:31 by doxygen 1.8.17 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.