Akonadi

core/session.cpp
1/*
2 SPDX-FileCopyrightText: 2007 Volker Krause <vkrause@kde.org>
3
4 SPDX-License-Identifier: LGPL-2.0-or-later
5*/
6
7#include "session.h"
8#include "session_p.h"
9
10#include "job.h"
11#include "job_p.h"
12#include "private/protocol_p.h"
13#include "protocolhelper_p.h"
14#include "servermanager.h"
15#include "servermanager_p.h"
16#include "sessionthread_p.h"
17
18#include "akonadicore_debug.h"
19
20#include <KLocalizedString>
21
22#include <QCoreApplication>
23#include <QPointer>
24#include <QRandomGenerator>
25#include <QThread>
26#include <QThreadStorage>
27#include <QTimer>
28
29#include <QApplication>
30#include <QHostAddress>
31
32// ### FIXME pipelining got broken by switching result emission in JobPrivate::handleResponse to delayed emission
33// in order to work around exec() deadlocks. As a result of that Session knows to late about a finished job and still
34// sends responses for the next one to the already finished one
35#define PIPELINE_LENGTH 0
36// #define PIPELINE_LENGTH 2
37
38using namespace Akonadi;
39using namespace std::chrono_literals;
40/// @cond PRIVATE
41
42void SessionPrivate::startNext()
43{
44 QTimer::singleShot(0, mParent, [this]() {
45 doStartNext();
46 });
47}
48
49void SessionPrivate::reconnect()
50{
51 if (!connection) {
52 connection = new Connection(Connection::CommandConnection, sessionId, &mCommandBuffer);
53 sessionThread()->addConnection(connection);
54 mParent->connect(connection, &Connection::reconnected, mParent, &Session::reconnected, Qt::QueuedConnection);
55 mParent->connect(
56 connection,
57 &Connection::socketDisconnected,
58 mParent,
59 [this]() {
60 socketDisconnected();
61 },
63 mParent->connect(
64 connection,
65 &Connection::socketError,
66 mParent,
67 [this](const QString &error) {
68 socketError(error);
69 },
71 }
72
73 connection->reconnect();
74}
75
76void SessionPrivate::socketError(const QString &error)
77{
78 qCWarning(AKONADICORE_LOG) << "Socket error occurred:" << error;
79 socketDisconnected();
80}
81
82void SessionPrivate::socketDisconnected()
83{
84 if (currentJob) {
85 currentJob->d_ptr->lostConnection();
86 }
87 connected = false;
88}
89
90bool SessionPrivate::handleCommands()
91{
92 CommandBufferLocker lock(&mCommandBuffer);
93 CommandBufferNotifyBlocker notify(&mCommandBuffer);
94 while (!mCommandBuffer.isEmpty()) {
95 const auto command = mCommandBuffer.dequeue();
96 lock.unlock();
97 const auto cmd = command.command;
98 const auto tag = command.tag;
99
100 // Handle Hello response -> send Login
101 if (cmd->type() == Protocol::Command::Hello) {
102 const auto &hello = Protocol::cmdCast<Protocol::HelloResponse>(cmd);
103 if (hello.isError()) {
104 qCWarning(AKONADICORE_LOG) << "Error when establishing connection with Akonadi server:" << hello.errorMessage();
105 connection->closeConnection();
106 QTimer::singleShot(1s, connection, &Connection::reconnect);
107 return false;
108 }
109
110 qCDebug(AKONADICORE_LOG) << "Connected to" << hello.serverName() << ", using protocol version" << hello.protocolVersion();
111 qCDebug(AKONADICORE_LOG) << "Server generation:" << hello.generation();
112 qCDebug(AKONADICORE_LOG) << "Server says:" << hello.message();
113 // Version mismatch is handled in SessionPrivate::startJob() so that
114 // we can report the error out via KJob API
115 protocolVersion = hello.protocolVersion();
116 Internal::setServerProtocolVersion(protocolVersion);
117 Internal::setGeneration(hello.generation());
118
119 sendCommand(nextTag(), Protocol::LoginCommandPtr::create(sessionId));
120 } else if (cmd->type() == Protocol::Command::Login) {
121 const auto &login = Protocol::cmdCast<Protocol::LoginResponse>(cmd);
122 if (login.isError()) {
123 qCWarning(AKONADICORE_LOG) << "Unable to login to Akonadi server:" << login.errorMessage();
124 connection->closeConnection();
125 QTimer::singleShot(1s, mParent, [this]() {
126 reconnect();
127 });
128 return false;
129 }
130
131 connected = true;
132 startNext();
133 } else if (currentJob) {
134 currentJob->d_ptr->handleResponse(tag, cmd);
135 }
136
137 lock.relock();
138 }
139
140 return true;
141}
142
143bool SessionPrivate::canPipelineNext()
144{
145 if (queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH) {
146 return false;
147 }
148 if (pipeline.isEmpty() && currentJob) {
149 return currentJob->d_ptr->mWriteFinished;
150 }
151 if (!pipeline.isEmpty()) {
152 return pipeline.last()->d_ptr->mWriteFinished;
153 }
154 return false;
155}
156
157void SessionPrivate::doStartNext()
158{
159 if (!connected || (queue.isEmpty() && pipeline.isEmpty())) {
160 return;
161 }
162 if (canPipelineNext()) {
163 Akonadi::Job *nextJob = queue.dequeue();
164 pipeline.enqueue(nextJob);
165 startJob(nextJob);
166 }
167 if (jobRunning) {
168 return;
169 }
170 jobRunning = true;
171 if (!pipeline.isEmpty()) {
172 currentJob = pipeline.dequeue();
173 } else {
174 currentJob = queue.dequeue();
175 startJob(currentJob);
176 }
177}
178
179void SessionPrivate::startJob(Job *job)
180{
181 if (protocolVersion != Protocol::version()) {
183 if (protocolVersion < Protocol::version()) {
184 job->setErrorText(
185 i18n("Protocol version mismatch. Server version is older (%1) than ours (%2). "
186 "If you updated your system recently please restart the Akonadi server.",
187 protocolVersion,
188 Protocol::version()));
189 qCWarning(AKONADICORE_LOG) << "Protocol version mismatch. Server version is older (" << protocolVersion << ") than ours (" << Protocol::version()
190 << "). "
191 "If you updated your system recently please restart the Akonadi server.";
192 } else {
193 job->setErrorText(
194 i18n("Protocol version mismatch. Server version is newer (%1) than ours (%2). "
195 "If you updated your system recently please restart all KDE PIM applications.",
196 protocolVersion,
197 Protocol::version()));
198 qCWarning(AKONADICORE_LOG) << "Protocol version mismatch. Server version is newer (" << protocolVersion << ") than ours (" << Protocol::version()
199 << "). "
200 "If you updated your system recently please restart all KDE PIM applications.";
201 }
202 job->emitResult();
203 } else {
204 job->d_ptr->startQueued();
205 }
206}
207
208void SessionPrivate::endJob(Job *job)
209{
210 job->emitResult();
211}
212
213void SessionPrivate::jobDone(KJob *job)
214{
215 // ### careful, this method can be called from the QObject dtor of job (see jobDestroyed() below)
216 // so don't call any methods on job itself
217 if (job == currentJob) {
218 if (pipeline.isEmpty()) {
219 jobRunning = false;
220 currentJob = nullptr;
221 } else {
222 currentJob = pipeline.dequeue();
223 }
224 startNext();
225 } else {
226 // non-current job finished, likely canceled while still in the queue
227 queue.removeAll(static_cast<Akonadi::Job *>(job));
228 // ### likely not enough to really cancel already running jobs
229 pipeline.removeAll(static_cast<Akonadi::Job *>(job));
230 }
231}
232
233void SessionPrivate::jobWriteFinished(Akonadi::Job *job)
234{
235 Q_ASSERT((job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()));
236 Q_UNUSED(job)
237
238 startNext();
239}
240
241void SessionPrivate::jobDestroyed(QObject *job)
242{
243 // careful, accessing non-QObject methods of job will fail here already
244 jobDone(static_cast<KJob *>(job));
245}
246
247void SessionPrivate::addJob(Job *job)
248{
249 queue.append(job);
250 QObject::connect(job, &KJob::result, mParent, [this](KJob *job) {
251 jobDone(job);
252 });
253 QObject::connect(job, &Job::writeFinished, mParent, [this](Job *job) {
254 jobWriteFinished(job);
255 });
256 QObject::connect(job, &QObject::destroyed, mParent, [this](QObject *o) {
257 jobDestroyed(o);
258 });
259 startNext();
260}
261
262void SessionPrivate::publishOtherJobs(Job *thanThisJob)
263{
264 int count = 0;
265 for (const auto &job : std::as_const(queue)) {
266 if (job != thanThisJob) {
267 job->d_ptr->publishJob();
268 ++count;
269 }
270 }
271 if (count > 0) {
272 qCDebug(AKONADICORE_LOG) << "published" << count << "pending jobs to the job tracker";
273 }
274 if (currentJob && currentJob != thanThisJob) {
275 currentJob->d_ptr->signalStartedToJobTracker();
276 }
277}
278
279qint64 SessionPrivate::nextTag()
280{
281 return theNextTag++;
282}
283
284void SessionPrivate::sendCommand(qint64 tag, const Protocol::CommandPtr &command)
285{
286 connection->sendCommand(tag, command);
287}
288
289void SessionPrivate::serverStateChanged(ServerManager::State state)
290{
291 if (state == ServerManager::Running && !connected) {
292 reconnect();
293 } else if (!connected && state == ServerManager::Broken) {
294 // If the server is broken, cancel all pending jobs, otherwise they will be
295 // blocked forever and applications waiting for them to finish would be stuck
296 auto q = queue;
297 for (Job *job : q) {
299 job->kill(KJob::EmitResult);
300 }
301 } else if (state == ServerManager::Stopping) {
302 sessionThread()->destroyConnection(connection);
303 connection = nullptr;
304 }
305}
306
307void SessionPrivate::itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
308{
309 // only deal with the queue, for the guys in the pipeline it's too late already anyway
310 // and they shouldn't have gotten there if they depend on a preceding job anyway.
311 for (Job *job : std::as_const(queue)) {
312 job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision);
313 }
314}
315
316/// @endcond
317
318SessionPrivate::SessionPrivate(Session *parent)
319 : mParent(parent)
320 , mSessionThread(new SessionThread)
321 , connection(nullptr)
322 , protocolVersion(0)
323 , mCommandBuffer(parent, "handleCommands")
324 , currentJob(nullptr)
325{
326 // Shutdown the thread before QApplication event loop quits - the
327 // thread()->wait() mechanism in Connection dtor crashes sometimes
328 // when called from QApplication destructor
329 connThreadCleanUp = QObject::connect(qApp, &QCoreApplication::aboutToQuit, qApp, [this]() {
330 socketDisconnected();
331 connection = nullptr;
332
333 delete mSessionThread;
334 mSessionThread = nullptr;
335 });
336}
337
338SessionPrivate::~SessionPrivate()
339{
340 QObject::disconnect(connThreadCleanUp);
341 delete mSessionThread;
342}
343
344void SessionPrivate::init(const QByteArray &id)
345{
346 if (!id.isEmpty()) {
347 sessionId = id;
348 } else {
349 sessionId = QCoreApplication::instance()->applicationName().toUtf8() + '-' + QByteArray::number(QRandomGenerator::global()->generate());
350 }
351
352 qCDebug(AKONADICORE_LOG) << "Initializing session with ID" << id;
353
354 connected = false;
355 theNextTag = 2;
356 jobRunning = false;
357
358 if (ServerManager::state() == ServerManager::NotRunning) {
359 ServerManager::start();
360 }
361 QObject::connect(ServerManager::self(), &ServerManager::stateChanged, mParent, [this](ServerManager::State state) {
362 serverStateChanged(state);
363 });
364 reconnect();
365}
366
367void SessionPrivate::forceReconnect()
368{
369 jobRunning = false;
370 connected = false;
371 if (connection) {
372 connection->forceReconnect();
373 }
375 mParent,
376 [this]() {
377 reconnect();
378 },
380}
381
382Session::Session(const QByteArray &sessionId, QObject *parent)
383 : QObject(parent)
384 , d(new SessionPrivate(this))
385{
386 d->init(sessionId);
387}
388
389Session::Session(SessionPrivate *dd, const QByteArray &sessionId, QObject *parent)
390 : QObject(parent)
391 , d(dd)
392{
393 d->mParent = this;
394 d->init(sessionId);
395}
396
398{
399 d->clear(false);
400}
401
403{
404 return d->sessionId;
405}
406
407Q_GLOBAL_STATIC(QThreadStorage<QPointer<Session>>, instances) // NOLINT(readability-redundant-member-init)
408
409void SessionPrivate::createDefaultSession(const QByteArray &sessionId)
410{
411 Q_ASSERT_X(!sessionId.isEmpty(), "SessionPrivate::createDefaultSession", "You tried to create a default session with empty session id!");
412 Q_ASSERT_X(!instances()->hasLocalData(), "SessionPrivate::createDefaultSession", "You tried to create a default session twice!");
413
414 auto session = new Session(sessionId);
415 setDefaultSession(session);
416}
417
418void SessionPrivate::setDefaultSession(Session *session)
419{
420 instances()->setLocalData({session});
422 instances()->setLocalData({});
423 });
424}
425
427{
428 if (!instances()->hasLocalData()) {
429 auto session = new Session();
430 SessionPrivate::setDefaultSession(session);
431 }
432 return instances()->localData().data();
433}
434
436{
437 d->clear(true);
438}
439
440void SessionPrivate::clear(bool forceReconnect)
441{
442 auto q = queue;
443 for (Job *job : q) {
444 job->kill(KJob::EmitResult); // safe, not started yet
445 }
446 queue.clear();
447 auto p = pipeline;
448 for (Job *job : p) {
449 job->d_ptr->mStarted = false; // avoid killing/reconnect loops
450 job->kill(KJob::EmitResult);
451 }
452 pipeline.clear();
453 if (currentJob) {
454 currentJob->d_ptr->mStarted = false; // avoid killing/reconnect loops
455 currentJob->kill(KJob::EmitResult);
456 }
457
458 if (forceReconnect) {
459 this->forceReconnect();
460 }
461}
462
463#include "moc_session.cpp"
qint64 Id
Describes the unique id type.
Definition item.h:105
Base class for all actions in the Akonadi storage.
Definition job.h:81
@ ProtocolVersionMismatch
The server protocol version is too old or too new.
Definition job.h:100
@ ConnectionFailed
The connection to the Akonadi server failed.
Definition job.h:99
void writeFinished(Akonadi::Job *job)
This signal is emitted if the job has finished all write operations, ie.
State
Enum for the various states the server can be in.
A communication session with the Akonadi storage.
~Session() override
Destroys the session.
void clear()
Stops all jobs queued for execution.
static Session * defaultSession()
Returns the default session for this thread.
Session(const QByteArray &sessionId=QByteArray(), QObject *parent=nullptr)
Creates a new session.
QByteArray sessionId() const
Returns the session identifier.
void setErrorText(const QString &errorText)
void emitResult()
void result(KJob *job)
void setError(int errorCode)
bool kill(KJob::KillVerbosity verbosity=KJob::Quietly)
ASAP CLI session.
QString i18n(const char *text, const TYPE &arg...)
Helper integration between Akonadi and Qt.
void error(QWidget *parent, const QString &text, const QString &title, const KGuiItem &buttonOk, Options options=Notify)
KLEO_EXPORT std::unique_ptr< GpgME::DefaultAssuanTransaction > sendCommand(std::shared_ptr< GpgME::Context > &assuanContext, const std::string &command, GpgME::Error &err)
bool isEmpty() const const
QByteArray number(double n, char format, int precision)
QCoreApplication * instance()
bool invokeMethod(QObject *context, Functor &&function, FunctorReturnType *ret)
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
void destroyed(QObject *obj)
bool disconnect(const QMetaObject::Connection &connection)
QRandomGenerator * global()
QueuedConnection
This file is part of the KDE documentation.
Documentation copyright © 1996-2025 The KDE developers.
Generated on Fri Jan 3 2025 11:58:20 by doxygen 1.12.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.