Akonadi

core/session.cpp
1 /*
2  SPDX-FileCopyrightText: 2007 Volker Krause <[email protected]>
3 
4  SPDX-License-Identifier: LGPL-2.0-or-later
5 */
6 
7 #include "session.h"
8 #include "session_p.h"
9 
10 #include "job.h"
11 #include "job_p.h"
12 #include "private/protocol_p.h"
13 #include "protocolhelper_p.h"
14 #include "servermanager.h"
15 #include "servermanager_p.h"
16 #include "sessionthread_p.h"
17 
18 #include "akonadicore_debug.h"
19 
20 #include <KLocalizedString>
21 
22 #include <QCoreApplication>
23 #include <QPointer>
24 #include <QRandomGenerator>
25 #include <QThread>
26 #include <QThreadStorage>
27 #include <QTimer>
28 
29 #include <QApplication>
30 #include <QHostAddress>
31 
32 // ### FIXME pipelining got broken by switching result emission in JobPrivate::handleResponse to delayed emission
33 // in order to work around exec() deadlocks. As a result of that Session knows to late about a finished job and still
34 // sends responses for the next one to the already finished one
35 #define PIPELINE_LENGTH 0
36 //#define PIPELINE_LENGTH 2
37 
38 using namespace Akonadi;
39 using namespace std::chrono_literals;
40 /// @cond PRIVATE
41 
42 void SessionPrivate::startNext()
43 {
44  QTimer::singleShot(0, mParent, [this]() {
45  doStartNext();
46  });
47 }
48 
49 void SessionPrivate::reconnect()
50 {
51  if (!connection) {
52  connection = new Connection(Connection::CommandConnection, sessionId, &mCommandBuffer);
53  sessionThread()->addConnection(connection);
54  mParent->connect(connection, &Connection::reconnected, mParent, &Session::reconnected, Qt::QueuedConnection);
55  mParent->connect(
56  connection,
57  &Connection::socketDisconnected,
58  mParent,
59  [this]() {
60  socketDisconnected();
61  },
63  mParent->connect(
64  connection,
65  &Connection::socketError,
66  mParent,
67  [this](const QString &error) {
68  socketError(error);
69  },
71  }
72 
73  connection->reconnect();
74 }
75 
76 void SessionPrivate::socketError(const QString &error)
77 {
78  qCWarning(AKONADICORE_LOG) << "Socket error occurred:" << error;
79  socketDisconnected();
80 }
81 
82 void SessionPrivate::socketDisconnected()
83 {
84  if (currentJob) {
85  currentJob->d_ptr->lostConnection();
86  }
87  connected = false;
88 }
89 
90 bool SessionPrivate::handleCommands()
91 {
92  CommandBufferLocker lock(&mCommandBuffer);
93  CommandBufferNotifyBlocker notify(&mCommandBuffer);
94  while (!mCommandBuffer.isEmpty()) {
95  const auto command = mCommandBuffer.dequeue();
96  lock.unlock();
97  const auto cmd = command.command;
98  const auto tag = command.tag;
99 
100  // Handle Hello response -> send Login
101  if (cmd->type() == Protocol::Command::Hello) {
102  const auto &hello = Protocol::cmdCast<Protocol::HelloResponse>(cmd);
103  if (hello.isError()) {
104  qCWarning(AKONADICORE_LOG) << "Error when establishing connection with Akonadi server:" << hello.errorMessage();
105  connection->closeConnection();
106  QTimer::singleShot(1s, connection, &Connection::reconnect);
107  return false;
108  }
109 
110  qCDebug(AKONADICORE_LOG) << "Connected to" << hello.serverName() << ", using protocol version" << hello.protocolVersion();
111  qCDebug(AKONADICORE_LOG) << "Server generation:" << hello.generation();
112  qCDebug(AKONADICORE_LOG) << "Server says:" << hello.message();
113  // Version mismatch is handled in SessionPrivate::startJob() so that
114  // we can report the error out via KJob API
115  protocolVersion = hello.protocolVersion();
116  Internal::setServerProtocolVersion(protocolVersion);
117  Internal::setGeneration(hello.generation());
118 
119  sendCommand(nextTag(), Protocol::LoginCommandPtr::create(sessionId));
120  } else if (cmd->type() == Protocol::Command::Login) {
121  const auto &login = Protocol::cmdCast<Protocol::LoginResponse>(cmd);
122  if (login.isError()) {
123  qCWarning(AKONADICORE_LOG) << "Unable to login to Akonadi server:" << login.errorMessage();
124  connection->closeConnection();
125  QTimer::singleShot(1s, mParent, [this]() {
126  reconnect();
127  });
128  return false;
129  }
130 
131  connected = true;
132  startNext();
133  } else if (currentJob) {
134  currentJob->d_ptr->handleResponse(tag, cmd);
135  }
136 
137  lock.relock();
138  }
139 
140  return true;
141 }
142 
143 bool SessionPrivate::canPipelineNext()
144 {
145  if (queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH) {
146  return false;
147  }
148  if (pipeline.isEmpty() && currentJob) {
149  return currentJob->d_ptr->mWriteFinished;
150  }
151  if (!pipeline.isEmpty()) {
152  return pipeline.last()->d_ptr->mWriteFinished;
153  }
154  return false;
155 }
156 
157 void SessionPrivate::doStartNext()
158 {
159  if (!connected || (queue.isEmpty() && pipeline.isEmpty())) {
160  return;
161  }
162  if (canPipelineNext()) {
163  Akonadi::Job *nextJob = queue.dequeue();
164  pipeline.enqueue(nextJob);
165  startJob(nextJob);
166  }
167  if (jobRunning) {
168  return;
169  }
170  jobRunning = true;
171  if (!pipeline.isEmpty()) {
172  currentJob = pipeline.dequeue();
173  } else {
174  currentJob = queue.dequeue();
175  startJob(currentJob);
176  }
177 }
178 
179 void SessionPrivate::startJob(Job *job)
180 {
181  if (protocolVersion != Protocol::version()) {
183  if (protocolVersion < Protocol::version()) {
184  job->setErrorText(
185  i18n("Protocol version mismatch. Server version is older (%1) than ours (%2). "
186  "If you updated your system recently please restart the Akonadi server.",
187  protocolVersion,
188  Protocol::version()));
189  qCWarning(AKONADICORE_LOG) << "Protocol version mismatch. Server version is older (" << protocolVersion << ") than ours (" << Protocol::version()
190  << "). "
191  "If you updated your system recently please restart the Akonadi server.";
192  } else {
193  job->setErrorText(
194  i18n("Protocol version mismatch. Server version is newer (%1) than ours (%2). "
195  "If you updated your system recently please restart all KDE PIM applications.",
196  protocolVersion,
197  Protocol::version()));
198  qCWarning(AKONADICORE_LOG) << "Protocol version mismatch. Server version is newer (" << protocolVersion << ") than ours (" << Protocol::version()
199  << "). "
200  "If you updated your system recently please restart all KDE PIM applications.";
201  }
202  job->emitResult();
203  } else {
204  job->d_ptr->startQueued();
205  }
206 }
207 
208 void SessionPrivate::endJob(Job *job)
209 {
210  job->emitResult();
211 }
212 
213 void SessionPrivate::jobDone(KJob *job)
214 {
215  // ### careful, this method can be called from the QObject dtor of job (see jobDestroyed() below)
216  // so don't call any methods on job itself
217  if (job == currentJob) {
218  if (pipeline.isEmpty()) {
219  jobRunning = false;
220  currentJob = nullptr;
221  } else {
222  currentJob = pipeline.dequeue();
223  }
224  startNext();
225  } else {
226  // non-current job finished, likely canceled while still in the queue
227  queue.removeAll(static_cast<Akonadi::Job *>(job));
228  // ### likely not enough to really cancel already running jobs
229  pipeline.removeAll(static_cast<Akonadi::Job *>(job));
230  }
231 }
232 
233 void SessionPrivate::jobWriteFinished(Akonadi::Job *job)
234 {
235  Q_ASSERT((job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()));
236  Q_UNUSED(job)
237 
238  startNext();
239 }
240 
241 void SessionPrivate::jobDestroyed(QObject *job)
242 {
243  // careful, accessing non-QObject methods of job will fail here already
244  jobDone(static_cast<KJob *>(job));
245 }
246 
247 void SessionPrivate::addJob(Job *job)
248 {
249  queue.append(job);
250  QObject::connect(job, &KJob::result, mParent, [this](KJob *job) {
251  jobDone(job);
252  });
253  QObject::connect(job, &Job::writeFinished, mParent, [this](Job *job) {
254  jobWriteFinished(job);
255  });
256  QObject::connect(job, &QObject::destroyed, mParent, [this](QObject *o) {
257  jobDestroyed(o);
258  });
259  startNext();
260 }
261 
262 void SessionPrivate::publishOtherJobs(Job *thanThisJob)
263 {
264  int count = 0;
265  for (const auto &job : std::as_const(queue)) {
266  if (job != thanThisJob) {
267  job->d_ptr->publishJob();
268  ++count;
269  }
270  }
271  if (count > 0) {
272  qCDebug(AKONADICORE_LOG) << "published" << count << "pending jobs to the job tracker";
273  }
274  if (currentJob && currentJob != thanThisJob) {
275  currentJob->d_ptr->signalStartedToJobTracker();
276  }
277 }
278 
279 qint64 SessionPrivate::nextTag()
280 {
281  return theNextTag++;
282 }
283 
284 void SessionPrivate::sendCommand(qint64 tag, const Protocol::CommandPtr &command)
285 {
286  connection->sendCommand(tag, command);
287 }
288 
289 void SessionPrivate::serverStateChanged(ServerManager::State state)
290 {
291  if (state == ServerManager::Running && !connected) {
292  reconnect();
293  } else if (!connected && state == ServerManager::Broken) {
294  // If the server is broken, cancel all pending jobs, otherwise they will be
295  // blocked forever and applications waiting for them to finish would be stuck
296  for (Job *job : std::as_const(queue)) {
298  job->kill(KJob::EmitResult);
299  }
300  } else if (state == ServerManager::Stopping) {
301  sessionThread()->destroyConnection(connection);
302  connection = nullptr;
303  }
304 }
305 
306 void SessionPrivate::itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
307 {
308  // only deal with the queue, for the guys in the pipeline it's too late already anyway
309  // and they shouldn't have gotten there if they depend on a preceding job anyway.
310  for (Job *job : std::as_const(queue)) {
311  job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision);
312  }
313 }
314 
315 /// @endcond
316 
317 SessionPrivate::SessionPrivate(Session *parent)
318  : mParent(parent)
319  , mSessionThread(new SessionThread)
320  , connection(nullptr)
321  , protocolVersion(0)
322  , mCommandBuffer(parent, "handleCommands")
323  , currentJob(nullptr)
324 {
325  // Shutdown the thread before QApplication event loop quits - the
326  // thread()->wait() mechanism in Connection dtor crashes sometimes
327  // when called from QApplication destructor
328  connThreadCleanUp = QObject::connect(qApp, &QCoreApplication::aboutToQuit, [this]() {
329  delete mSessionThread;
330  mSessionThread = nullptr;
331  });
332 }
333 
334 SessionPrivate::~SessionPrivate()
335 {
336  QObject::disconnect(connThreadCleanUp);
337  delete mSessionThread;
338 }
339 
340 void SessionPrivate::init(const QByteArray &id)
341 {
342  if (!id.isEmpty()) {
343  sessionId = id;
344  } else {
346  }
347 
348  qCDebug(AKONADICORE_LOG) << "Initializing session with ID" << id;
349 
350  connected = false;
351  theNextTag = 2;
352  jobRunning = false;
353 
356  }
358  serverStateChanged(state);
359  });
360  reconnect();
361 }
362 
363 void SessionPrivate::forceReconnect()
364 {
365  jobRunning = false;
366  connected = false;
367  if (connection) {
368  connection->forceReconnect();
369  }
371  mParent,
372  [this]() {
373  reconnect();
374  },
376 }
377 
378 Session::Session(const QByteArray &sessionId, QObject *parent)
379  : QObject(parent)
380  , d(new SessionPrivate(this))
381 {
382  d->init(sessionId);
383 }
384 
385 Session::Session(SessionPrivate *dd, const QByteArray &sessionId, QObject *parent)
386  : QObject(parent)
387  , d(dd)
388 {
389  d->mParent = this;
390  d->init(sessionId);
391 }
392 
394 {
395  d->clear(false);
396 }
397 
399 {
400  return d->sessionId;
401 }
402 
403 Q_GLOBAL_STATIC(QThreadStorage<QPointer<Session>>, instances) // NOLINT(readability-redundant-member-init)
404 
405 void SessionPrivate::createDefaultSession(const QByteArray &sessionId)
406 {
407  Q_ASSERT_X(!sessionId.isEmpty(), "SessionPrivate::createDefaultSession", "You tried to create a default session with empty session id!");
408  Q_ASSERT_X(!instances()->hasLocalData(), "SessionPrivate::createDefaultSession", "You tried to create a default session twice!");
409 
410  auto session = new Session(sessionId);
411  setDefaultSession(session);
412 }
413 
414 void SessionPrivate::setDefaultSession(Session *session)
415 {
416  instances()->setLocalData({session});
418  instances()->setLocalData({});
419  });
420 }
421 
423 {
424  if (!instances()->hasLocalData()) {
425  auto session = new Session();
426  SessionPrivate::setDefaultSession(session);
427  }
428  return instances()->localData().data();
429 }
430 
432 {
433  d->clear(true);
434 }
435 
436 void SessionPrivate::clear(bool forceReconnect)
437 {
438  for (Job *job : std::as_const(queue)) {
439  job->kill(KJob::EmitResult); // safe, not started yet
440  }
441  queue.clear();
442  for (Job *job : std::as_const(pipeline)) {
443  job->d_ptr->mStarted = false; // avoid killing/reconnect loops
444  job->kill(KJob::EmitResult);
445  }
446  pipeline.clear();
447  if (currentJob) {
448  currentJob->d_ptr->mStarted = false; // avoid killing/reconnect loops
449  currentJob->kill(KJob::EmitResult);
450  }
451 
452  if (forceReconnect) {
453  this->forceReconnect();
454  }
455 }
456 
457 #include "moc_session.cpp"
static bool start()
Starts the server.
static State state()
Returns the state of the server.
bool disconnect(const QObject *sender, const char *signal, const QObject *receiver, const char *method)
void setErrorText(const QString &errorText)
@ Broken
Server is not operational and an error has been detected.
Definition: servermanager.h:41
void result(KJob *job)
void reconnected()
This signal is emitted whenever the session has been reconnected to the server (e....
@ ProtocolVersionMismatch
The server protocol version is too old or too new.
Definition: job.h:100
static ServerManager * self()
Returns the singleton instance of this class, for connecting to its signals.
void stateChanged(Akonadi::ServerManager::State state)
Emitted whenever the server state changes.
QByteArray number(int n, int base)
@ Running
Server is running and operational.
Definition: servermanager.h:39
void generate(const QStringList &source, QStringList &target)
@ ConnectionFailed
The connection to the Akonadi server failed.
Definition: job.h:99
bool kill(KillVerbosity verbosity=Quietly)
QMetaObject::Connection connect(const QObject *sender, const char *signal, const QObject *receiver, const char *method, Qt::ConnectionType type)
Q_GLOBAL_STATIC(Internal::StaticControl, s_instance) class ControlPrivate
Definition: control.cpp:28
qint64 Id
Describes the unique id type.
Definition: item.h:111
void destroyed(QObject *obj)
@ NotRunning
Server is not running, could be no one started it yet or it failed to start.
Definition: servermanager.h:37
QString i18n(const char *text, const TYPE &arg...)
static Session * defaultSession()
Returns the default session for this thread.
QByteArray sessionId() const
Returns the session identifier.
A communication session with the Akonadi storage.
Definition: core/session.h:55
QCoreApplication * instance()
void writeFinished(Akonadi::Job *job)
This signal is emitted if the job has finished all write operations, ie.
QueuedConnection
Base class for all actions in the Akonadi storage.
Definition: job.h:80
void clear()
Stops all jobs queued for execution.
void error(QWidget *parent, const QString &text, const QString &title, const KGuiItem &buttonOk, Options options=Notify)
~Session() override
Destroys the session.
bool isEmpty() const const
State
Enum for the various states the server can be in.
Definition: servermanager.h:36
bool invokeMethod(QObject *obj, const char *member, Qt::ConnectionType type, QGenericReturnArgument ret, QGenericArgument val0, QGenericArgument val1, QGenericArgument val2, QGenericArgument val3, QGenericArgument val4, QGenericArgument val5, QGenericArgument val6, QGenericArgument val7, QGenericArgument val8, QGenericArgument val9)
Session(const QByteArray &sessionId=QByteArray(), QObject *parent=nullptr)
Creates a new session.
void emitResult()
KLEO_EXPORT std::unique_ptr< GpgME::DefaultAssuanTransaction > sendCommand(std::shared_ptr< GpgME::Context > &assuanContext, const std::string &command, GpgME::Error &err)
@ Stopping
Server is shutting down.
Definition: servermanager.h:40
void setError(int errorCode)
QRandomGenerator * global()
Helper integration between Akonadi and Qt.
This file is part of the KDE documentation.
Documentation copyright © 1996-2023 The KDE developers.
Generated on Tue Feb 7 2023 03:58:07 by doxygen 1.8.17 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.