Akonadi

core/connection.cpp
1/*
2 * SPDX-FileCopyrightText: 2015 Daniel Vrátil <dvratil@redhat.com>
3 *
4 * SPDX-License-Identifier: LGPL-2.0-or-later
5 */
6
#include "akonadicore_debug.h"
#include "commandbuffer_p.h"
#include "connection_p.h"
#include "private/instance_p.h"
#include "servermanager_p.h"
#include "session_p.h"

#include <QAbstractEventDispatcher>
#include <QApplication>
#include <QDateTime>
#include <QFile>
#include <QFileInfo>
#include <QMap>
#include <QSettings>
#include <QTimer>

#include "private/datastream_p_p.h"
#include "private/protocol_exception_p.h"
#include "private/standarddirs_p.h"
25
26using namespace Akonadi;
27
28Connection::Connection(ConnectionType connType, const QByteArray &sessionId, CommandBuffer *commandBuffer, QObject *parent)
29 : QObject(parent)
30 , mConnectionType(connType)
31 , mSessionId(sessionId)
32 , mCommandBuffer(commandBuffer)
33{
34 qRegisterMetaType<Protocol::CommandPtr>();
35 qRegisterMetaType<QAbstractSocket::SocketState>();
36
37 const QByteArray sessionLogFile = qgetenv("AKONADI_SESSION_LOGFILE");
38 if (!sessionLogFile.isEmpty()) {
39 mLogFile = new QFile(QStringLiteral("%1.%2.%3.%4-%5")
40 .arg(QString::fromLatin1(sessionLogFile))
42 .arg(QString::number(reinterpret_cast<qulonglong>(this), 16),
43 QString::fromLatin1(mSessionId.replace('/', '_')),
44 connType == CommandConnection ? QStringLiteral("Cmd") : QStringLiteral("Ntf")));
45 if (!mLogFile->open(QIODevice::WriteOnly | QIODevice::Truncate)) {
46 qCWarning(AKONADICORE_LOG) << "Failed to open Akonadi Session log file" << mLogFile->fileName();
47 delete mLogFile;
48 mLogFile = nullptr;
49 }
50 }
51}
52
53Connection::~Connection()
54{
55 delete mLogFile;
56 if (mSocket) {
57 mSocket->disconnect();
58 mSocket->disconnectFromServer();
59 mSocket->close();
60 mSocket.reset();
61 }
62}
63
64void Connection::reconnect()
65{
66 const bool ok = QMetaObject::invokeMethod(this, &Connection::doReconnect, Qt::QueuedConnection);
67 Q_ASSERT(ok);
68 Q_UNUSED(ok)
69}
70
71QString Connection::defaultAddressForTypeAndMethod(ConnectionType type, const QString &method)
72{
73 if (method == QLatin1StringView("UnixPath")) {
74 const QString defaultSocketDir = StandardDirs::saveDir("data");
75 if (type == CommandConnection) {
76 return defaultSocketDir % QStringLiteral("akonadiserver-cmd.socket");
77 } else if (type == NotificationConnection) {
78 return defaultSocketDir % QStringLiteral("akonadiserver-ntf.socket");
79 }
80 } else if (method == QLatin1StringView("NamedPipe")) {
81 QString suffix;
82 if (Instance::hasIdentifier()) {
83 suffix += QStringLiteral("%1-").arg(Instance::identifier());
84 }
85 suffix += QString::fromUtf8(QUrl::toPercentEncoding(qApp->applicationDirPath()));
86 if (type == CommandConnection) {
87 return QStringLiteral("Akonadi-Cmd-") % suffix;
88 } else if (type == NotificationConnection) {
89 return QStringLiteral("Akonadi-Ntf-") % suffix;
90 }
91 }
92
93 Q_UNREACHABLE();
94}
95
96void Connection::doReconnect()
97{
98 Q_ASSERT(QThread::currentThread() == thread());
99
100 if (mSocket && (mSocket->state() == QLocalSocket::ConnectedState || mSocket->state() == QLocalSocket::ConnectingState)) {
101 // nothing to do, we are still/already connected
102 return;
103 }
104
105 if (ServerManager::self()->state() != ServerManager::Running) {
106 return;
107 }
108
109 // try to figure out where to connect to
110 QString serverAddress;
111
112 // env var has precedence
113 const QByteArray serverAddressEnvVar = qgetenv("AKONADI_SERVER_ADDRESS");
114 if (!serverAddressEnvVar.isEmpty()) {
115 const int pos = serverAddressEnvVar.indexOf(':');
116 const QByteArray protocol = serverAddressEnvVar.left(pos);
118 const QStringList lst = QString::fromLatin1(serverAddressEnvVar.mid(pos + 1)).split(QLatin1Char(','));
119 for (const QString &entry : lst) {
120 const QStringList pair = entry.split(QLatin1Char('='));
121 if (pair.size() != 2) {
122 continue;
123 }
124 options.insert(pair.first(), pair.last());
125 }
126
127 if (protocol == "unix") {
128 serverAddress = options.value(QStringLiteral("path"));
129 } else if (protocol == "pipe") {
130 serverAddress = options.value(QStringLiteral("name"));
131 }
132 }
133
134 // try config file next, fall back to defaults if that fails as well
135 if (serverAddress.isEmpty()) {
136 const QString connectionConfigFile = StandardDirs::connectionConfigFile();
137 const QFileInfo fileInfo(connectionConfigFile);
138 if (!fileInfo.exists()) {
139 qCWarning(AKONADICORE_LOG) << "Akonadi Client Session: connection config file '"
140 "akonadi/akonadiconnectionrc' can not be found!";
141 }
142
143 QSettings connectionSettings(connectionConfigFile, QSettings::IniFormat);
144
145 QString connectionType;
146 if (mConnectionType == CommandConnection) {
147 connectionType = QStringLiteral("Data");
148 } else if (mConnectionType == NotificationConnection) {
149 connectionType = QStringLiteral("Notifications");
150 }
151
152 connectionSettings.beginGroup(connectionType);
153 const auto method = connectionSettings.value(QStringLiteral("Method"), QStringLiteral("UnixPath")).toString();
154 serverAddress = connectionSettings.value(method, defaultAddressForTypeAndMethod(mConnectionType, method)).toString();
155 }
156
157 mSocket.reset(new QLocalSocket(this));
158 connect(mSocket.data(), &QLocalSocket::errorOccurred, this, [this](QLocalSocket::LocalSocketError /*unused*/) {
159 qCWarning(AKONADICORE_LOG) << mSocket->errorString() << mSocket->serverName();
160 Q_EMIT socketError(mSocket->errorString());
161 Q_EMIT socketDisconnected();
162 });
163 connect(mSocket.data(), &QLocalSocket::disconnected, this, &Connection::socketDisconnected);
164 // note: we temporarily disconnect from readyRead-signal inside handleIncomingData()
165 connect(mSocket.data(), &QLocalSocket::readyRead, this, &Connection::handleIncomingData);
166
167 // actually do connect
168 qCDebug(AKONADICORE_LOG) << "connectToServer" << serverAddress;
169 mSocket->connectToServer(serverAddress);
170 if (!mSocket->waitForConnected()) {
171 qCWarning(AKONADICORE_LOG) << "Failed to connect to server!";
172 Q_EMIT socketError(tr("Failed to connect to server!"));
173 mSocket.reset();
174 return;
175 }
176
177 QTimer::singleShot(0, this, &Connection::handleIncomingData);
178
179 Q_EMIT reconnected();
180}
181
182void Connection::forceReconnect()
183{
184 const bool ok = QMetaObject::invokeMethod(this, &Connection::doForceReconnect, Qt::QueuedConnection);
185
186 Q_ASSERT(ok);
187 Q_UNUSED(ok)
188}
189
190void Connection::doForceReconnect()
191{
192 Q_ASSERT(QThread::currentThread() == thread());
193
194 if (mSocket) {
195 disconnect(mSocket.get(), &QLocalSocket::disconnected, this, &Connection::socketDisconnected);
196 mSocket->disconnectFromServer();
197 mSocket.reset();
198 }
199}
200
201void Connection::closeConnection()
202{
203 const bool ok = QMetaObject::invokeMethod(this, &Connection::doCloseConnection, Qt::QueuedConnection);
204 Q_ASSERT(ok);
205 Q_UNUSED(ok)
206}
207
208void Connection::doCloseConnection()
209{
210 Q_ASSERT(QThread::currentThread() == thread());
211
212 if (mSocket) {
213 mSocket->close();
214 mSocket.reset();
215 }
216}
217
218QLocalSocket *Connection::socket() const
219{
220 return mSocket.data();
221}
222
223void Connection::handleIncomingData()
224{
225 Q_ASSERT(QThread::currentThread() == thread());
226
227 if (!mSocket) { // not connected yet
228 return;
229 }
230
231 while (mSocket->bytesAvailable() >= int(sizeof(qint64))) {
232 Protocol::DataStream stream(mSocket.data());
233 qint64 tag;
234 stream >> tag;
235
236 // temporarily disconnect from readyRead-signal to avoid re-entering this function when we
237 // call waitForData() deep inside Protocol::deserialize
238 disconnect(mSocket.data(), &QLocalSocket::readyRead, this, &Connection::handleIncomingData);
239
241 try {
242 cmd = Protocol::deserialize(mSocket.data());
243 } catch (const Akonadi::ProtocolException &e) {
244 qCWarning(AKONADICORE_LOG) << "Protocol exception:" << e.what();
245 // cmd's type will be Invalid by default, so fall-through
246 }
247
248 // reconnect to the signal again
249 connect(mSocket.data(), &QLocalSocket::readyRead, this, &Connection::handleIncomingData);
250
251 if (!cmd || (cmd->type() == Protocol::Command::Invalid)) {
252 qCWarning(AKONADICORE_LOG) << "Invalid command, the world is going to end!";
253 mSocket->close();
254 reconnect();
255 return;
256 }
257
258 if (mLogFile) {
259 mLogFile->write("S: ");
260 mLogFile->write(QDateTime::currentDateTime().toString(QStringLiteral("yyyy-MM-dd hh:mm:ss.zzz ")).toUtf8());
261 mLogFile->write(QByteArray::number(tag));
262 mLogFile->write(" ");
263 mLogFile->write(Protocol::debugString(cmd).toUtf8());
264 mLogFile->write("\n\n");
265 mLogFile->flush();
266 }
267
268 if (cmd->type() == Protocol::Command::Hello) {
269 Q_ASSERT(cmd->isResponse());
270 }
271
272 {
273 CommandBufferLocker locker(mCommandBuffer);
274 mCommandBuffer->enqueue(tag, cmd);
275 }
276 }
277}
278
279void Connection::sendCommand(qint64 tag, const Protocol::CommandPtr &cmd)
280{
281 const bool ok = QMetaObject::invokeMethod(this, "doSendCommand", Qt::QueuedConnection, Q_ARG(qint64, tag), Q_ARG(Akonadi::Protocol::CommandPtr, cmd));
282 Q_ASSERT(ok);
283 Q_UNUSED(ok)
284}
285
286void Connection::doSendCommand(qint64 tag, const Protocol::CommandPtr &cmd)
287{
288 Q_ASSERT(QThread::currentThread() == thread());
289
290 if (mLogFile) {
291 mLogFile->write("C: ");
292 mLogFile->write(QDateTime::currentDateTime().toString(QStringLiteral("yyyy-MM-dd hh:mm:ss.zzz ")).toUtf8());
293 mLogFile->write(QByteArray::number(tag));
294 mLogFile->write(" ");
295 mLogFile->write(Protocol::debugString(cmd).toUtf8());
296 mLogFile->write("\n\n");
297 mLogFile->flush();
298 }
299
300 if (mSocket && mSocket->isOpen()) {
301 Protocol::DataStream stream(mSocket.data());
302 try {
303 stream << tag;
304 Protocol::serialize(stream, cmd);
305 stream.flush();
306 } catch (const Akonadi::ProtocolException &e) {
307 qCWarning(AKONADICORE_LOG) << "Protocol Exception:" << QString::fromUtf8(e.what());
308 mSocket->close();
309 reconnect();
310 return;
311 }
312 if (!mSocket->waitForBytesWritten()) {
313 qCWarning(AKONADICORE_LOG) << "Socket write timeout";
314 mSocket->close();
315 reconnect();
316 return;
317 }
318 } else {
319 // TODO: Queue the commands and resend on reconnect?
320 }
321}
322
323#include "moc_connection_p.cpp"
Helper integration between Akonadi and Qt.
char * toString(const EngineQuery &query)
qsizetype indexOf(QByteArrayView bv, qsizetype from) const
bool isEmpty() const
QByteArray left(qsizetype len) const
QByteArray mid(qsizetype pos, qsizetype len) const
QByteArray number(double n, char format, int precision)
qint64 applicationPid()
QDateTime currentDateTime()
virtual QString fileName() const override
void readyRead()
T & first()
T & last()
qsizetype size() const
void disconnected()
void errorOccurred(QLocalSocket::LocalSocketError socketError)
iterator insert(const Key &key, const T &value)
T value(const Key &key, const T &defaultValue) const
bool invokeMethod(QObject *context, Functor &&function, FunctorReturnType *ret)
QString arg(Args &&... args) const
QString fromLatin1(QByteArrayView str)
QString fromUtf8(QByteArrayView str)
bool isEmpty() const
QString number(double n, char format, int precision)
QStringList split(QChar sep, Qt::SplitBehavior behavior, Qt::CaseSensitivity cs) const
QueuedConnection
QFuture< ArgsType< Signal > > connect(Sender *sender, Signal signal)
QThread * currentThread()
QByteArray toPercentEncoding(const QString &input, const QByteArray &exclude, const QByteArray &include)
This file is part of the KDE documentation.
Documentation copyright © 1996-2024 The KDE developers.
Generated on Tue Mar 26 2024 11:13:38 by doxygen 1.10.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.