Akonadi

core/session.cpp
1 /*
2  SPDX-FileCopyrightText: 2007 Volker Krause <[email protected]>
3 
4  SPDX-License-Identifier: LGPL-2.0-or-later
5 */
6 
7 #include "session.h"
8 #include "session_p.h"
9 
10 #include "job.h"
11 #include "job_p.h"
12 #include "private/protocol_p.h"
13 #include "private/standarddirs_p.h"
14 #include "protocolhelper_p.h"
15 #include "servermanager.h"
16 #include "servermanager_p.h"
17 #include "sessionthread_p.h"
18 
19 #include "akonadicore_debug.h"
20 
21 #include <KLocalizedString>
22 
23 #include <QCoreApplication>
24 #include <QPointer>
25 #include <QRandomGenerator>
26 #include <QThread>
27 #include <QThreadStorage>
28 #include <QTimer>
29 
30 #include <QApplication>
31 #include <QHostAddress>
32 
33 // ### FIXME pipelining got broken by switching result emission in JobPrivate::handleResponse to delayed emission
34 // in order to work around exec() deadlocks. As a result of that, Session knows too late about a finished job and still
35 // sends responses for the next one to the already finished one
36 #define PIPELINE_LENGTH 0
37 //#define PIPELINE_LENGTH 2
38 
39 using namespace Akonadi;
40 using namespace std::chrono_literals;
41 /// @cond PRIVATE
42 
43 void SessionPrivate::startNext()
44 {
45  QTimer::singleShot(0, mParent, [this]() {
46  doStartNext();
47  });
48 }
49 
50 void SessionPrivate::reconnect()
51 {
52  if (!connection) {
53  connection = new Connection(Connection::CommandConnection, sessionId, &mCommandBuffer);
54  sessionThread()->addConnection(connection);
55  mParent->connect(connection, &Connection::reconnected, mParent, &Session::reconnected, Qt::QueuedConnection);
56  mParent->connect(
57  connection,
58  &Connection::socketDisconnected,
59  mParent,
60  [this]() {
61  socketDisconnected();
62  },
64  mParent->connect(
65  connection,
66  &Connection::socketError,
67  mParent,
68  [this](const QString &error) {
69  socketError(error);
70  },
72  }
73 
74  connection->reconnect();
75 }
76 
77 void SessionPrivate::socketError(const QString &error)
78 {
79  qCWarning(AKONADICORE_LOG) << "Socket error occurred:" << error;
80  socketDisconnected();
81 }
82 
83 void SessionPrivate::socketDisconnected()
84 {
85  if (currentJob) {
86  currentJob->d_ptr->lostConnection();
87  }
88  connected = false;
89 }
90 
91 bool SessionPrivate::handleCommands()
92 {
93  CommandBufferLocker lock(&mCommandBuffer);
94  CommandBufferNotifyBlocker notify(&mCommandBuffer);
95  while (!mCommandBuffer.isEmpty()) {
96  const auto command = mCommandBuffer.dequeue();
97  lock.unlock();
98  const auto cmd = command.command;
99  const auto tag = command.tag;
100 
101  // Handle Hello response -> send Login
102  if (cmd->type() == Protocol::Command::Hello) {
103  const auto &hello = Protocol::cmdCast<Protocol::HelloResponse>(cmd);
104  if (hello.isError()) {
105  qCWarning(AKONADICORE_LOG) << "Error when establishing connection with Akonadi server:" << hello.errorMessage();
106  connection->closeConnection();
107  QTimer::singleShot(1s, connection, &Connection::reconnect);
108  return false;
109  }
110 
111  qCDebug(AKONADICORE_LOG) << "Connected to" << hello.serverName() << ", using protocol version" << hello.protocolVersion();
112  qCDebug(AKONADICORE_LOG) << "Server generation:" << hello.generation();
113  qCDebug(AKONADICORE_LOG) << "Server says:" << hello.message();
114  // Version mismatch is handled in SessionPrivate::startJob() so that
115  // we can report the error out via KJob API
116  protocolVersion = hello.protocolVersion();
117  Internal::setServerProtocolVersion(protocolVersion);
118  Internal::setGeneration(hello.generation());
119 
120  sendCommand(nextTag(), Protocol::LoginCommandPtr::create(sessionId));
121  } else if (cmd->type() == Protocol::Command::Login) {
122  const auto &login = Protocol::cmdCast<Protocol::LoginResponse>(cmd);
123  if (login.isError()) {
124  qCWarning(AKONADICORE_LOG) << "Unable to login to Akonadi server:" << login.errorMessage();
125  connection->closeConnection();
126  QTimer::singleShot(1s, mParent, [this]() {
127  reconnect();
128  });
129  return false;
130  }
131 
132  connected = true;
133  startNext();
134  } else if (currentJob) {
135  currentJob->d_ptr->handleResponse(tag, cmd);
136  }
137 
138  lock.relock();
139  }
140 
141  return true;
142 }
143 
144 bool SessionPrivate::canPipelineNext()
145 {
146  if (queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH) {
147  return false;
148  }
149  if (pipeline.isEmpty() && currentJob) {
150  return currentJob->d_ptr->mWriteFinished;
151  }
152  if (!pipeline.isEmpty()) {
153  return pipeline.last()->d_ptr->mWriteFinished;
154  }
155  return false;
156 }
157 
158 void SessionPrivate::doStartNext()
159 {
160  if (!connected || (queue.isEmpty() && pipeline.isEmpty())) {
161  return;
162  }
163  if (canPipelineNext()) {
164  Akonadi::Job *nextJob = queue.dequeue();
165  pipeline.enqueue(nextJob);
166  startJob(nextJob);
167  }
168  if (jobRunning) {
169  return;
170  }
171  jobRunning = true;
172  if (!pipeline.isEmpty()) {
173  currentJob = pipeline.dequeue();
174  } else {
175  currentJob = queue.dequeue();
176  startJob(currentJob);
177  }
178 }
179 
180 void SessionPrivate::startJob(Job *job)
181 {
182  if (protocolVersion != Protocol::version()) {
184  if (protocolVersion < Protocol::version()) {
185  job->setErrorText(
186  i18n("Protocol version mismatch. Server version is older (%1) than ours (%2). "
187  "If you updated your system recently please restart the Akonadi server.",
188  protocolVersion,
189  Protocol::version()));
190  qCWarning(AKONADICORE_LOG) << "Protocol version mismatch. Server version is older (" << protocolVersion << ") than ours (" << Protocol::version()
191  << "). "
192  "If you updated your system recently please restart the Akonadi server.";
193  } else {
194  job->setErrorText(
195  i18n("Protocol version mismatch. Server version is newer (%1) than ours (%2). "
196  "If you updated your system recently please restart all KDE PIM applications.",
197  protocolVersion,
198  Protocol::version()));
199  qCWarning(AKONADICORE_LOG) << "Protocol version mismatch. Server version is newer (" << protocolVersion << ") than ours (" << Protocol::version()
200  << "). "
201  "If you updated your system recently please restart all KDE PIM applications.";
202  }
203  job->emitResult();
204  } else {
205  job->d_ptr->startQueued();
206  }
207 }
208 
209 void SessionPrivate::endJob(Job *job)
210 {
211  job->emitResult();
212 }
213 
214 void SessionPrivate::jobDone(KJob *job)
215 {
216  // ### careful, this method can be called from the QObject dtor of job (see jobDestroyed() below)
217  // so don't call any methods on job itself
218  if (job == currentJob) {
219  if (pipeline.isEmpty()) {
220  jobRunning = false;
221  currentJob = nullptr;
222  } else {
223  currentJob = pipeline.dequeue();
224  }
225  startNext();
226  } else {
227  // non-current job finished, likely canceled while still in the queue
228  queue.removeAll(static_cast<Akonadi::Job *>(job));
229  // ### likely not enough to really cancel already running jobs
230  pipeline.removeAll(static_cast<Akonadi::Job *>(job));
231  }
232 }
233 
234 void SessionPrivate::jobWriteFinished(Akonadi::Job *job)
235 {
236  Q_ASSERT((job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()));
237  Q_UNUSED(job)
238 
239  startNext();
240 }
241 
242 void SessionPrivate::jobDestroyed(QObject *job)
243 {
244  // careful, accessing non-QObject methods of job will fail here already
245  jobDone(static_cast<KJob *>(job));
246 }
247 
248 void SessionPrivate::addJob(Job *job)
249 {
250  queue.append(job);
251  QObject::connect(job, &KJob::result, mParent, [this](KJob *job) {
252  jobDone(job);
253  });
254  QObject::connect(job, &Job::writeFinished, mParent, [this](Job *job) {
255  jobWriteFinished(job);
256  });
257  QObject::connect(job, &QObject::destroyed, mParent, [this](QObject *o) {
258  jobDestroyed(o);
259  });
260  startNext();
261 }
262 
263 void SessionPrivate::publishOtherJobs(Job *thanThisJob)
264 {
265  int count = 0;
266  for (const auto &job : std::as_const(queue)) {
267  if (job != thanThisJob) {
268  job->d_ptr->publishJob();
269  ++count;
270  }
271  }
272  if (count > 0) {
273  qCDebug(AKONADICORE_LOG) << "published" << count << "pending jobs to the job tracker";
274  }
275  if (currentJob && currentJob != thanThisJob) {
276  currentJob->d_ptr->signalStartedToJobTracker();
277  }
278 }
279 
280 qint64 SessionPrivate::nextTag()
281 {
282  return theNextTag++;
283 }
284 
285 void SessionPrivate::sendCommand(qint64 tag, const Protocol::CommandPtr &command)
286 {
287  connection->sendCommand(tag, command);
288 }
289 
290 void SessionPrivate::serverStateChanged(ServerManager::State state)
291 {
292  if (state == ServerManager::Running && !connected) {
293  reconnect();
294  } else if (!connected && state == ServerManager::Broken) {
295  // If the server is broken, cancel all pending jobs, otherwise they will be
296  // blocked forever and applications waiting for them to finish would be stuck
297  for (Job *job : std::as_const(queue)) {
299  job->kill(KJob::EmitResult);
300  }
301  } else if (state == ServerManager::Stopping) {
302  sessionThread()->destroyConnection(connection);
303  connection = nullptr;
304  }
305 }
306 
307 void SessionPrivate::itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
308 {
309  // only deal with the queue, for the guys in the pipeline it's too late already anyway
310  // and they shouldn't have gotten there if they depend on a preceding job anyway.
311  for (Job *job : std::as_const(queue)) {
312  job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision);
313  }
314 }
315 
316 /// @endcond
317 
318 SessionPrivate::SessionPrivate(Session *parent)
319  : mParent(parent)
320  , mSessionThread(new SessionThread)
321  , connection(nullptr)
322  , protocolVersion(0)
323  , mCommandBuffer(parent, "handleCommands")
324  , currentJob(nullptr)
325 {
326  // Shutdown the thread before QApplication event loop quits - the
327  // thread()->wait() mechanism in Connection dtor crashes sometimes
328  // when called from QApplication destructor
329  connThreadCleanUp = QObject::connect(qApp, &QCoreApplication::aboutToQuit, [this]() {
330  delete mSessionThread;
331  mSessionThread = nullptr;
332  });
333 }
334 
335 SessionPrivate::~SessionPrivate()
336 {
337  QObject::disconnect(connThreadCleanUp);
338  delete mSessionThread;
339 }
340 
341 void SessionPrivate::init(const QByteArray &id)
342 {
343  if (!id.isEmpty()) {
344  sessionId = id;
345  } else {
347  }
348 
349  qCDebug(AKONADICORE_LOG) << "Initializing session with ID" << id;
350 
351  connected = false;
352  theNextTag = 2;
353  jobRunning = false;
354 
357  }
359  serverStateChanged(state);
360  });
361  reconnect();
362 }
363 
364 void SessionPrivate::forceReconnect()
365 {
366  jobRunning = false;
367  connected = false;
368  if (connection) {
369  connection->forceReconnect();
370  }
372  mParent,
373  [this]() {
374  reconnect();
375  },
377 }
378 
379 Session::Session(const QByteArray &sessionId, QObject *parent)
380  : QObject(parent)
381  , d(new SessionPrivate(this))
382 {
383  d->init(sessionId);
384 }
385 
386 Session::Session(SessionPrivate *dd, const QByteArray &sessionId, QObject *parent)
387  : QObject(parent)
388  , d(dd)
389 {
390  d->mParent = this;
391  d->init(sessionId);
392 }
393 
395 {
396  d->clear(false);
397 }
398 
400 {
401  return d->sessionId;
402 }
403 
404 Q_GLOBAL_STATIC(QThreadStorage<QPointer<Session>>, instances) // NOLINT(readability-redundant-member-init)
405 
406 void SessionPrivate::createDefaultSession(const QByteArray &sessionId)
407 {
408  Q_ASSERT_X(!sessionId.isEmpty(), "SessionPrivate::createDefaultSession", "You tried to create a default session with empty session id!");
409  Q_ASSERT_X(!instances()->hasLocalData(), "SessionPrivate::createDefaultSession", "You tried to create a default session twice!");
410 
411  auto session = new Session(sessionId);
412  setDefaultSession(session);
413 }
414 
415 void SessionPrivate::setDefaultSession(Session *session)
416 {
417  instances()->setLocalData({session});
419  instances()->setLocalData({});
420  });
421 }
422 
424 {
425  if (!instances()->hasLocalData()) {
426  auto session = new Session();
427  SessionPrivate::setDefaultSession(session);
428  }
429  return instances()->localData().data();
430 }
431 
433 {
434  d->clear(true);
435 }
436 
437 void SessionPrivate::clear(bool forceReconnect)
438 {
439  for (Job *job : std::as_const(queue)) {
440  job->kill(KJob::EmitResult); // safe, not started yet
441  }
442  queue.clear();
443  for (Job *job : std::as_const(pipeline)) {
444  job->d_ptr->mStarted = false; // avoid killing/reconnect loops
445  job->kill(KJob::EmitResult);
446  }
447  pipeline.clear();
448  if (currentJob) {
449  currentJob->d_ptr->mStarted = false; // avoid killing/reconnect loops
450  currentJob->kill(KJob::EmitResult);
451  }
452 
453  if (forceReconnect) {
454  this->forceReconnect();
455  }
456 }
457 
458 #include "moc_session.cpp"
bool kill(KillVerbosity verbosity=Quietly)
void emitResult()
The server protocol version is too old or too new.
Definition: job.h:100
void setError(int errorCode)
bool isEmpty() const const
static ServerManager * self()
Returns the singleton instance of this class, for connecting to its signals.
Q_GLOBAL_STATIC(Internal::StaticControl, s_instance) class ControlPrivate
Definition: control.cpp:28
Base class for all actions in the Akonadi storage.
Definition: job.h:80
The connection to the Akonadi server failed.
Definition: job.h:99
Session(const QByteArray &sessionId=QByteArray(), QObject *parent=nullptr)
Creates a new session.
bool disconnect(const QObject *sender, const char *signal, const QObject *receiver, const char *method)
static Session * defaultSession()
Returns the default session for this thread.
void writeFinished(Akonadi::Job *job)
This signal is emitted if the job has finished all write operations, ie.
QByteArray sessionId() const
Returns the session identifier.
void setErrorText(const QString &errorText)
KLEO_EXPORT std::unique_ptr< GpgME::AssuanTransaction > sendCommand(std::shared_ptr< GpgME::Context > &assuanContext, const std::string &command, std::unique_ptr< GpgME::AssuanTransaction > transaction, GpgME::Error &err)
void clear()
Stops all jobs queued for execution.
Server is shutting down.
Definition: servermanager.h:40
static State state()
Returns the state of the server.
QByteArray number(int n, int base)
A communication session with the Akonadi storage.
Definition: core/session.h:55
QCoreApplication * instance()
Server is not running, could be no one started it yet or it failed to start.
Definition: servermanager.h:37
static bool start()
Starts the server.
void reconnected()
This signal is emitted whenever the session has been reconnected to the server (e.g.
bool invokeMethod(QObject *obj, const char *member, Qt::ConnectionType type, QGenericReturnArgument ret, QGenericArgument val0, QGenericArgument val1, QGenericArgument val2, QGenericArgument val3, QGenericArgument val4, QGenericArgument val5, QGenericArgument val6, QGenericArgument val7, QGenericArgument val8, QGenericArgument val9)
QString i18n(const char *text, const TYPE &arg...)
~Session() override
Destroys the session.
void generate(Iter begin, Iter end, QStringList &target)
Server is running and operational.
Definition: servermanager.h:39
Helper integration between Akonadi and Qt.
State
Enum for the various states the server can be in.
Definition: servermanager.h:36
void stateChanged(Akonadi::ServerManager::State state)
Emitted whenever the server state changes.
void result(KJob *job)
QueuedConnection
QMetaObject::Connection connect(const QObject *sender, const char *signal, const QObject *receiver, const char *method, Qt::ConnectionType type)
QRandomGenerator * global()
qint64 Id
Describes the unique id type.
Definition: item.h:110
void destroyed(QObject *obj)
Server is not operational and an error has been detected.
Definition: servermanager.h:41
This file is part of the KDE documentation.
Documentation copyright © 1996-2022 The KDE developers.
Generated on Sat Jan 15 2022 23:08:26 by doxygen 1.8.11 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.