Akonadi

core/session.cpp
1 /*
2  SPDX-FileCopyrightText: 2007 Volker Krause <[email protected]>
3 
4  SPDX-License-Identifier: LGPL-2.0-or-later
5 */
6 
7 #include "session.h"
8 #include "session_p.h"
9 
10 
11 #include "job.h"
12 #include "job_p.h"
13 #include "servermanager.h"
14 #include "servermanager_p.h"
15 #include "protocolhelper_p.h"
16 #include "sessionthread_p.h"
17 #include "private/standarddirs_p.h"
18 #include "private/protocol_p.h"
19 
20 #include "akonadicore_debug.h"
21 
22 #include <KLocalizedString>
23 
24 #include <QCoreApplication>
25 #include <QThreadStorage>
26 #include <QTimer>
27 #include <QThread>
28 #include <QPointer>
29 #include <QRandomGenerator>
30 
31 #include <QHostAddress>
32 #include <QApplication>
33 
34 // ### FIXME pipelining got broken by switching result emission in JobPrivate::handleResponse to delayed emission
35 // in order to work around exec() deadlocks. As a result of that Session knows too late about a finished job and still
36 // sends responses for the next one to the already finished one
37 #define PIPELINE_LENGTH 0
38 //#define PIPELINE_LENGTH 2
39 
40 using namespace Akonadi;
41 
42 //@cond PRIVATE
43 
44 void SessionPrivate::startNext()
45 {
46  QTimer::singleShot(0, mParent, [this]() { doStartNext(); });
47 }
48 
49 void SessionPrivate::reconnect()
50 {
51  if (!connection) {
52  connection = new Connection(Connection::CommandConnection, sessionId, &mCommandBuffer);
53  sessionThread()->addConnection(connection);
54  mParent->connect(connection, &Connection::reconnected, mParent, &Session::reconnected,
56  mParent->connect(connection, &Connection::socketDisconnected, mParent, [this]() { socketDisconnected(); }, Qt::QueuedConnection);
57  mParent->connect(connection, &Connection::socketError, mParent, [this](const QString &error) { socketError(error); }, Qt::QueuedConnection);
58  }
59 
60  connection->reconnect();
61 }
62 
63 void SessionPrivate::socketError(const QString &error)
64 {
65  qCWarning(AKONADICORE_LOG) << "Socket error occurred:" << error;
66  socketDisconnected();
67 }
68 
69 void SessionPrivate::socketDisconnected()
70 {
71  if (currentJob) {
72  currentJob->d_ptr->lostConnection();
73  }
74  connected = false;
75 }
76 
77 bool SessionPrivate::handleCommands()
78 {
79  CommandBufferLocker lock(&mCommandBuffer);
80  CommandBufferNotifyBlocker notify(&mCommandBuffer);
81  while (!mCommandBuffer.isEmpty()) {
82  const auto command = mCommandBuffer.dequeue();
83  lock.unlock();
84  const auto cmd = command.command;
85  const auto tag = command.tag;
86 
87  // Handle Hello response -> send Login
88  if (cmd->type() == Protocol::Command::Hello) {
89  const auto &hello = Protocol::cmdCast<Protocol::HelloResponse>(cmd);
90  if (hello.isError()) {
91  qCWarning(AKONADICORE_LOG) << "Error when establishing connection with Akonadi server:" << hello.errorMessage();
92  connection->closeConnection();
93  QTimer::singleShot(1000, connection, &Connection::reconnect);
94  return false;
95  }
96 
97  qCDebug(AKONADICORE_LOG) << "Connected to" << hello.serverName() << ", using protocol version" << hello.protocolVersion();
98  qCDebug(AKONADICORE_LOG) << "Server generation:" << hello.generation();
99  qCDebug(AKONADICORE_LOG) << "Server says:" << hello.message();
100  // Version mismatch is handled in SessionPrivate::startJob() so that
101  // we can report the error out via KJob API
102  protocolVersion = hello.protocolVersion();
103  Internal::setServerProtocolVersion(protocolVersion);
104  Internal::setGeneration(hello.generation());
105 
106  sendCommand(nextTag(), Protocol::LoginCommandPtr::create(sessionId));
107  } else if (cmd->type() == Protocol::Command::Login) {
108  const auto &login = Protocol::cmdCast<Protocol::LoginResponse>(cmd);
109  if (login.isError()) {
110  qCWarning(AKONADICORE_LOG) << "Unable to login to Akonadi server:" << login.errorMessage();
111  connection->closeConnection();
112  QTimer::singleShot(1000, mParent, [this]() { reconnect(); });
113  return false;
114  }
115 
116  connected = true;
117  startNext();
118  } else if (currentJob) {
119  currentJob->d_ptr->handleResponse(tag, cmd);
120  }
121 
122  lock.relock();
123  }
124 
125  return true;
126 }
127 
128 bool SessionPrivate::canPipelineNext()
129 {
130  if (queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH) {
131  return false;
132  }
133  if (pipeline.isEmpty() && currentJob) {
134  return currentJob->d_ptr->mWriteFinished;
135  }
136  if (!pipeline.isEmpty()) {
137  return pipeline.last()->d_ptr->mWriteFinished;
138  }
139  return false;
140 }
141 
142 void SessionPrivate::doStartNext()
143 {
144  if (!connected || (queue.isEmpty() && pipeline.isEmpty())) {
145  return;
146  }
147  if (canPipelineNext()) {
148  Akonadi::Job *nextJob = queue.dequeue();
149  pipeline.enqueue(nextJob);
150  startJob(nextJob);
151  }
152  if (jobRunning) {
153  return;
154  }
155  jobRunning = true;
156  if (!pipeline.isEmpty()) {
157  currentJob = pipeline.dequeue();
158  } else {
159  currentJob = queue.dequeue();
160  startJob(currentJob);
161  }
162 }
163 
164 void SessionPrivate::startJob(Job *job)
165 {
166  if (protocolVersion != Protocol::version()) {
168  if (protocolVersion < Protocol::version()) {
169  job->setErrorText(i18n("Protocol version mismatch. Server version is older (%1) than ours (%2). "
170  "If you updated your system recently please restart the Akonadi server.",
171  protocolVersion, Protocol::version()));
172  qCWarning(AKONADICORE_LOG) << "Protocol version mismatch. Server version is older (" << protocolVersion << ") than ours (" << Protocol::version() << "). "
173  "If you updated your system recently please restart the Akonadi server.";
174  } else {
175  job->setErrorText(i18n("Protocol version mismatch. Server version is newer (%1) than ours (%2). "
176  "If you updated your system recently please restart all KDE PIM applications.",
177  protocolVersion, Protocol::version()));
178  qCWarning(AKONADICORE_LOG) << "Protocol version mismatch. Server version is newer (" << protocolVersion << ") than ours (" << Protocol::version() << "). "
179  "If you updated your system recently please restart all KDE PIM applications.";
180  }
181  job->emitResult();
182  } else {
183  job->d_ptr->startQueued();
184  }
185 }
186 
187 void SessionPrivate::endJob(Job *job)
188 {
189  job->emitResult();
190 }
191 
192 void SessionPrivate::jobDone(KJob *job)
193 {
194  // ### careful, this method can be called from the QObject dtor of job (see jobDestroyed() below)
195  // so don't call any methods on job itself
196  if (job == currentJob) {
197  if (pipeline.isEmpty()) {
198  jobRunning = false;
199  currentJob = nullptr;
200  } else {
201  currentJob = pipeline.dequeue();
202  }
203  startNext();
204  } else {
205  // non-current job finished, likely canceled while still in the queue
206  queue.removeAll(static_cast<Akonadi::Job *>(job));
207  // ### likely not enough to really cancel already running jobs
208  pipeline.removeAll(static_cast<Akonadi::Job *>(job));
209  }
210 }
211 
212 void SessionPrivate::jobWriteFinished(Akonadi::Job *job)
213 {
214  Q_ASSERT((job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()));
215  Q_UNUSED(job);
216 
217  startNext();
218 }
219 
220 void SessionPrivate::jobDestroyed(QObject *job)
221 {
222  // careful, accessing non-QObject methods of job will fail here already
223  jobDone(static_cast<KJob *>(job));
224 }
225 
226 void SessionPrivate::addJob(Job *job)
227 {
228  queue.append(job);
229  QObject::connect(job, &KJob::result, mParent, [this](KJob *job) { jobDone(job); });
230  QObject::connect(job, &Job::writeFinished, mParent, [this](Job *job) { jobWriteFinished(job); });
231  QObject::connect(job, &QObject::destroyed, mParent, [this](QObject *o) { jobDestroyed(o); });
232  startNext();
233 }
234 
235 void SessionPrivate::publishOtherJobs(Job *thanThisJob)
236 {
237  int count = 0;
238  for (const auto& job : qAsConst(queue)) {
239  if (job != thanThisJob) {
240  job->d_ptr->publishJob();
241  ++count;
242  }
243  }
244  if (count > 0) {
245  qCDebug(AKONADICORE_LOG) << "published" << count << "pending jobs to the job tracker";
246  }
247  if (currentJob && currentJob != thanThisJob) {
248  currentJob->d_ptr->signalStartedToJobTracker();
249  }
250 }
251 
252 qint64 SessionPrivate::nextTag()
253 {
254  return theNextTag++;
255 }
256 
257 void SessionPrivate::sendCommand(qint64 tag, const Protocol::CommandPtr &command)
258 {
259  connection->sendCommand(tag, command);
260 }
261 
262 void SessionPrivate::serverStateChanged(ServerManager::State state)
263 {
264  if (state == ServerManager::Running && !connected) {
265  reconnect();
266  } else if (!connected && state == ServerManager::Broken) {
267  // If the server is broken, cancel all pending jobs, otherwise they will be
268  // blocked forever and applications waiting for them to finish would be stuck
269  for (Job *job : qAsConst(queue)) {
271  job->kill(KJob::EmitResult);
272  }
273  } else if (state == ServerManager::Stopping) {
274  sessionThread()->destroyConnection(connection);
275  connection = nullptr;
276  }
277 }
278 
279 void SessionPrivate::itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
280 {
281  // only deal with the queue, for the guys in the pipeline it's too late already anyway
282  // and they shouldn't have gotten there if they depend on a preceding job anyway.
283  for (Job *job : qAsConst(queue)) {
284  job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision);
285  }
286 }
287 
288 //@endcond
289 
290 SessionPrivate::SessionPrivate(Session *parent)
291  : mParent(parent)
292  , mSessionThread(new SessionThread)
293  , connection(nullptr)
294  , protocolVersion(0)
295  , mCommandBuffer(parent, "handleCommands")
296  , currentJob(nullptr)
297 {
298  // Shutdown the thread before QApplication event loop quits - the
299  // thread()->wait() mechanism in Connection dtor crashes sometimes
300  // when called from QApplication destructor
301  connThreadCleanUp = QObject::connect(qApp, &QCoreApplication::aboutToQuit,
302  [this]() {
303  delete mSessionThread;
304  mSessionThread = nullptr;
305  });
306 }
307 
308 SessionPrivate::~SessionPrivate()
309 {
310  QObject::disconnect(connThreadCleanUp);
311  delete mSessionThread;
312 }
313 
314 void SessionPrivate::init(const QByteArray &id)
315 {
316  if (!id.isEmpty()) {
317  sessionId = id;
318  } else {
319  sessionId = QCoreApplication::instance()->applicationName().toUtf8()
321  }
322 
323  qCDebug(AKONADICORE_LOG) << "Initializing session with ID" << id;
324 
325  connected = false;
326  theNextTag = 2;
327  jobRunning = false;
328 
331  }
333  mParent, [this](ServerManager::State state) { serverStateChanged(state); });
334  reconnect();
335 }
336 
337 void SessionPrivate::forceReconnect()
338 {
339  jobRunning = false;
340  connected = false;
341  if (connection) {
342  connection->forceReconnect();
343  }
344  QMetaObject::invokeMethod(mParent, [this]() { reconnect(); }, Qt::QueuedConnection);
345 }
346 
347 Session::Session(const QByteArray &sessionId, QObject *parent)
348  : QObject(parent)
349  , d(new SessionPrivate(this))
350 {
351  d->init(sessionId);
352 }
353 
354 Session::Session(SessionPrivate *dd, const QByteArray &sessionId, QObject *parent)
355  : QObject(parent)
356  , d(dd)
357 {
358  d->mParent = this;
359  d->init(sessionId);
360 }
361 
363 {
364  d->clear(false);
365  delete d;
366 }
367 
369 {
370  return d->sessionId;
371 }
372 
373 Q_GLOBAL_STATIC(QThreadStorage<QPointer<Session>>, instances) // NOLINT(readability-redundant-member-init)
374 
375 void SessionPrivate::createDefaultSession(const QByteArray &sessionId)
376 {
377  Q_ASSERT_X(!sessionId.isEmpty(), "SessionPrivate::createDefaultSession",
378  "You tried to create a default session with empty session id!");
379  Q_ASSERT_X(!instances()->hasLocalData(), "SessionPrivate::createDefaultSession",
380  "You tried to create a default session twice!");
381 
382  Session *session = new Session(sessionId);
383  setDefaultSession(session);
384 }
385 
386 void SessionPrivate::setDefaultSession(Session *session)
387 {
388  instances()->setLocalData({ session });
390  []() {
391  instances()->setLocalData({});
392  });
393 }
394 
396 {
397  if (!instances()->hasLocalData()) {
398  Session *session = new Session();
399  SessionPrivate::setDefaultSession(session);
400  }
401  return instances()->localData().data();
402 }
403 
405 {
406  d->clear(true);
407 }
408 
409 void SessionPrivate::clear(bool forceReconnect)
410 {
411  for (Job *job : qAsConst(queue)) {
412  job->kill(KJob::EmitResult); // safe, not started yet
413  }
414  queue.clear();
415  for (Job *job : qAsConst(pipeline)) {
416  job->d_ptr->mStarted = false; // avoid killing/reconnect loops
417  job->kill(KJob::EmitResult);
418  }
419  pipeline.clear();
420  if (currentJob) {
421  currentJob->d_ptr->mStarted = false; // avoid killing/reconnect loops
422  currentJob->kill(KJob::EmitResult);
423  }
424 
425  if (forceReconnect) {
426  this->forceReconnect();
427  }
428 }
429 
430 #include "moc_session.cpp"
bool kill(KillVerbosity verbosity=Quietly)
void emitResult()
The server protocol version is too old or too new.
Definition: job.h:100
void setError(int errorCode)
bool isEmpty() const const
static ServerManager * self()
Returns the singleton instance of this class, for connecting to its signals.
Base class for all actions in the Akonadi storage.
Definition: job.h:80
The connection to the Akonadi server failed.
Definition: job.h:99
Session(const QByteArray &sessionId=QByteArray(), QObject *parent=nullptr)
Creates a new session.
bool disconnect(const QObject *sender, const char *signal, const QObject *receiver, const char *method)
static Session * defaultSession()
Returns the default session for this thread.
~Session()
Destroys the session.
void writeFinished(Akonadi::Job *job)
This signal is emitted if the job has finished all write operations, ie.
QByteArray sessionId() const
Returns the session identifier.
void setErrorText(const QString &errorText)
void clear()
Stops all jobs queued for execution.
Server is shutting down.
Definition: servermanager.h:42
static State state()
Returns the state of the server.
QByteArray number(int n, int base)
A communication session with the Akonadi storage.
Definition: core/session.h:54
QCoreApplication * instance()
Server is not running, could be no one started it yet or it failed to start.
Definition: servermanager.h:39
static bool start()
Starts the server.
void reconnected()
This signal is emitted whenever the session has been reconnected to the server (e.g.
bool invokeMethod(QObject *obj, const char *member, Qt::ConnectionType type, QGenericReturnArgument ret, QGenericArgument val0, QGenericArgument val1, QGenericArgument val2, QGenericArgument val3, QGenericArgument val4, QGenericArgument val5, QGenericArgument val6, QGenericArgument val7, QGenericArgument val8, QGenericArgument val9)
QString i18n(const char *text, const TYPE &arg...)
void generate(Iter begin, Iter end, QStringList &target)
Server is running and operational.
Definition: servermanager.h:41
Helper integration between Akonadi and Qt.
State
Enum for the various states the server can be in.
Definition: servermanager.h:38
void stateChanged(Akonadi::ServerManager::State state)
Emitted whenever the server state changes.
void result(KJob *job)
QueuedConnection
QMetaObject::Connection connect(const QObject *sender, const char *signal, const QObject *receiver, const char *method, Qt::ConnectionType type)
QRandomGenerator * global()
void destroyed(QObject *obj)
Server is not operational and an error has been detected.
Definition: servermanager.h:43
This file is part of the KDE documentation.
Documentation copyright © 1996-2020 The KDE developers.
Generated on Wed Aug 12 2020 23:16:11 by doxygen 1.8.11 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.