Akonadi

core/connection.cpp
1 /*
2  * SPDX-FileCopyrightText: 2015 Daniel Vrátil <[email protected]>
3  *
4  * SPDX-License-Identifier: LGPL-2.0-or-later
5  */
6 
7 #include "akonadicore_debug.h"
8 #include "commandbuffer_p.h"
9 #include "connection_p.h"
10 #include "servermanager_p.h"
11 #include "session_p.h"
12 #include <private/instance_p.h>
13 
14 #include <QAbstractEventDispatcher>
15 #include <QApplication>
16 #include <QDateTime>
17 #include <QFile>
18 #include <QFileInfo>
19 #include <QSettings>
20 #include <QTimer>
21 
22 #include <private/datastream_p_p.h>
23 #include <private/protocol_exception_p.h>
24 #include <private/standarddirs_p.h>
25 
26 using namespace Akonadi;
27 
28 Connection::Connection(ConnectionType connType, const QByteArray &sessionId, CommandBuffer *commandBuffer, QObject *parent)
29  : QObject(parent)
30  , mConnectionType(connType)
31  , mSessionId(sessionId)
32  , mCommandBuffer(commandBuffer)
33 {
34  qRegisterMetaType<Protocol::CommandPtr>();
35  qRegisterMetaType<QAbstractSocket::SocketState>();
36 
37  const QByteArray sessionLogFile = qgetenv("AKONADI_SESSION_LOGFILE");
38  if (!sessionLogFile.isEmpty()) {
39  mLogFile = new QFile(QStringLiteral("%1.%2.%3.%4-%5")
40  .arg(QString::fromLatin1(sessionLogFile))
42  .arg(QString::number(reinterpret_cast<qulonglong>(this), 16),
43  QString::fromLatin1(mSessionId.replace('/', '_')),
44  connType == CommandConnection ? QStringLiteral("Cmd") : QStringLiteral("Ntf")));
45  if (!mLogFile->open(QIODevice::WriteOnly | QIODevice::Truncate)) {
46  qCWarning(AKONADICORE_LOG) << "Failed to open Akonadi Session log file" << mLogFile->fileName();
47  delete mLogFile;
48  mLogFile = nullptr;
49  }
50  }
51 }
52 
53 Connection::~Connection()
54 {
55  delete mLogFile;
56  if (mSocket) {
57  mSocket->disconnect();
58  mSocket->disconnectFromServer();
59  mSocket->close();
60  mSocket.reset();
61  }
62 }
63 
64 void Connection::reconnect()
65 {
66  const bool ok = QMetaObject::invokeMethod(this, &Connection::doReconnect, Qt::QueuedConnection);
67  Q_ASSERT(ok);
68  Q_UNUSED(ok)
69 }
70 
71 QString Connection::defaultAddressForTypeAndMethod(ConnectionType type, const QString &method)
72 {
73  if (method == QLatin1String("UnixPath")) {
74  const QString defaultSocketDir = StandardDirs::saveDir("data");
75  if (type == CommandConnection) {
76  return defaultSocketDir % QStringLiteral("akonadiserver-cmd.socket");
77  } else if (type == NotificationConnection) {
78  return defaultSocketDir % QStringLiteral("akonadiserver-ntf.socket");
79  }
80  } else if (method == QLatin1String("NamedPipe")) {
81  QString suffix;
82  if (Instance::hasIdentifier()) {
83  suffix += QStringLiteral("%1-").arg(Instance::identifier());
84  }
85  suffix += QString::fromUtf8(QUrl::toPercentEncoding(qApp->applicationDirPath()));
86  if (type == CommandConnection) {
87  return QStringLiteral("Akonadi-Cmd-") % suffix;
88  } else if (type == NotificationConnection) {
89  return QStringLiteral("Akonadi-Ntf-") % suffix;
90  }
91  }
92 
93  Q_UNREACHABLE();
94 }
95 
96 void Connection::doReconnect()
97 {
98  Q_ASSERT(QThread::currentThread() == thread());
99 
100  if (mSocket && (mSocket->state() == QLocalSocket::ConnectedState || mSocket->state() == QLocalSocket::ConnectingState)) {
101  // nothing to do, we are still/already connected
102  return;
103  }
104 
105  if (ServerManager::self()->state() != ServerManager::Running) {
106  return;
107  }
108 
109  // try to figure out where to connect to
110  QString serverAddress;
111 
112  // env var has precedence
113  const QByteArray serverAddressEnvVar = qgetenv("AKONADI_SERVER_ADDRESS");
114  if (!serverAddressEnvVar.isEmpty()) {
115  const int pos = serverAddressEnvVar.indexOf(':');
116  const QByteArray protocol = serverAddressEnvVar.left(pos);
117  QMap<QString, QString> options;
118  const QStringList lst = QString::fromLatin1(serverAddressEnvVar.mid(pos + 1)).split(QLatin1Char(','));
119  for (const QString &entry : lst) {
120  const QStringList pair = entry.split(QLatin1Char('='));
121  if (pair.size() != 2) {
122  continue;
123  }
124  options.insert(pair.first(), pair.last());
125  }
126 
127  if (protocol == "unix") {
128  serverAddress = options.value(QStringLiteral("path"));
129  } else if (protocol == "pipe") {
130  serverAddress = options.value(QStringLiteral("name"));
131  }
132  }
133 
134  // try config file next, fall back to defaults if that fails as well
135  if (serverAddress.isEmpty()) {
136  const QString connectionConfigFile = StandardDirs::connectionConfigFile();
137  const QFileInfo fileInfo(connectionConfigFile);
138  if (!fileInfo.exists()) {
139  qCWarning(AKONADICORE_LOG) << "Akonadi Client Session: connection config file '"
140  "akonadi/akonadiconnectionrc' can not be found!";
141  }
142 
143  QSettings connectionSettings(connectionConfigFile, QSettings::IniFormat);
144 
145  QString connectionType;
146  if (mConnectionType == CommandConnection) {
147  connectionType = QStringLiteral("Data");
148  } else if (mConnectionType == NotificationConnection) {
149  connectionType = QStringLiteral("Notifications");
150  }
151 
152  connectionSettings.beginGroup(connectionType);
153  const auto method = connectionSettings.value(QStringLiteral("Method"), QStringLiteral("UnixPath")).toString();
154  serverAddress = connectionSettings.value(method, defaultAddressForTypeAndMethod(mConnectionType, method)).toString();
155  }
156 
157  mSocket.reset(new QLocalSocket(this));
158  connect(mSocket.data(), &QLocalSocket::errorOccurred, this, [this](QLocalSocket::LocalSocketError /*unused*/) {
159  qCWarning(AKONADICORE_LOG) << mSocket->errorString() << mSocket->serverName();
160  Q_EMIT socketError(mSocket->errorString());
161  Q_EMIT socketDisconnected();
162  });
163  connect(mSocket.data(), &QLocalSocket::disconnected, this, &Connection::socketDisconnected);
164  // note: we temporarily disconnect from readyRead-signal inside handleIncomingData()
165  connect(mSocket.data(), &QLocalSocket::readyRead, this, &Connection::handleIncomingData);
166 
167  // actually do connect
168  qCDebug(AKONADICORE_LOG) << "connectToServer" << serverAddress;
169  mSocket->connectToServer(serverAddress);
170  if (!mSocket->waitForConnected()) {
171  qCWarning(AKONADICORE_LOG) << "Failed to connect to server!";
172  Q_EMIT socketError(tr("Failed to connect to server!"));
173  mSocket.reset();
174  return;
175  }
176 
177  QTimer::singleShot(0, this, &Connection::handleIncomingData);
178 
179  Q_EMIT reconnected();
180 }
181 
182 void Connection::forceReconnect()
183 {
184  const bool ok = QMetaObject::invokeMethod(this, &Connection::doForceReconnect, Qt::QueuedConnection);
185 
186  Q_ASSERT(ok);
187  Q_UNUSED(ok)
188 }
189 
190 void Connection::doForceReconnect()
191 {
192  Q_ASSERT(QThread::currentThread() == thread());
193 
194  if (mSocket) {
195  disconnect(mSocket.get(), &QLocalSocket::disconnected, this, &Connection::socketDisconnected);
196  mSocket->disconnectFromServer();
197  mSocket.reset();
198  }
199 }
200 
201 void Connection::closeConnection()
202 {
203  const bool ok = QMetaObject::invokeMethod(this, &Connection::doCloseConnection, Qt::QueuedConnection);
204  Q_ASSERT(ok);
205  Q_UNUSED(ok)
206 }
207 
208 void Connection::doCloseConnection()
209 {
210  Q_ASSERT(QThread::currentThread() == thread());
211 
212  if (mSocket) {
213  mSocket->close();
214  mSocket.reset();
215  }
216 }
217 
218 QLocalSocket *Connection::socket() const
219 {
220  return mSocket.data();
221 }
222 
223 void Connection::handleIncomingData()
224 {
225  Q_ASSERT(QThread::currentThread() == thread());
226 
227  if (!mSocket) { // not connected yet
228  return;
229  }
230 
231  while (mSocket->bytesAvailable() >= int(sizeof(qint64))) {
232  Protocol::DataStream stream(mSocket.data());
233  qint64 tag;
234  stream >> tag;
235 
236  // temporarily disconnect from readyRead-signal to avoid re-entering this function when we
237  // call waitForData() deep inside Protocol::deserialize
238  disconnect(mSocket.data(), &QLocalSocket::readyRead, this, &Connection::handleIncomingData);
239 
241  try {
242  cmd = Protocol::deserialize(mSocket.data());
243  } catch (const Akonadi::ProtocolException &e) {
244  qCWarning(AKONADICORE_LOG) << "Protocol exception:" << e.what();
245  // cmd's type will be Invalid by default, so fall-through
246  }
247 
248  // reconnect to the signal again
249  connect(mSocket.data(), &QLocalSocket::readyRead, this, &Connection::handleIncomingData);
250 
251  if (!cmd || (cmd->type() == Protocol::Command::Invalid)) {
252  qCWarning(AKONADICORE_LOG) << "Invalid command, the world is going to end!";
253  mSocket->close();
254  reconnect();
255  return;
256  }
257 
258  if (mLogFile) {
259  mLogFile->write("S: ");
260  mLogFile->write(QDateTime::currentDateTime().toString(QStringLiteral("yyyy-MM-dd hh:mm:ss.zzz ")).toUtf8());
261  mLogFile->write(QByteArray::number(tag));
262  mLogFile->write(" ");
263  mLogFile->write(Protocol::debugString(cmd).toUtf8());
264  mLogFile->write("\n\n");
265  mLogFile->flush();
266  }
267 
268  if (cmd->type() == Protocol::Command::Hello) {
269  Q_ASSERT(cmd->isResponse());
270  }
271 
272  {
273  CommandBufferLocker locker(mCommandBuffer);
274  mCommandBuffer->enqueue(tag, cmd);
275  }
276  }
277 }
278 
279 void Connection::sendCommand(qint64 tag, const Protocol::CommandPtr &cmd)
280 {
281  const bool ok = QMetaObject::invokeMethod(this, "doSendCommand", Qt::QueuedConnection, Q_ARG(qint64, tag), Q_ARG(Akonadi::Protocol::CommandPtr, cmd));
282  Q_ASSERT(ok);
283  Q_UNUSED(ok)
284 }
285 
286 void Connection::doSendCommand(qint64 tag, const Protocol::CommandPtr &cmd)
287 {
288  Q_ASSERT(QThread::currentThread() == thread());
289 
290  if (mLogFile) {
291  mLogFile->write("C: ");
292  mLogFile->write(QDateTime::currentDateTime().toString(QStringLiteral("yyyy-MM-dd hh:mm:ss.zzz ")).toUtf8());
293  mLogFile->write(QByteArray::number(tag));
294  mLogFile->write(" ");
295  mLogFile->write(Protocol::debugString(cmd).toUtf8());
296  mLogFile->write("\n\n");
297  mLogFile->flush();
298  }
299 
300  if (mSocket && mSocket->isOpen()) {
301  Protocol::DataStream stream(mSocket.data());
302  try {
303  stream << tag;
304  Protocol::serialize(stream, cmd);
305  stream.flush();
306  } catch (const Akonadi::ProtocolException &e) {
307  qCWarning(AKONADICORE_LOG) << "Protocol Exception:" << QString::fromUtf8(e.what());
308  mSocket->close();
309  reconnect();
310  return;
311  }
312  if (!mSocket->waitForBytesWritten()) {
313  qCWarning(AKONADICORE_LOG) << "Socket write timeout";
314  mSocket->close();
315  reconnect();
316  return;
317  }
318  } else {
319  // TODO: Queue the commands and resend on reconnect?
320  }
321 }
T & first()
QString toString(const T &enumerator)
QString number(int n, int base)
QString fromUtf8(const char *str, int size)
int indexOf(char ch, int from) const const
const T value(const Key &key, const T &defaultValue) const const
QDateTime currentDateTime()
QStringList split(const QString &sep, QString::SplitBehavior behavior, Qt::CaseSensitivity cs) const const
static ServerManager * self()
Returns the singleton instance of this class, for connecting to its signals.
QByteArray number(int n, int base)
@ Running
Server is running and operational.
Definition: servermanager.h:39
qint64 applicationPid()
void errorOccurred(QLocalSocket::LocalSocketError socketError)
QMap::iterator insert(const Key &key, const T &value)
int size() const const
QByteArray mid(int pos, int len) const const
bool isEmpty() const const
void disconnected()
void readyRead()
QThread * currentThread()
QueuedConnection
QByteArray toPercentEncoding(const QString &input, const QByteArray &exclude, const QByteArray &include)
T & last()
QByteArray left(int len) const const
bool isEmpty() const const
QString arg(qlonglong a, int fieldWidth, int base, QChar fillChar) const const
QString fromLatin1(const char *str, int size)
bool invokeMethod(QObject *obj, const char *member, Qt::ConnectionType type, QGenericReturnArgument ret, QGenericArgument val0, QGenericArgument val1, QGenericArgument val2, QGenericArgument val3, QGenericArgument val4, QGenericArgument val5, QGenericArgument val6, QGenericArgument val7, QGenericArgument val8, QGenericArgument val9)
Helper integration between Akonadi and Qt.
This file is part of the KDE documentation.
Documentation copyright © 1996-2023 The KDE developers.
Generated on Mon May 8 2023 03:52:15 by doxygen 1.8.17 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.