7 #include "connection.h"
8 #include "akonadiserver_debug.h"
12 #include <QThreadStorage>
15 #include "notificationmanager.h"
16 #include "storage/datastore.h"
17 #include "storage/dbdeadlockcatcher.h"
25 #include <private/standarddirs_p.h>
28 using namespace Akonadi::Server;
// Interval handed to m_idleTimer->start() in handleIncomingData().
// 180000 matches the "3 min" note below if the unit is milliseconds.
30 #define IDLE_TIMER_TIMEOUT 180000 // 3 min
// Constructs a Connection for the given AkonadiServer.
// The AkThread base is initialised with connectionIdentifier(this) and
// QThread::InheritPriority. Any further initialisers and the constructor
// body are not visible in this excerpt.
38 Connection::Connection(AkonadiServer &akonadi)
39 : AkThread(connectionIdentifier(this),
QThread::InheritPriority)
// Constructs a Connection around an already-existing socket descriptor.
//
// Visible effects:
//  - stores socketDescriptor in m_socketDescriptor (consumed later by init()),
//  - sets m_identifier from connectionIdentifier(this),
//  - reads the "Cache/VerifyOnRetrieval" setting into m_verifyCacheOnRetrieval,
//    passing the member's current value as the default to settings.value().
// NOTE(review): the declaration of `settings` is elided here; presumably a
// QSettings instance - confirm.
44 Connection::Connection(quintptr socketDescriptor, AkonadiServer &akonadi)
45 : AkThread(connectionIdentifier(this),
QThread::InheritPriority)
48 m_socketDescriptor = socketDescriptor;
49 m_identifier = connectionIdentifier(
this);
52 m_verifyCacheOnRetrieval = settings.value(QStringLiteral(
"Cache/VerifyOnRetrieval"), m_verifyCacheOnRetrieval).toBool();
// Sets up the socket, the idle timer and the notification hook.
//
// Visible steps:
//  1. Create a QLocalSocket and attach m_socketDescriptor to it; on failure a
//     warning with the socket error is logged (the rest of that branch is
//     elided in this excerpt).
//  2. Move the socket into m_socket.
//  3. Create m_idleTimer and connect its timeout() to slotConnectionIdle().
//  4. Register this connection with the storage backend's notification
//     collector.
//  5. A ProtocolException raised while sending the "hello" (per the log
//     message) is logged and the socket is disconnected from the server.
55 void Connection::init()
59 auto socket = std::make_unique<QLocalSocket>();
60 if (!socket->setSocketDescriptor(m_socketDescriptor)) {
61 qCWarning(AKONADISERVER_LOG) <<
"Connection(" << m_identifier <<
")::run: failed to set socket descriptor: " << socket->error() <<
"("
62 << socket->errorString() <<
")";
// Ownership of the configured socket moves to the member.
66 m_socket = std::move(socket);
69 m_idleTimer = std::make_unique<QTimer>();
70 connect(m_idleTimer.get(), &
QTimer::timeout,
this, &Connection::slotConnectionIdle);
72 storageBackend()->notificationCollector()->setConnection(
this);
82 }
catch (
const ProtocolException &e) {
83 qCWarning(AKONADISERVER_LOG) <<
"Protocol Exception sending \"hello\" on connection" << m_identifier <<
":" << e.what();
84 m_socket->disconnectFromServer();
// Begins shutting the connection down: sets m_connectionClosing, emits
// connectionClosing(), and tells the tracer this connection (m_identifier)
// has ended. Further teardown steps, if any, are elided in this excerpt.
88 void Connection::quit()
91 m_connectionClosing =
true;
92 Q_EMIT connectionClosing();
96 m_akonadi.tracer().endConnection(m_identifier,
QString());
// Sends the initial HelloResponse to the client with tag 0.
// The response carries the server name, a greeting message, the protocol
// version and the generation taken from the first SchemaVersion row.
104 void Connection::slotSendHello()
// NOTE(review): at(0) assumes retrieveAll() returned at least one row; no
// emptiness check is visible here - confirm the table is always populated.
106 SchemaVersion
version = SchemaVersion::retrieveAll().at(0);
108 Protocol::HelloResponse hello;
109 hello.setServerName(QStringLiteral(
"Akonadi"));
110 hello.setMessage(QStringLiteral(
"Not Really IMAP server"));
111 hello.setProtocolVersion(Protocol::version());
112 hello.setGeneration(
version.generation());
113 sendResponse(0, std::move(hello));
// Fragment: the enclosing function header is elided in this excerpt.
// Caches the result of DataStore::self() in m_backend.
// NOTE(review): presumably part of storageBackend() - confirm.
119 m_backend = DataStore::self();
// Destructor. The body is not visible in this excerpt.
124 Connection::~Connection()
// Slot connected to m_idleTimer's timeout() in init().
// Asserts that no handler is currently active. If the backend is opened and
// in a transaction, it only logs that the idle db connection is NOT being
// closed. The not-in-transaction branch is elided in this excerpt.
133 void Connection::slotConnectionIdle()
135 Q_ASSERT(m_currentHandler ==
nullptr);
136 if (m_backend && m_backend->isOpened()) {
137 if (m_backend->inTransaction()) {
142 qCInfo(AKONADISERVER_LOG) << m_sessionId <<
"NOT Closing idle db connection; we are in transaction";
// Reacts to the socket going away.
// The branch taken while a handler is active is elided in this excerpt; the
// visible code emits disconnected().
149 void Connection::slotSocketDisconnected()
153 if (m_currentHandler) {
157 Q_EMIT disconnected();
// Fragment: the enclosing function header is elided in this excerpt.
// When m_currentHandler->parseStream() reports failure, the visible lines send
// a failure response, set m_connectionClosing and log a warning naming the
// command type (`cmd`) and connection.
// NOTE(review): elided lines may guard some of these steps (e.g. a try/catch
// around failureResponse) - confirm against the full file.
162 if (!m_currentHandler->parseStream()) {
164 m_currentHandler->failureResponse(
"Error while handling a command");
166 m_connectionClosing =
true;
168 qCWarning(AKONADISERVER_LOG) <<
"Error while handling command" << cmd->type() <<
"on connection" << m_identifier;
// Reads and dispatches commands that have arrived on m_socket.
//
// Visible flow (many lines are elided in this excerpt):
//  - bail-out check when fewer than sizeof(qint64) bytes are buffered,
//  - check that the storage backend is opened,
//  - loop while at least sizeof(qint64) bytes are available:
//      * deserialize a command; deserialization exceptions and commands of
//        type Invalid are logged and move the connection to LoggingOut,
//      * reset the context's tag and collection, trace the input,
//      * find and configure a handler, then run it under a set of catch
//        clauses that log, optionally send a failure response, and set
//        m_connectionClosing,
//      * record timing and reset m_currentHandler,
//  - (re)start the idle timer, or detach from the socket when closing.
172 void Connection::handleIncomingData()
// Fewer bytes than one qint64 are buffered (branch body elided).
181 if (m_socket->bytesAvailable() <
int(
sizeof(qint64))) {
202 if (!storageBackend()->isOpened()) {
// Process commands for as long as at least one qint64's worth of data is buffered.
207 while (m_socket->bytesAvailable() >=
int(
sizeof(qint64))) {
208 Protocol::DataStream stream(m_socket.get());
215 cmd = Protocol::deserialize(m_socket.get());
216 }
catch (
const Akonadi::ProtocolException &e) {
217 qCWarning(AKONADISERVER_LOG) <<
"ProtocolException while deserializing incoming data on connection" << m_identifier <<
":" << e.what();
218 setState(Server::LoggingOut);
220 }
catch (
const std::exception &e) {
221 qCWarning(AKONADISERVER_LOG) <<
"Unknown exception while deserializing incoming data on connection" << m_identifier <<
":" << e.what();
222 setState(Server::LoggingOut);
// A successfully deserialized command of type Invalid also ends the session.
225 if (cmd->type() == Protocol::Command::Invalid) {
226 qCWarning(AKONADISERVER_LOG) <<
"Received an invalid command on connection" << m_identifier <<
": resetting connection";
227 setState(Server::LoggingOut);
// Clear the per-command parts of the context before dispatching.
232 m_context.setTag(std::nullopt);
233 m_context.setCollection({});
// Only forward the input to the tracer when the current tracer is not "null".
234 if (m_akonadi.tracer().currentTracer() !=
QLatin1String(
"null")) {
235 m_akonadi.tracer().connectionInput(m_identifier, tag, cmd);
// Resolve the handler for this command type; none found => log and log out.
238 m_currentHandler = findHandlerForCommand(cmd->type());
239 if (!m_currentHandler) {
240 qCWarning(AKONADISERVER_LOG) <<
"Invalid command: no such handler for" << cmd->type() <<
"on connection" << m_identifier;
241 setState(Server::LoggingOut);
// Give the handler its connection, tag and command before it runs.
248 m_currentHandler->setConnection(
this);
249 m_currentHandler->setTag(tag);
250 m_currentHandler->setCommand(cmd);
255 }
catch (
const Akonadi::Server::HandlerException &e) {
256 if (m_currentHandler) {
258 m_currentHandler->failureResponse(e.what());
260 m_connectionClosing =
true;
262 qCWarning(AKONADISERVER_LOG) <<
"Handler exception when handling command" << cmd->type() <<
"on connection" << m_identifier <<
":"
266 if (m_currentHandler) {
270 m_connectionClosing =
true;
272 qCWarning(AKONADISERVER_LOG) <<
"General exception when handling command" << cmd->type() <<
"on connection" << m_identifier <<
":"
275 }
catch (
const Akonadi::ProtocolException &e) {
278 qCWarning(AKONADISERVER_LOG) <<
"Protocol exception when handling command" << cmd->type() <<
"on connection" << m_identifier <<
":" << e.what();
279 m_connectionClosing =
true;
// The abi::__forced_unwind clause is only compiled on Linux when not using libc++.
280 #if defined(Q_OS_LINUX) && !defined(_LIBCPP_VERSION)
281 }
catch (abi::__forced_unwind &) {
292 qCCritical(AKONADISERVER_LOG) <<
"Unknown exception while handling command" << cmd->type() <<
"on connection" << m_identifier;
293 if (m_currentHandler) {
295 m_currentHandler->failureResponse(
"Unknown exception caught");
297 m_connectionClosing =
true;
// Account the elapsed time under `currentCommand` and drop the finished handler.
302 stopTime(currentCommand);
304 m_currentHandler.reset();
307 Q_EMIT disconnected();
311 if (m_connectionClosing) {
// (Re)arm the idle timer with IDLE_TIMER_TIMEOUT.
317 m_idleTimer->start(IDLE_TIMER_TIMEOUT);
319 if (m_connectionClosing) {
// NOTE(review): presumably QObject::disconnect(receiver), detaching this
// object from the socket's signals once the connection is closing - confirm.
324 if (m_connectionClosing) {
325 m_socket->disconnect(
this);
// Const accessor returning a reference to a CommandContext.
// Body elided in this excerpt; presumably returns m_context - confirm.
331 const CommandContext &Connection::context()
const
// Setter taking a CommandContext by const reference.
// Body elided in this excerpt; presumably assigns m_context - confirm.
336 void Connection::setContext(
const CommandContext &context)
// Resolves the Handler for a command type.
// First asks for an "always allowed" handler, then selects on
// m_connectionState: NonAuthenticated uses the non-authenticated lookup and
// another case (label elided in this excerpt) uses the authenticated lookup.
// The checks between these lookups and the return statement are elided.
341 std::unique_ptr<Handler> Connection::findHandlerForCommand(Protocol::Command::Type command)
343 auto handler = Handler::findHandlerForCommandAlwaysAllowed(command, m_akonadi);
348 switch (m_connectionState) {
349 case NonAuthenticated:
350 handler = Handler::findHandlerForCommandNonAuthenticated(command, m_akonadi);
353 handler = Handler::findHandlerForCommandAuthenticated(command, m_akonadi);
// Returns the tag of the handler currently being executed.
// NOTE(review): m_currentHandler is dereferenced and no null check is visible
// here; callers presumably must only call this while a handler is active.
362 qint64 Connection::currentTag()
const
364 return m_currentHandler->tag();
// Switches the connection to a new ConnectionState.
// Compares against the current state first (the body of that check is
// elided), stores the new state, then switches on it. At least one branch
// disconnects m_socket from the server; which case label that belongs to is
// not visible in this excerpt.
367 void Connection::setState(ConnectionState state)
369 if (state == m_connectionState) {
372 m_connectionState = state;
373 switch (m_connectionState) {
374 case NonAuthenticated:
380 m_socket->disconnectFromServer();
// Applies the client's session id to this connection.
//  - rebuilds m_identifier as "<id> (<address of this>)",
//  - announces the connection to the tracer under that identifier,
//  - forwards the id to the storage backend.
// Lines between these steps are elided in this excerpt.
385 void Connection::setSessionId(
const QByteArray &
id)
387 m_identifier =
QString::asprintf(
"%s (%p)",
id.data(),
static_cast<void *
>(
this));
388 m_akonadi.tracer().beginConnection(m_identifier,
QString());
395 storageBackend()->setSessionId(
id);
// Tests whether this connection belongs to the resource owning `item`.
// Two checks are visible:
//  1. the context's resource is valid and its id equals the resource id of
//     the item's collection;
//  2. the session id equals the UTF-8 name of the item's collection's resource.
// The return statements are elided; presumably either match yields true.
403 bool Connection::isOwnerResource(
const PimItem &item)
const
405 if (context().resource().isValid() && item.collection().resourceId() == context().resource().
id()) {
409 if (sessionId() == item.collection().resource().name().toUtf8()) {
// Tests whether this connection belongs to the resource owning `collection`.
// Two checks are visible:
//  1. the context's resource is valid and its id equals collection.resourceId();
//  2. the session id equals the UTF-8 name of the collection's resource.
// The return statements are elided; presumably either match yields true.
415 bool Connection::isOwnerResource(
const Collection &collection)
const
417 if (context().resource().isValid() && collection.resourceId() == context().resource().
id()) {
420 if (sessionId() == collection.resource().name().
toUtf8()) {
// Returns the flag read from the "Cache/VerifyOnRetrieval" setting in the
// socket-descriptor constructor.
426 bool Connection::verifyCacheOnRetrieval()
const
428 return m_verifyCacheOnRetrieval;
// Counterpart of stopTime(). Body elided in this excerpt; presumably
// (re)starts m_time - confirm.
431 void Connection::startTime()
// Records the time elapsed on m_time under `identifier`.
// Adds the elapsed value to the overall total and to the per-identifier
// total, increments the per-identifier execution count, and writes a debug
// line with the elapsed and total values.
436 void Connection::stopTime(
const QString &identifier)
438 int elapsed = m_time.elapsed();
439 m_totalTime += elapsed;
440 m_totalTimeByHandler[identifier] += elapsed;
441 m_executionsByHandler[identifier]++;
442 qCDebug(AKONADISERVER_LOG) << identifier <<
" time : " << elapsed <<
" total: " << m_totalTime;
// Writes a timing summary for this connection to the debug log: the overall
// total, then one line per handler with its total time, execution count and
// average.
445 void Connection::reportTime()
const
447 qCDebug(AKONADISERVER_LOG) <<
"===== Time report for " << m_identifier <<
" =====";
448 qCDebug(AKONADISERVER_LOG) <<
" total: " << m_totalTime;
449 for (
auto it = m_totalTimeByHandler.cbegin(), end = m_totalTimeByHandler.cend(); it != end; ++it) {
450 const QString &handler = it.key();
// The average divides by the execution count; stopTime() increments that
// count for every identifier it adds time for.
451 qCDebug(AKONADISERVER_LOG) <<
"handler : " << handler <<
" time: " << m_totalTimeByHandler.value(handler) <<
" executions "
452 << m_executionsByHandler.value(handler)
453 <<
" avg: " << m_totalTimeByHandler.value(handler) / m_executionsByHandler.value(handler);
// Fragment: the enclosing function header is elided in this excerpt
// (presumably Connection::sendResponse, given the `tag` / `response` names -
// confirm).
// Traces the outgoing response unless the current tracer is "null", serializes
// it through a Protocol::DataStream over m_socket, and throws
// ProtocolException("Server write timeout") when waitForBytesWritten() fails.
459 if (m_akonadi.tracer().currentTracer() !=
QLatin1String(
"null")) {
460 m_akonadi.tracer().connectionOutput(m_identifier, tag, response);
462 Protocol::DataStream stream(m_socket.get());
464 Protocol::serialize(stream, response);
466 if (!m_socket->waitForBytesWritten()) {
468 throw ProtocolException(
"Server write timeout");
// Fragment: the enclosing function header is elided in this excerpt.
// Waits, via Protocol::DataStream::waitForData with 30000 as its second
// argument, until at least sizeof(qint64) bytes are buffered on m_socket, then
// deserializes and returns one command from the socket.
// NOTE(review): 30000 is presumably a timeout in milliseconds - confirm.
478 while (m_socket->bytesAvailable() <
static_cast<int>(
sizeof(qint64))) {
479 Protocol::DataStream::waitForData(m_socket.get(), 30000);
482 Protocol::DataStream stream(m_socket.get());
487 return Protocol::deserialize(m_socket.get());