Akonadi

job.cpp
1 /*
2  SPDX-FileCopyrightText: 2006 Tobias Koenig <[email protected]>
3  2006 Marc Mutz <[email protected]>
4  2006 - 2007 Volker Krause <[email protected]>
5 
6  SPDX-License-Identifier: LGPL-2.0-or-later
7 */
8 
9 #include "job.h"
10 #include "akonadicore_debug.h"
11 #include "job_p.h"
12 #include "private/instance_p.h"
13 #include "private/protocol_p.h"
14 #include "session.h"
15 #include "session_p.h"
16 #include <QDBusConnection>
17 #include <QTime>
18 
19 #include <KLocalizedString>
20 
21 #include <QDBusConnectionInterface>
22 #include <QDBusInterface>
23 #include <QElapsedTimer>
24 #include <QTimer>
25 
26 using namespace Akonadi;
27 
28 static QDBusAbstractInterface *s_jobtracker = nullptr;
29 
30 /// @cond PRIVATE
31 void JobPrivate::handleResponse(qint64 tag, const Protocol::CommandPtr &response)
32 {
33  Q_Q(Job);
34 
35  if (mCurrentSubJob) {
36  mCurrentSubJob->d_ptr->handleResponse(tag, response);
37  return;
38  }
39 
40  if (tag == mTag) {
41  if (response->isResponse()) {
42  const auto &resp = Protocol::cmdCast<Protocol::Response>(response);
43  if (resp.isError()) {
44  q->setError(Job::Unknown);
45  q->setErrorText(resp.errorMessage());
46  q->emitResult();
47  return;
48  }
49  }
50  }
51 
52  if (mTag != tag) {
53  qCWarning(AKONADICORE_LOG) << "Received response with a different tag!";
54  qCDebug(AKONADICORE_LOG) << "Response tag:" << tag << ", response type:" << response->type();
55  qCDebug(AKONADICORE_LOG) << "Job tag:" << mTag << ", job:" << q;
56  return;
57  }
58 
59  if (mStarted) {
60  if (mReadingFinished) {
61  qCWarning(AKONADICORE_LOG) << "Received response for a job that does not expect any more data, ignoring";
62  qCDebug(AKONADICORE_LOG) << "Response tag:" << tag << ", response type:" << response->type();
63  qCDebug(AKONADICORE_LOG) << "Job tag:" << mTag << ", job:" << q;
64  Q_ASSERT(!mReadingFinished);
65  return;
66  }
67 
68  if (q->doHandleResponse(tag, response)) {
69  mReadingFinished = true;
70  QTimer::singleShot(0, q, [this]() {
71  delayedEmitResult();
72  });
73  }
74  }
75 }
76 
77 void JobPrivate::init(QObject *parent)
78 {
79  Q_Q(Job);
80 
81  mParentJob = qobject_cast<Job *>(parent);
82  mSession = qobject_cast<Session *>(parent);
83 
84  if (!mSession) {
85  if (!mParentJob) {
86  mSession = Session::defaultSession();
87  } else {
88  mSession = mParentJob->d_ptr->mSession;
89  }
90  }
91 
92  if (!mParentJob) {
93  mSession->d->addJob(q);
94  } else {
95  mParentJob->addSubjob(q);
96  }
97  publishJob();
98 }
99 
100 void JobPrivate::publishJob()
101 {
102  Q_Q(Job);
103  // if there's a job tracker running, tell it about the new job
104  if (!s_jobtracker) {
105  // Let's only check for the debugging console every 3 seconds, otherwise every single job
106  // makes a dbus call to the dbus daemon, doesn't help performance.
107  static QElapsedTimer s_lastTime;
108  if (!s_lastTime.isValid() || s_lastTime.elapsed() > 3000) {
109  if (!s_lastTime.isValid()) {
110  s_lastTime.start();
111  }
112  const QString suffix = Akonadi::Instance::identifier().isEmpty() ? QString() : QLatin1Char('-') + Akonadi::Instance::identifier();
113  if (QDBusConnection::sessionBus().interface()->isServiceRegistered(QStringLiteral("org.kde.akonadiconsole") + suffix)) {
114  s_jobtracker = new QDBusInterface(QLatin1String("org.kde.akonadiconsole") + suffix,
115  QStringLiteral("/jobtracker"),
116  QStringLiteral("org.freedesktop.Akonadi.JobTracker"),
118  nullptr);
119  mSession->d->publishOtherJobs(q);
120  } else {
121  s_lastTime.restart();
122  }
123  }
124  // Note: we never reset s_jobtracker to 0 when a call fails; but if we did
125  // then we should restart s_lastTime.
126  }
127  QMetaObject::invokeMethod(q, "signalCreationToJobTracker", Qt::QueuedConnection);
128 }
129 
130 void JobPrivate::signalCreationToJobTracker()
131 {
132  Q_Q(Job);
133  if (s_jobtracker) {
134  // We do these dbus calls manually, so as to avoid having to install (or copy) the console's
135  // xml interface document. Since this is purely a debugging aid, that seems preferable to
136  // publishing something not intended for public consumption.
137  // WARNING: for any signature change here, apply it to resourcescheduler.cpp too
138  const QList<QVariant> argumentList = QList<QVariant>() << QLatin1String(mSession->sessionId()) << QString::number(reinterpret_cast<quintptr>(q), 16)
139  << (mParentJob ? QString::number(reinterpret_cast<quintptr>(mParentJob), 16) : QString())
140  << QString::fromLatin1(q->metaObject()->className()) << jobDebuggingString();
141  QDBusPendingCall call = s_jobtracker->asyncCallWithArgumentList(QStringLiteral("jobCreated"), argumentList);
142 
143  auto watcher = new QDBusPendingCallWatcher(call, s_jobtracker);
145  QDBusPendingReply<void> reply = *w;
146  if (reply.isError() && s_jobtracker) {
147  qDebug() << reply.error().name() << reply.error().message();
148  s_jobtracker->deleteLater();
149  s_jobtracker = nullptr;
150  }
151  w->deleteLater();
152  });
153  }
154 }
155 
156 void JobPrivate::signalStartedToJobTracker()
157 {
158  Q_Q(Job);
159  if (s_jobtracker) {
160  // if there's a job tracker running, tell it a job started
161  const QList<QVariant> argumentList = {QString::number(reinterpret_cast<quintptr>(q), 16)};
162  s_jobtracker->callWithArgumentList(QDBus::NoBlock, QStringLiteral("jobStarted"), argumentList);
163  }
164 }
165 
166 void JobPrivate::aboutToFinish()
167 {
168  // Dummy
169 }
170 
171 void JobPrivate::delayedEmitResult()
172 {
173  Q_Q(Job);
174  if (q->hasSubjobs()) {
175  // We still have subjobs, wait for them to finish
176  mFinishPending = true;
177  } else {
178  aboutToFinish();
179  q->emitResult();
180  }
181 }
182 
183 void JobPrivate::startQueued()
184 {
185  Q_Q(Job);
186  mStarted = true;
187 
188  Q_EMIT q->aboutToStart(q);
189  q->doStart();
190  QTimer::singleShot(0, q, [this]() {
191  startNext();
192  });
193  QMetaObject::invokeMethod(q, "signalStartedToJobTracker", Qt::QueuedConnection);
194 }
195 
196 void JobPrivate::lostConnection()
197 {
198  Q_Q(Job);
199 
200  if (mCurrentSubJob) {
201  mCurrentSubJob->d_ptr->lostConnection();
202  } else {
203  q->setError(Job::ConnectionFailed);
204  q->emitResult();
205  }
206 }
207 
208 void JobPrivate::slotSubJobAboutToStart(Job *job)
209 {
210  Q_ASSERT(mCurrentSubJob == nullptr);
211  mCurrentSubJob = job;
212 }
213 
214 void JobPrivate::startNext()
215 {
216  Q_Q(Job);
217 
218  if (mStarted && !mCurrentSubJob && q->hasSubjobs()) {
219  Job *job = qobject_cast<Akonadi::Job *>(q->subjobs().at(0));
220  Q_ASSERT(job);
221  job->d_ptr->startQueued();
222  } else if (mFinishPending && !q->hasSubjobs()) {
223  // The last subjob we've been waiting for has finished, emitResult() finally
224  QTimer::singleShot(0, q, [this]() {
225  delayedEmitResult();
226  });
227  }
228 }
229 
230 qint64 JobPrivate::newTag()
231 {
232  if (mParentJob) {
233  mTag = mParentJob->d_ptr->newTag();
234  } else {
235  mTag = mSession->d->nextTag();
236  }
237  return mTag;
238 }
239 
240 qint64 JobPrivate::tag() const
241 {
242  return mTag;
243 }
244 
245 void JobPrivate::sendCommand(qint64 tag, const Protocol::CommandPtr &cmd)
246 {
247  if (mParentJob) {
248  mParentJob->d_ptr->sendCommand(tag, cmd);
249  } else {
250  mSession->d->sendCommand(tag, cmd);
251  }
252 }
253 
254 void JobPrivate::sendCommand(const Protocol::CommandPtr &cmd)
255 {
256  sendCommand(newTag(), cmd);
257 }
258 
259 void JobPrivate::itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
260 {
261  mSession->d->itemRevisionChanged(itemId, oldRevision, newRevision);
262 }
263 
264 void JobPrivate::updateItemRevision(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
265 {
266  Q_Q(Job);
267  const auto &subjobs = q->subjobs();
268  for (KJob *j : subjobs) {
269  auto job = qobject_cast<Akonadi::Job *>(j);
270  if (job) {
271  job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision);
272  }
273  }
274  doUpdateItemRevision(itemId, oldRevision, newRevision);
275 }
276 
277 void JobPrivate::doUpdateItemRevision(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
278 {
279  Q_UNUSED(itemId)
280  Q_UNUSED(oldRevision)
281  Q_UNUSED(newRevision)
282 }
283 
284 int JobPrivate::protocolVersion() const
285 {
286  return mSession->d->protocolVersion;
287 }
288 /// @endcond
289 
291  : KCompositeJob(parent)
292  , d_ptr(new JobPrivate(this))
293 {
294  d_ptr->init(parent);
295 }
296 
297 Job::Job(JobPrivate *dd, QObject *parent)
298  : KCompositeJob(parent)
299  , d_ptr(dd)
300 {
301  d_ptr->init(parent);
302 }
303 
305 {
306  // if there is a job tracer listening, tell it the job is done now
307  if (s_jobtracker) {
308  const QList<QVariant> argumentList = {QString::number(reinterpret_cast<quintptr>(this), 16), errorString()};
309  s_jobtracker->callWithArgumentList(QDBus::NoBlock, QStringLiteral("jobEnded"), argumentList);
310  }
311 }
312 
314 {
315 }
316 
318 {
319  Q_D(Job);
320  if (d->mStarted) {
321  // the only way to cancel an already started job is reconnecting to the server
322  d->mSession->d->forceReconnect();
323  }
324  d->mStarted = false;
325  return true;
326 }
327 
329 {
330  QString str;
331  switch (error()) {
332  case NoError:
333  break;
334  case ConnectionFailed:
335  str = i18n("Cannot connect to the Akonadi service.");
336  break;
338  str = i18n("The protocol version of the Akonadi server is incompatible. Make sure you have a compatible version installed.");
339  break;
340  case UserCanceled:
341  str = i18n("User canceled operation.");
342  break;
343  case Unknown:
344  return errorText();
345  case UserError:
346  str = i18n("Unknown error.");
347  break;
348  }
349  if (!errorText().isEmpty()) {
350  str += QStringLiteral(" (%1)").arg(errorText());
351  }
352  return str;
353 }
354 
356 {
357  bool rv = KCompositeJob::addSubjob(job);
358  if (rv) {
359  connect(qobject_cast<Job *>(job), &Job::aboutToStart, this, [this](Job *job) {
360  d_ptr->slotSubJobAboutToStart(job);
361  });
362  QTimer::singleShot(0, this, [this]() {
363  d_ptr->startNext();
364  });
365  }
366  return rv;
367 }
368 
370 {
371  bool rv = KCompositeJob::removeSubjob(job);
372  if (job == d_ptr->mCurrentSubJob) {
373  d_ptr->mCurrentSubJob = nullptr;
374  QTimer::singleShot(0, this, [this]() {
375  d_ptr->startNext();
376  });
377  }
378  return rv;
379 }
380 
382 {
383  qCDebug(AKONADICORE_LOG) << this << "Unhandled response: " << tag << Protocol::debugString(response);
384  setError(Unknown);
385  setErrorText(i18n("Unexpected response"));
386  emitResult();
387  return true;
388 }
389 
390 void Job::slotResult(KJob *job)
391 {
392  if (d_ptr->mCurrentSubJob == job) {
393  // current job finished, start the next one
394  d_ptr->mCurrentSubJob = nullptr;
396  if (!job->error()) {
397  QTimer::singleShot(0, this, [this]() {
398  d_ptr->startNext();
399  });
400  }
401  } else {
402  // job that was still waiting for execution finished, probably canceled,
403  // so just remove it from the queue and move on without caring about
404  // its error code
406  }
407 }
408 
410 {
411  d_ptr->mWriteFinished = true;
412  Q_EMIT writeFinished(this);
413 }
414 
415 #include "moc_job.cpp"
virtual bool addSubjob(KJob *job)
void emitResult()
The server protocol version is too old or too new.
Definition: job.h:100
Unknown error.
Definition: job.h:102
qint64 restart()
virtual bool doHandleResponse(qint64 tag, const Protocol::CommandPtr &response)
This method should be reimplemented in the concrete jobs in case you want to handle incoming data...
Definition: job.cpp:381
void finished(QDBusPendingCallWatcher *self)
QString name() const const
void setError(int errorCode)
QString message() const const
QDBusConnection sessionBus()
Base class for all actions in the Akonadi storage.
Definition: job.h:80
The connection to the Akonadi server failed.
Definition: job.h:99
KCompositeJob(QObject *parent=nullptr)
void start() override
Jobs are started automatically once entering the event loop again, no need to explicitly call this...
Definition: job.cpp:313
static Session * defaultSession()
Returns the default session for this thread.
bool removeSubjob(KJob *job) override
Removes the given subjob of this job.
Definition: job.cpp:369
void writeFinished(Akonadi::Job *job)
This signal is emitted if the job has finished all write operations, ie.
~Job() override
Destroys the job.
Definition: job.cpp:304
void setErrorText(const QString &errorText)
KLEO_EXPORT std::unique_ptr< GpgME::AssuanTransaction > sendCommand(std::shared_ptr< GpgME::Context > &assuanContext, const std::string &command, std::unique_ptr< GpgME::AssuanTransaction > transaction, GpgME::Error &err)
QString number(int n, int base)
virtual void slotResult(KJob *job)
bool isEmpty() const const
void emitWriteFinished()
Call this method to indicate that this job will not call writeData() again.
Definition: job.cpp:409
A communication session with the Akonadi storage.
Definition: core/session.h:55
void deleteLater()
QDBusError error() const const
QString errorString() const final
Returns the error string, if there has been an error, an empty string otherwise.
Definition: job.cpp:328
bool invokeMethod(QObject *obj, const char *member, Qt::ConnectionType type, QGenericReturnArgument ret, QGenericArgument val0, QGenericArgument val1, QGenericArgument val2, QGenericArgument val3, QGenericArgument val4, QGenericArgument val5, QGenericArgument val6, QGenericArgument val7, QGenericArgument val8, QGenericArgument val9)
Job(QObject *parent=nullptr)
Creates a new job.
Definition: job.cpp:290
QString i18n(const char *text, const TYPE &arg...)
Starting point for error codes defined by sub-classes.
Definition: job.h:103
QDBusPendingCall asyncCallWithArgumentList(const QString &method, const QList< QVariant > &args)
Helper integration between Akonadi and Qt.
QString arg(qlonglong a, int fieldWidth, int base, QChar fillChar) const const
bool isValid() const const
void aboutToStart(Akonadi::Job *job)
This signal is emitted directly before the job will be started.
The user canceled this job.
Definition: job.h:101
QueuedConnection
QMetaObject::Connection connect(const QObject *sender, const char *signal, const QObject *receiver, const char *method, Qt::ConnectionType type)
QDBusMessage callWithArgumentList(QDBus::CallMode mode, const QString &method, const QList< QVariant > &args)
qint64 elapsed() const const
bool addSubjob(KJob *job) override
Adds the given job as a subjob to this job.
Definition: job.cpp:355
qint64 Id
Describes the unique id type.
Definition: item.h:106
Q_EMITQ_EMIT
QString errorText() const
bool isError() const const
int error() const
bool doKill() override
Kills the execution of the job.
Definition: job.cpp:317
virtual bool removeSubjob(KJob *job)
This file is part of the KDE documentation.
Documentation copyright © 1996-2021 The KDE developers.
Generated on Thu Dec 2 2021 23:08:40 by doxygen 1.8.11 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.