Akonadi

server/connection.cpp
1 /***************************************************************************
2  * Copyright (C) 2006 by Till Adam <[email protected]> *
3  * Copyright (C) 2013 by Volker Krause <[email protected]> *
4  * *
5  * This program is free software; you can redistribute it and/or modify *
6  * it under the terms of the GNU Library General Public License as *
7  * published by the Free Software Foundation; either version 2 of the *
8  * License, or (at your option) any later version. *
9  * *
10  * This program is distributed in the hope that it will be useful, *
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13  * GNU General Public License for more details. *
14  * *
15  * You should have received a copy of the GNU Library General Public *
16  * License along with this program; if not, write to the *
17  * Free Software Foundation, Inc., *
18  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
19  ***************************************************************************/
20 #include "connection.h"
21 #include "akonadiserver_debug.h"
22 
23 
24 #include <QSettings>
25 #include <QEventLoop>
26 #include <QThreadStorage>
27 
28 #include "storage/datastore.h"
29 #include "storage/dbdeadlockcatcher.h"
30 #include "handler.h"
31 #include "notificationmanager.h"
32 
33 #include "tracer.h"
34 
35 #include <cassert>
36 
37 #ifndef Q_OS_WIN
38 #include <cxxabi.h>
39 #endif
40 
41 
42 #include <private/protocol_p.h>
43 #include <private/datastream_p_p.h>
44 #include <private/standarddirs_p.h>
45 
46 using namespace Akonadi;
47 using namespace Akonadi::Server;
48 
// Interval, in milliseconds, passed to the idle timer when it is (re)armed: 3 min.
// A typed constant replaces the former object-like macro of the same name.
static constexpr int IDLE_TIMER_TIMEOUT = 180000;
50 
51 static QString connectionIdentifier(Connection *c) {
52  const QString id = QString::asprintf("%p", static_cast<void *>(c));
53  return id;
54 }
55 
56 Connection::Connection(AkonadiServer &akonadi)
57  : AkThread(connectionIdentifier(this), QThread::InheritPriority)
58  , m_akonadi(akonadi)
59 {
60 }
61 
62 Connection::Connection(quintptr socketDescriptor, AkonadiServer &akonadi)
63  : AkThread(connectionIdentifier(this), QThread::InheritPriority)
64  , m_akonadi(akonadi)
65 {
66  m_socketDescriptor = socketDescriptor;
67  m_identifier = connectionIdentifier(this); // same as objectName()
68 
69  const QSettings settings(Akonadi::StandardDirs::serverConfigFile(), QSettings::IniFormat);
70  m_verifyCacheOnRetrieval = settings.value(QStringLiteral("Cache/VerifyOnRetrieval"), m_verifyCacheOnRetrieval).toBool();
71 }
72 
73 void Connection::init()
74 {
75  AkThread::init();
76 
77  auto socket = std::make_unique<QLocalSocket>();
78  if (!socket->setSocketDescriptor(m_socketDescriptor)) {
79  qCWarning(AKONADISERVER_LOG) << "Connection(" << m_identifier
80  << ")::run: failed to set socket descriptor: "
81  << socket->error()
82  << "(" << socket->errorString() << ")";
83  return;
84  }
85 
86  m_socket = std::move(socket);
87  connect(m_socket.get(), &QLocalSocket::disconnected, this, &Connection::slotSocketDisconnected);
88 
89  m_idleTimer = std::make_unique<QTimer>();
90  connect(m_idleTimer.get(), &QTimer::timeout, this, &Connection::slotConnectionIdle);
91 
92  storageBackend()->notificationCollector()->setConnection(this);
93 
94  if (m_socket->state() == QLocalSocket::ConnectedState) {
95  QTimer::singleShot(0, this, &Connection::handleIncomingData);
96  } else {
97  connect(m_socket.get(), &QLocalSocket::connected, this, &Connection::handleIncomingData,
98  Qt::QueuedConnection);
99  }
100 
101  try {
102  slotSendHello();
103  } catch (const ProtocolException &e) {
104  qCWarning(AKONADISERVER_LOG) << "Protocol Exception sending \"hello\" on connection" << m_identifier << ":" << e.what();
105  m_socket->disconnectFromServer();
106  }
107 }
108 
109 void Connection::quit()
110 {
111  if (QThread::currentThread()->loopLevel() > 1) {
112  m_connectionClosing = true;
113  Q_EMIT connectionClosing();
114  return;
115  }
116 
117  m_akonadi.tracer().endConnection(m_identifier, QString());
118 
119  m_socket.reset();
120  m_idleTimer.reset();
121 
122  AkThread::quit();
123 }
124 
125 void Connection::slotSendHello()
126 {
127  SchemaVersion version = SchemaVersion::retrieveAll().at(0);
128 
129  Protocol::HelloResponse hello;
130  hello.setServerName(QStringLiteral("Akonadi"));
131  hello.setMessage(QStringLiteral("Not Really IMAP server"));
132  hello.setProtocolVersion(Protocol::version());
133  hello.setGeneration(version.generation());
134  sendResponse(0, std::move(hello));
135 }
136 
137 DataStore *Connection::storageBackend()
138 {
139  if (!m_backend) {
140  m_backend = DataStore::self();
141  }
142  return m_backend;
143 }
144 
145 Connection::~Connection()
146 {
147  quitThread();
148 
149  if (m_reportTime) {
150  reportTime();
151  }
152 }
153 
154 void Connection::slotConnectionIdle()
155 {
156  Q_ASSERT(m_currentHandler == nullptr);
157  if (m_backend && m_backend->isOpened()) {
158  if (m_backend->inTransaction()) {
159  // This is a programming error, the timer should not have fired.
160  // But it is safer to abort and leave the connection open, until
161  // a later operation causes the idle timer to fire (than crash
162  // the akonadi server).
163  qCInfo(AKONADISERVER_LOG) << m_sessionId << "NOT Closing idle db connection; we are in transaction";
164  return;
165  }
166  m_backend->close();
167  }
168 }
169 
170 void Connection::slotSocketDisconnected()
171 {
172  // If we have active handler, wait for it to finish, then we emit the signal
173  // from slotNewDate()
174  if (m_currentHandler) {
175  return;
176  }
177 
178  Q_EMIT disconnected();
179 }
180 
181 void Connection::parseStream(const Protocol::CommandPtr &cmd)
182 {
183  if (!m_currentHandler->parseStream()) {
184  try {
185  m_currentHandler->failureResponse("Error while handling a command");
186  } catch (...) {
187  m_connectionClosing = true;
188  }
189  qCWarning(AKONADISERVER_LOG) << "Error while handling command" << cmd->type()
190  << "on connection" << m_identifier;
191  }
192 }
193 
194 void Connection::handleIncomingData()
195 {
196  Q_FOREVER {
197 
198  if (m_connectionClosing || !m_socket || m_socket->state() != QLocalSocket::ConnectedState) {
199  break;
200  }
201 
202  // Blocks with event loop until some data arrive, allows us to still use QTimers
203  // and similar while waiting for some data to arrive
204  if (m_socket->bytesAvailable() < int(sizeof(qint64))) {
205  QEventLoop loop;
206  connect(m_socket.get(), &QLocalSocket::readyRead, &loop, &QEventLoop::quit);
207  connect(m_socket.get(), &QLocalSocket::stateChanged, &loop, &QEventLoop::quit);
208  connect(this, &Connection::connectionClosing, &loop, &QEventLoop::quit);
209  loop.exec();
210  }
211 
212  if (m_connectionClosing || !m_socket || m_socket->state() != QLocalSocket::ConnectedState) {
213  break;
214  }
215 
216  m_idleTimer->stop();
217 
218  // will only open() a previously idle backend.
219  // Otherwise, a new backend could lazily be constructed by later calls.
220  if (!storageBackend()->isOpened()) {
221  m_backend->open();
222  }
223 
224  QString currentCommand;
225  while (m_socket->bytesAvailable() >= int(sizeof(qint64))) {
226  Protocol::DataStream stream(m_socket.get());
227  qint64 tag = -1;
228  stream >> tag;
229  // TODO: Check tag is incremental sequence
230 
232  try {
233  cmd = Protocol::deserialize(m_socket.get());
234  } catch (const Akonadi::ProtocolException &e) {
235  qCWarning(AKONADISERVER_LOG) << "ProtocolException while deserializing incoming data on connection"
236  << m_identifier << ":" << e.what();
237  setState(Server::LoggingOut);
238  return;
239  } catch (const std::exception &e) {
240  qCWarning(AKONADISERVER_LOG) << "Unknown exception while deserializing incoming data on connection"
241  << m_identifier << ":" << e.what();
242  setState(Server::LoggingOut);
243  return;
244  }
245  if (cmd->type() == Protocol::Command::Invalid) {
246  qCWarning(AKONADISERVER_LOG) << "Received an invalid command on connection" << m_identifier
247  << ": resetting connection";
248  setState(Server::LoggingOut);
249  return;
250  }
251 
252  // Tag context and collection context is not persistent.
253  m_context.setTag(std::nullopt);
254  m_context.setCollection({});
255  if (m_akonadi.tracer().currentTracer() != QLatin1String("null")) {
256  m_akonadi.tracer().connectionInput(m_identifier, tag, cmd);
257  }
258 
259  m_currentHandler = findHandlerForCommand(cmd->type());
260  if (!m_currentHandler) {
261  qCWarning(AKONADISERVER_LOG) << "Invalid command: no such handler for" << cmd->type()
262  << "on connection" << m_identifier;
263  setState(Server::LoggingOut);
264  return;
265  }
266  if (m_reportTime) {
267  startTime();
268  }
269 
270  m_currentHandler->setConnection(this);
271  m_currentHandler->setTag(tag);
272  m_currentHandler->setCommand(cmd);
273  try {
274  DbDeadlockCatcher catcher([this, &cmd]() { parseStream(cmd); });
275  } catch (const Akonadi::Server::HandlerException &e) {
276  if (m_currentHandler) {
277  try {
278  m_currentHandler->failureResponse(e.what());
279  } catch (...) {
280  m_connectionClosing = true;
281  }
282  qCWarning(AKONADISERVER_LOG) << "Handler exception when handling command" << cmd->type()
283  << "on connection" << m_identifier << ":" << e.what();
284  }
285  } catch (const Akonadi::Server::Exception &e) {
286  if (m_currentHandler) {
287  try {
288  m_currentHandler->failureResponse(QString::fromUtf8(e.type()) + QLatin1String(": ") + QString::fromUtf8(e.what()));
289  } catch (...) {
290  m_connectionClosing = true;
291  }
292  qCWarning(AKONADISERVER_LOG) << "General exception when handling command" << cmd->type()
293  << "on connection" << m_identifier << ":" << e.what();
294  }
295  } catch (const Akonadi::ProtocolException &e) {
296  // No point trying to send anything back to client, the connection is
297  // already messed up
298  qCWarning(AKONADISERVER_LOG) << "Protocol exception when handling command" << cmd->type()
299  << "on connection" << m_identifier << ":" << e.what();
300  m_connectionClosing = true;
301 #if defined(Q_OS_LINUX) && !defined(_LIBCPP_VERSION)
302  } catch (abi::__forced_unwind&) {
303  // HACK: NPTL throws __forced_unwind during thread cancellation and
304  // we *must* rethrow it otherwise the program aborts. Due to the issue
305  // described in #376385 we might end up destroying (cancelling) the
306  // thread from a nested loop executed inside parseStream() above,
307  // so the exception raised in there gets caught by this try..catch
308  // statement and it must be rethrown at all cost. Remove this hack
309  // once the root problem is fixed.
310  throw;
311 #endif
312  } catch (...) {
313  qCCritical(AKONADISERVER_LOG) << "Unknown exception while handling command" << cmd->type()
314  << "on connection" << m_identifier;
315  if (m_currentHandler) {
316  try {
317  m_currentHandler->failureResponse("Unknown exception caught");
318  } catch (...) {
319  m_connectionClosing = true;
320  }
321  }
322  }
323  if (m_reportTime) {
324  stopTime(currentCommand);
325  }
326  m_currentHandler.reset();
327 
328  if (!m_socket || m_socket->state() != QLocalSocket::ConnectedState) {
329  Q_EMIT disconnected();
330  return;
331  }
332 
333  if (m_connectionClosing) {
334  break;
335  }
336  }
337 
338  // reset, arm the timer
339  m_idleTimer->start(IDLE_TIMER_TIMEOUT);
340 
341  if (m_connectionClosing) {
342  break;
343  }
344  }
345 
346  if (m_connectionClosing) {
347  m_socket->disconnect(this);
348  m_socket->close();
349  QTimer::singleShot(0, this, &Connection::quit);
350  }
351 }
352 
353 const CommandContext &Connection::context() const
354 {
355  return m_context;
356 }
357 
358 void Connection::setContext(const CommandContext &context)
359 {
360  m_context = context;
361 }
362 
363 std::unique_ptr<Handler> Connection::findHandlerForCommand(Protocol::Command::Type command)
364 {
365  auto handler = Handler::findHandlerForCommandAlwaysAllowed(command, m_akonadi);
366  if (handler) {
367  return handler;
368  }
369 
370  switch (m_connectionState) {
371  case NonAuthenticated:
372  handler = Handler::findHandlerForCommandNonAuthenticated(command, m_akonadi);
373  break;
374  case Authenticated:
375  handler = Handler::findHandlerForCommandAuthenticated(command, m_akonadi);
376  break;
377  case LoggingOut:
378  break;
379  }
380 
381  return handler;
382 }
383 
384 qint64 Connection::currentTag() const
385 {
386  return m_currentHandler->tag();
387 }
388 
389 void Connection::setState(ConnectionState state)
390 {
391  if (state == m_connectionState) {
392  return;
393  }
394  m_connectionState = state;
395  switch (m_connectionState) {
396  case NonAuthenticated:
397  assert(0); // can't happen, it's only the initial state, we can't go back to it
398  break;
399  case Authenticated:
400  break;
401  case LoggingOut:
402  m_socket->disconnectFromServer();
403  break;
404  }
405 }
406 
407 void Connection::setSessionId(const QByteArray &id)
408 {
409  m_identifier = QString::asprintf("%s (%p)", id.data(), static_cast<void *>(this));
410  m_akonadi.tracer().beginConnection(m_identifier, QString());
411  //m_streamParser->setTracerIdentifier(m_identifier);
412 
413  m_sessionId = id;
415  // this races with the use of objectName() in QThreadPrivate::start
416  //thread()->setObjectName(objectName() + QStringLiteral("-Thread"));
417  storageBackend()->setSessionId(id);
418 }
419 
420 QByteArray Connection::sessionId() const
421 {
422  return m_sessionId;
423 }
424 
425 bool Connection::isOwnerResource(const PimItem &item) const
426 {
427  if (context().resource().isValid() && item.collection().resourceId() == context().resource().id()) {
428  return true;
429  }
430  // fallback for older resources
431  if (sessionId() == item.collection().resource().name().toUtf8()) {
432  return true;
433  }
434  return false;
435 }
436 
437 bool Connection::isOwnerResource(const Collection &collection) const
438 {
439  if (context().resource().isValid() && collection.resourceId() == context().resource().id()) {
440  return true;
441  }
442  if (sessionId() == collection.resource().name().toUtf8()) {
443  return true;
444  }
445  return false;
446 }
447 
448 bool Connection::verifyCacheOnRetrieval() const
449 {
450  return m_verifyCacheOnRetrieval;
451 }
452 
453 void Connection::startTime()
454 {
455  m_time.start();
456 }
457 
458 void Connection::stopTime(const QString &identifier)
459 {
460  int elapsed = m_time.elapsed();
461  m_totalTime += elapsed;
462  m_totalTimeByHandler[identifier] += elapsed;
463  m_executionsByHandler[identifier]++;
464  qCDebug(AKONADISERVER_LOG) << identifier << " time : " << elapsed << " total: " << m_totalTime;
465 }
466 
467 void Connection::reportTime() const
468 {
469  qCDebug(AKONADISERVER_LOG) << "===== Time report for " << m_identifier << " =====";
470  qCDebug(AKONADISERVER_LOG) << " total: " << m_totalTime;
471  for (auto it = m_totalTimeByHandler.cbegin(), end = m_totalTimeByHandler.cend(); it != end; ++it) {
472  const QString &handler = it.key();
473  qCDebug(AKONADISERVER_LOG) << "handler : " << handler << " time: " << m_totalTimeByHandler.value(handler) << " executions " << m_executionsByHandler.value(handler) << " avg: " << m_totalTimeByHandler.value(handler) / m_executionsByHandler.value(handler);
474  }
475 }
476 
477 void Connection::sendResponse(qint64 tag, const Protocol::CommandPtr &response)
478 {
479  if (m_akonadi.tracer().currentTracer() != QLatin1String("null")) {
480  m_akonadi.tracer().connectionOutput(m_identifier, tag, response);
481  }
482  Protocol::DataStream stream(m_socket.get());
483  stream << tag;
484  Protocol::serialize(stream, response);
485  stream.flush();
486  if (!m_socket->waitForBytesWritten()) {
487  if (m_socket->state() == QLocalSocket::ConnectedState) {
488  throw ProtocolException("Server write timeout");
489  } else {
490  // The client has disconnected before we managed to send our response,
491  // which is not an error
492  }
493  }
494 }
495 
496 
497 Protocol::CommandPtr Connection::readCommand()
498 {
499  while (m_socket->bytesAvailable() < (int) sizeof(qint64)) {
500  Protocol::DataStream::waitForData(m_socket.get(), 10000); // 10 seconds, just in case client is busy
501  }
502 
503  Protocol::DataStream stream(m_socket.get());
504  qint64 tag;
505  stream >> tag;
506 
507  // TODO: compare tag with m_currentHandler->tag() ?
508  return Protocol::deserialize(m_socket.get());
509 }
unsigned int version()
void quit()
void connected()
Base class for exception used internally by the Akonadi server.
QString asprintf(const char *cformat,...)
Resource resource() const
Retrieve the Resource record referred to by the resourceId column of this record. ...
void setState(QAbstractItemView::State state)
qint64 resourceId() const
Returns the value of the resourceId column of this record.
Definition: entities.cpp:1455
This class catches DbDeadlockException (as emitted by QueryBuilder) and retries execution of the meth...
void timeout()
QString fromUtf8(const char *str, int size)
Collection collection() const
Retrieve the Collection record referred to by the collectionId column of this record.
Definition: entities.cpp:4555
void setObjectName(const QString &name)
void stateChanged(QLocalSocket::LocalSocketState socketState)
int generation() const
Returns the value of the generation column of this record.
Definition: entities.cpp:143
QAbstractItemView::State state() const
QThread * currentThread()
Helper integration between Akonadi and Qt.
void disconnected()
QString fromLatin1(const char *str, int size)
Representation of a record in the SchemaVersion table.
Definition: entities.h:77
void readyRead()
This class handles all the database access.
Definition: datastore.h:102
QMetaObject::Connection connect(const QObject *sender, const char *signal, const QObject *receiver, const char *method, Qt::ConnectionType type)
QString name() const
Returns the value of the name column of this record.
Definition: entities.cpp:545
Q_EMIT
Representation of a record in the PimItem table.
Definition: entities.h:1194
A Connection represents one connection of a client to the server.
Definition: connection.h:53
Representation of a record in the Collection table.
Definition: entities.h:451
QByteArray toUtf8() const
This file is part of the KDE documentation.
Documentation copyright © 1996-2020 The KDE developers.
Generated on Tue May 26 2020 22:46:18 by doxygen 1.8.11 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.