Akonadi

job.cpp
1 /*
2  SPDX-FileCopyrightText: 2006 Tobias Koenig <[email protected]>
3  2006 Marc Mutz <[email protected]>
4  2006 - 2007 Volker Krause <[email protected]>
5 
6  SPDX-License-Identifier: LGPL-2.0-or-later
7 */
8 
9 #include "job.h"
10 #include "akonadicore_debug.h"
11 #include "job_p.h"
12 #include "private/instance_p.h"
13 #include "private/protocol_p.h"
14 #include "session.h"
15 #include "session_p.h"
16 #include <QDBusConnection>
17 #include <QTime>
18 
19 #include <KLocalizedString>
20 
21 #include <QDBusConnectionInterface>
22 #include <QDBusInterface>
23 #include <QElapsedTimer>
24 #include <QTimer>
25 
26 using namespace Akonadi;
27 
// File-local D-Bus proxy for the job tracker. It stays nullptr until
// JobPrivate::publishJob() finds the "org.kde.akonadiconsole" service, and is
// reset to nullptr by signalCreationToJobTracker() when a "jobCreated" call
// returns an error.
static QDBusAbstractInterface *s_jobtracker = nullptr;
29 
30 /// @cond PRIVATE
31 void JobPrivate::handleResponse(qint64 tag, const Protocol::CommandPtr &response)
32 {
33  Q_Q(Job);
34 
35  if (mCurrentSubJob) {
36  mCurrentSubJob->d_ptr->handleResponse(tag, response);
37  return;
38  }
39 
40  if (tag == mTag) {
41  if (response->isResponse()) {
42  const auto &resp = Protocol::cmdCast<Protocol::Response>(response);
43  if (resp.isError()) {
44  q->setError(Job::Unknown);
45  q->setErrorText(resp.errorMessage());
46  q->emitResult();
47  return;
48  }
49  }
50  }
51 
52  if (mTag != tag) {
53  qCWarning(AKONADICORE_LOG) << "Received response with a different tag!";
54  qCDebug(AKONADICORE_LOG) << "Response tag:" << tag << ", response type:" << response->type();
55  qCDebug(AKONADICORE_LOG) << "Job tag:" << mTag << ", job:" << q;
56  return;
57  }
58 
59  if (mStarted) {
60  if (mReadingFinished) {
61  qCWarning(AKONADICORE_LOG) << "Received response for a job that does not expect any more data, ignoring";
62  qCDebug(AKONADICORE_LOG) << "Response tag:" << tag << ", response type:" << response->type();
63  qCDebug(AKONADICORE_LOG) << "Job tag:" << mTag << ", job:" << q;
64  Q_ASSERT(!mReadingFinished);
65  return;
66  }
67 
68  if (q->doHandleResponse(tag, response)) {
69  mReadingFinished = true;
70  QTimer::singleShot(0, q, [this]() {
71  delayedEmitResult();
72  });
73  }
74  }
75 }
76 
77 void JobPrivate::init(QObject *parent)
78 {
79  Q_Q(Job);
80 
81  mParentJob = qobject_cast<Job *>(parent);
82  mSession = qobject_cast<Session *>(parent);
83 
84  if (!mSession) {
85  if (!mParentJob) {
86  mSession = Session::defaultSession();
87  } else {
88  mSession = mParentJob->d_ptr->mSession;
89  }
90  }
91 
92  if (!mParentJob) {
93  mSession->d->addJob(q);
94  } else {
95  mParentJob->addSubjob(q);
96  }
97  publishJob();
98 }
99 
100 void JobPrivate::publishJob()
101 {
102  Q_Q(Job);
103  // if there's a job tracker running, tell it about the new job
104  if (!s_jobtracker) {
105  // Let's only check for the debugging console every 3 seconds, otherwise every single job
106  // makes a dbus call to the dbus daemon, doesn't help performance.
107  static QElapsedTimer s_lastTime;
108  if (!s_lastTime.isValid() || s_lastTime.elapsed() > 3000) {
109  if (!s_lastTime.isValid()) {
110  s_lastTime.start();
111  }
112  const QString suffix = Akonadi::Instance::identifier().isEmpty() ? QString() : QLatin1Char('-') + Akonadi::Instance::identifier();
113  if (QDBusConnection::sessionBus().interface()->isServiceRegistered(QStringLiteral("org.kde.akonadiconsole") + suffix)) {
114  s_jobtracker = new QDBusInterface(QLatin1String("org.kde.akonadiconsole") + suffix,
115  QStringLiteral("/jobtracker"),
116  QStringLiteral("org.freedesktop.Akonadi.JobTracker"),
118  nullptr);
119  mSession->d->publishOtherJobs(q);
120  } else {
121  s_lastTime.restart();
122  }
123  }
124  // Note: we never reset s_jobtracker to 0 when a call fails; but if we did
125  // then we should restart s_lastTime.
126  }
127  QMetaObject::invokeMethod(q, "signalCreationToJobTracker", Qt::QueuedConnection);
128 }
129 
130 void JobPrivate::signalCreationToJobTracker()
131 {
132  Q_Q(Job);
133  if (s_jobtracker) {
134  // We do these dbus calls manually, so as to avoid having to install (or copy) the console's
135  // xml interface document. Since this is purely a debugging aid, that seems preferable to
136  // publishing something not intended for public consumption.
137  // WARNING: for any signature change here, apply it to resourcescheduler.cpp too
138  const QList<QVariant> argumentList = QList<QVariant>() << QLatin1String(mSession->sessionId()) << QString::number(reinterpret_cast<quintptr>(q), 16)
139  << (mParentJob ? QString::number(reinterpret_cast<quintptr>(mParentJob), 16) : QString())
140  << QString::fromLatin1(q->metaObject()->className()) << jobDebuggingString();
141  QDBusPendingCall call = s_jobtracker->asyncCallWithArgumentList(QStringLiteral("jobCreated"), argumentList);
142 
143  auto watcher = new QDBusPendingCallWatcher(call, s_jobtracker);
145  QDBusPendingReply<void> reply = *w;
146  if (reply.isError() && s_jobtracker) {
147  qDebug() << reply.error().name() << reply.error().message();
148  s_jobtracker->deleteLater();
149  s_jobtracker = nullptr;
150  }
151  w->deleteLater();
152  });
153  }
154 }
155 
156 void JobPrivate::signalStartedToJobTracker()
157 {
158  Q_Q(Job);
159  if (s_jobtracker) {
160  // if there's a job tracker running, tell it a job started
161  const QList<QVariant> argumentList = {QString::number(reinterpret_cast<quintptr>(q), 16)};
162  s_jobtracker->callWithArgumentList(QDBus::NoBlock, QStringLiteral("jobStarted"), argumentList);
163  }
164 }
165 
166 void JobPrivate::aboutToFinish()
167 {
168  // Dummy
169 }
170 
171 void JobPrivate::delayedEmitResult()
172 {
173  Q_Q(Job);
174  if (q->hasSubjobs()) {
175  // We still have subjobs, wait for them to finish
176  mFinishPending = true;
177  } else {
178  aboutToFinish();
179  q->emitResult();
180  }
181 }
182 
183 void JobPrivate::startQueued()
184 {
185  Q_Q(Job);
186  mStarted = true;
187 
188  Q_EMIT q->aboutToStart(q);
189  q->doStart();
190  QTimer::singleShot(0, q, [this]() {
191  startNext();
192  });
193  QMetaObject::invokeMethod(q, "signalStartedToJobTracker", Qt::QueuedConnection);
194 }
195 
196 void JobPrivate::lostConnection()
197 {
198  Q_Q(Job);
199 
200  if (mCurrentSubJob) {
201  mCurrentSubJob->d_ptr->lostConnection();
202  } else {
203  q->setError(Job::ConnectionFailed);
204  q->emitResult();
205  }
206 }
207 
208 void JobPrivate::slotSubJobAboutToStart(Job *job)
209 {
210  Q_ASSERT(mCurrentSubJob == nullptr);
211  mCurrentSubJob = job;
212 }
213 
214 void JobPrivate::startNext()
215 {
216  Q_Q(Job);
217 
218  if (mStarted && !mCurrentSubJob && q->hasSubjobs()) {
219  Job *job = qobject_cast<Akonadi::Job *>(q->subjobs().at(0));
220  Q_ASSERT(job);
221  job->d_ptr->startQueued();
222  } else if (mFinishPending && !q->hasSubjobs()) {
223  // The last subjob we've been waiting for has finished, emitResult() finally
224  QTimer::singleShot(0, q, [this]() {
225  delayedEmitResult();
226  });
227  }
228 }
229 
230 qint64 JobPrivate::newTag()
231 {
232  if (mParentJob) {
233  mTag = mParentJob->d_ptr->newTag();
234  } else {
235  mTag = mSession->d->nextTag();
236  }
237  return mTag;
238 }
239 
240 qint64 JobPrivate::tag() const
241 {
242  return mTag;
243 }
244 
245 void JobPrivate::sendCommand(qint64 tag, const Protocol::CommandPtr &cmd)
246 {
247  if (mParentJob) {
248  mParentJob->d_ptr->sendCommand(tag, cmd);
249  } else {
250  mSession->d->sendCommand(tag, cmd);
251  }
252 }
253 
254 void JobPrivate::sendCommand(const Protocol::CommandPtr &cmd)
255 {
256  sendCommand(newTag(), cmd);
257 }
258 
259 void JobPrivate::itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
260 {
261  mSession->d->itemRevisionChanged(itemId, oldRevision, newRevision);
262 }
263 
264 void JobPrivate::updateItemRevision(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
265 {
266  Q_Q(Job);
267  const auto &subjobs = q->subjobs();
268  for (KJob *j : subjobs) {
269  auto job = qobject_cast<Akonadi::Job *>(j);
270  if (job) {
271  job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision);
272  }
273  }
274  doUpdateItemRevision(itemId, oldRevision, newRevision);
275 }
276 
277 void JobPrivate::doUpdateItemRevision(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
278 {
279  Q_UNUSED(itemId)
280  Q_UNUSED(oldRevision)
281  Q_UNUSED(newRevision)
282 }
283 
284 int JobPrivate::protocolVersion() const
285 {
286  return mSession->d->protocolVersion;
287 }
288 /// @endcond
289 
291  : KCompositeJob(parent)
292  , d_ptr(new JobPrivate(this))
293 {
294  d_ptr->init(parent);
295 }
296 
297 Job::Job(JobPrivate *dd, QObject *parent)
298  : KCompositeJob(parent)
299  , d_ptr(dd)
300 {
301  d_ptr->init(parent);
302 }
303 
305 {
306  // if there is a job tracer listening, tell it the job is done now
307  if (s_jobtracker) {
308  const QList<QVariant> argumentList = {QString::number(reinterpret_cast<quintptr>(this), 16), errorString()};
309  s_jobtracker->callWithArgumentList(QDBus::NoBlock, QStringLiteral("jobEnded"), argumentList);
310  }
311 }
312 
314 {
315 }
316 
318 {
319  Q_D(Job);
320  if (d->mStarted) {
321  // the only way to cancel an already started job is reconnecting to the server
322  d->mSession->d->forceReconnect();
323  }
324  d->mStarted = false;
325  return true;
326 }
327 
329 {
330  QString str;
331  switch (error()) {
332  case NoError:
333  break;
334  case ConnectionFailed:
335  str = i18n("Cannot connect to the Akonadi service.");
336  break;
338  str = i18n("The protocol version of the Akonadi server is incompatible. Make sure you have a compatible version installed.");
339  break;
340  case UserCanceled:
341  str = i18n("User canceled operation.");
342  break;
343  case Unknown:
344  return errorText();
345  case UserError:
346  str = i18n("Unknown error.");
347  break;
348  }
349  if (!errorText().isEmpty()) {
350  str += QStringLiteral(" (%1)").arg(errorText());
351  }
352  return str;
353 }
354 
356 {
357  bool rv = KCompositeJob::addSubjob(job);
358  if (rv) {
359  connect(qobject_cast<Job *>(job), &Job::aboutToStart, this, [this](Job *job) {
360  d_ptr->slotSubJobAboutToStart(job);
361  });
362  QTimer::singleShot(0, this, [this]() {
363  d_ptr->startNext();
364  });
365  }
366  return rv;
367 }
368 
370 {
371  bool rv = KCompositeJob::removeSubjob(job);
372  if (job == d_ptr->mCurrentSubJob) {
373  d_ptr->mCurrentSubJob = nullptr;
374  QTimer::singleShot(0, this, [this]() {
375  d_ptr->startNext();
376  });
377  }
378  return rv;
379 }
380 
382 {
383  qCDebug(AKONADICORE_LOG) << this << "Unhandled response: " << tag << Protocol::debugString(response);
384  setError(Unknown);
385  setErrorText(i18n("Unexpected response"));
386  emitResult();
387  return true;
388 }
389 
390 void Job::slotResult(KJob *job)
391 {
392  if (d_ptr->mCurrentSubJob == job) {
393  // current job finished, start the next one
394  d_ptr->mCurrentSubJob = nullptr;
396  if (!job->error()) {
397  QTimer::singleShot(0, this, [this]() {
398  d_ptr->startNext();
399  });
400  }
401  } else {
402  // job that was still waiting for execution finished, probably canceled,
403  // so just remove it from the queue and move on without caring about
404  // its error code
406  }
407 }
408 
410 {
411  d_ptr->mWriteFinished = true;
412  Q_EMIT writeFinished(this);
413 }
414 
415 #include "moc_job.cpp"
void finished(QDBusPendingCallWatcher *self)
void emitWriteFinished()
Call this method to indicate that this job will not call writeData() again.
Definition: job.cpp:409
QString number(int n, int base)
@ Unknown
Unknown error.
Definition: job.h:102
bool addSubjob(KJob *job) override
Adds the given job as a subjob to this job.
Definition: job.cpp:355
Q_EMIT
QString errorString() const final
Returns the error string, if there has been an error, an empty string otherwise.
Definition: job.cpp:328
@ ProtocolVersionMismatch
The server protocol version is too old or too new.
Definition: job.h:100
bool isError() const
@ ConnectionFailed
The connection to the Akonadi server failed.
Definition: job.h:99
Definition: item.h:32
virtual bool removeSubjob(KJob *job)
QMetaObject::Connection connect(const QObject *sender, const char *signal, const QObject *receiver, const char *method, Qt::ConnectionType type)
qint64 Id
Describes the unique id type.
Definition: item.h:110
bool removeSubjob(KJob *job) override
Removes the given subjob of this job.
Definition: job.cpp:369
~Job() override
Destroys the job.
Definition: job.cpp:304
void deleteLater()
QString i18n(const char *text, const TYPE &arg...)
QDBusConnection sessionBus()
qint64 restart()
static Session * defaultSession()
Returns the default session for this thread.
bool isEmpty() const
QString errorText() const
void writeFinished(Akonadi::Job *job)
This signal is emitted if the job has finished all write operations, ie.
void start() override
Jobs are started automatically once entering the event loop again, no need to explicitly call this.
Definition: job.cpp:313
virtual bool addSubjob(KJob *job)
QDBusPendingCall asyncCallWithArgumentList(const QString &method, const QList< QVariant > &args)
QueuedConnection
qint64 elapsed() const
Base class for all actions in the Akonadi storage.
Definition: job.h:80
Job(QObject *parent=nullptr)
Creates a new job.
Definition: job.cpp:290
QString arg(qlonglong a, int fieldWidth, int base, QChar fillChar) const
virtual bool doHandleResponse(qint64 tag, const Protocol::CommandPtr &response)
This method should be reimplemented in the concrete jobs in case you want to handle incoming data.
Definition: job.cpp:381
@ UserError
Starting point for error codes defined by sub-classes.
Definition: job.h:103
bool invokeMethod(QObject *obj, const char *member, Qt::ConnectionType type, QGenericReturnArgument ret, QGenericArgument val0, QGenericArgument val1, QGenericArgument val2, QGenericArgument val3, QGenericArgument val4, QGenericArgument val5, QGenericArgument val6, QGenericArgument val7, QGenericArgument val8, QGenericArgument val9)
virtual void slotResult(KJob *job)
KLEO_EXPORT std::unique_ptr< GpgME::DefaultAssuanTransaction > sendCommand(std::shared_ptr< GpgME::Context > &assuanContext, const std::string &command, GpgME::Error &err)
int error() const
QDBusMessage callWithArgumentList(QDBus::CallMode mode, const QString &method, const QList< QVariant > &args)
@ UserCanceled
The user canceled this job.
Definition: job.h:101
QObject * parent() const
bool isValid() const
bool doKill() override
Kills the execution of the job.
Definition: job.cpp:317
void aboutToStart(Akonadi::Job *job)
This signal is emitted directly before the job will be started.
Q_D(Todo)
Helper integration between Akonadi and Qt.
This file is part of the KDE documentation.
Documentation copyright © 1996-2022 The KDE developers.
Generated on Tue May 24 2022 04:03:57 by doxygen 1.8.17 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.