Akonadi

job.cpp
1 /*
2  SPDX-FileCopyrightText: 2006 Tobias Koenig <[email protected]>
3  2006 Marc Mutz <[email protected]>
4  2006 - 2007 Volker Krause <[email protected]>
5 
6  SPDX-License-Identifier: LGPL-2.0-or-later
7 */
8 
9 #include "job.h"
10 #include "job_p.h"
11 #include "akonadicore_debug.h"
12 #include <QDBusConnection>
13 #include <QTime>
14 #include "private/protocol_p.h"
15 #include "private/instance_p.h"
16 #include "session.h"
17 #include "session_p.h"
18 
19 #include <KLocalizedString>
20 
21 #include <QTimer>
22 #include <QElapsedTimer>
23 #include <QDBusInterface>
24 #include <QDBusConnectionInterface>
25 
26 using namespace Akonadi;
27 
28 static QDBusAbstractInterface *s_jobtracker = nullptr;
29 
30 //@cond PRIVATE
31 void JobPrivate::handleResponse(qint64 tag, const Protocol::CommandPtr &response)
32 {
33  Q_Q(Job);
34 
35  if (mCurrentSubJob) {
36  mCurrentSubJob->d_ptr->handleResponse(tag, response);
37  return;
38  }
39 
40  if (tag == mTag) {
41  if (response->isResponse()) {
42  const auto &resp = Protocol::cmdCast<Protocol::Response>(response);
43  if (resp.isError()) {
44  q->setError(Job::Unknown);
45  q->setErrorText(resp.errorMessage());
46  q->emitResult();
47  return;
48  }
49  }
50  }
51 
52  if (mTag != tag) {
53  qCWarning(AKONADICORE_LOG) << "Received response with a different tag!";
54  qCDebug(AKONADICORE_LOG) << "Response tag:" << tag << ", response type:" << response->type();
55  qCDebug(AKONADICORE_LOG) << "Job tag:" << mTag << ", job:" << q;
56  return;
57  }
58 
59  if (mStarted) {
60  if (mReadingFinished) {
61  qCWarning(AKONADICORE_LOG) << "Received response for a job that does not expect any more data, ignoring";
62  qCDebug(AKONADICORE_LOG) << "Response tag:" << tag << ", response type:" << response->type();
63  qCDebug(AKONADICORE_LOG) << "Job tag:" << mTag << ", job:" << q;
64  Q_ASSERT(!mReadingFinished);
65  return;
66  }
67 
68  if (q->doHandleResponse(tag, response)) {
69  mReadingFinished = true;
70  QTimer::singleShot(0, q, [this]() {delayedEmitResult(); });
71  }
72  }
73 }
74 
75 void JobPrivate::init(QObject *parent)
76 {
77  Q_Q(Job);
78 
79  mParentJob = qobject_cast<Job *>(parent);
80  mSession = qobject_cast<Session *>(parent);
81 
82  if (!mSession) {
83  if (!mParentJob) {
84  mSession = Session::defaultSession();
85  } else {
86  mSession = mParentJob->d_ptr->mSession;
87  }
88  }
89 
90  if (!mParentJob) {
91  mSession->d->addJob(q);
92  } else {
93  mParentJob->addSubjob(q);
94  }
95  publishJob();
96 }
97 
98 void JobPrivate::publishJob()
99 {
100  Q_Q(Job);
101  // if there's a job tracker running, tell it about the new job
102  if (!s_jobtracker) {
103  // Let's only check for the debugging console every 3 seconds, otherwise every single job
104  // makes a dbus call to the dbus daemon, doesn't help performance.
105  static QElapsedTimer s_lastTime;
106  if (!s_lastTime.isValid() || s_lastTime.elapsed() > 3000) {
107  if (!s_lastTime.isValid()) {
108  s_lastTime.start();
109  }
110  const QString suffix = Akonadi::Instance::identifier().isEmpty() ? QString() : QLatin1Char('-') + Akonadi::Instance::identifier();
111  if (QDBusConnection::sessionBus().interface()->isServiceRegistered(QStringLiteral("org.kde.akonadiconsole") + suffix)) {
112  s_jobtracker = new QDBusInterface(QLatin1String("org.kde.akonadiconsole") + suffix,
113  QStringLiteral("/jobtracker"),
114  QStringLiteral("org.freedesktop.Akonadi.JobTracker"),
115  QDBusConnection::sessionBus(), nullptr);
116  mSession->d->publishOtherJobs(q);
117  } else {
118  s_lastTime.restart();
119  }
120  }
121  // Note: we never reset s_jobtracker to 0 when a call fails; but if we did
122  // then we should restart s_lastTime.
123  }
124  QMetaObject::invokeMethod(q, "signalCreationToJobTracker", Qt::QueuedConnection);
125 }
126 
127 void JobPrivate::signalCreationToJobTracker()
128 {
129  Q_Q(Job);
130  if (s_jobtracker) {
131  // We do these dbus calls manually, so as to avoid having to install (or copy) the console's
132  // xml interface document. Since this is purely a debugging aid, that seems preferable to
133  // publishing something not intended for public consumption.
134  // WARNING: for any signature change here, apply it to resourcescheduler.cpp too
135  const QList<QVariant> argumentList = QList<QVariant>() << QLatin1String(mSession->sessionId())
136  << QString::number(reinterpret_cast<quintptr>(q), 16)
137  << (mParentJob ? QString::number(reinterpret_cast<quintptr>(mParentJob), 16) : QString())
138  << QString::fromLatin1(q->metaObject()->className())
139  << jobDebuggingString();
140  QDBusPendingCall call = s_jobtracker->asyncCallWithArgumentList(QStringLiteral("jobCreated"), argumentList);
141 
142  QDBusPendingCallWatcher *watcher = new QDBusPendingCallWatcher(call, s_jobtracker);
144  QDBusPendingReply<void> reply = *w;
145  if (reply.isError() && s_jobtracker) {
146  qDebug() << reply.error().name() << reply.error().message();
147  s_jobtracker->deleteLater();
148  s_jobtracker = nullptr;
149  }
150  w->deleteLater();
151  });
152  }
153 }
154 
155 void JobPrivate::signalStartedToJobTracker()
156 {
157  Q_Q(Job);
158  if (s_jobtracker) {
159  // if there's a job tracker running, tell it a job started
160  const QList<QVariant> argumentList = { QString::number(reinterpret_cast<quintptr>(q), 16) };
161  s_jobtracker->callWithArgumentList(QDBus::NoBlock, QStringLiteral("jobStarted"), argumentList);
162  }
163 }
164 
165 void JobPrivate::aboutToFinish()
166 {
167  // Dummy
168 }
169 
170 void JobPrivate::delayedEmitResult()
171 {
172  Q_Q(Job);
173  if (q->hasSubjobs()) {
174  // We still have subjobs, wait for them to finish
175  mFinishPending = true;
176  } else {
177  aboutToFinish();
178  q->emitResult();
179  }
180 }
181 
182 void JobPrivate::startQueued()
183 {
184  Q_Q(Job);
185  mStarted = true;
186 
187  Q_EMIT q->aboutToStart(q);
188  q->doStart();
189  QTimer::singleShot(0, q, [this]() { startNext(); });
190  QMetaObject::invokeMethod(q, "signalStartedToJobTracker", Qt::QueuedConnection);
191 }
192 
193 void JobPrivate::lostConnection()
194 {
195  Q_Q(Job);
196 
197  if (mCurrentSubJob) {
198  mCurrentSubJob->d_ptr->lostConnection();
199  } else {
200  q->setError(Job::ConnectionFailed);
201  q->emitResult();
202  }
203 }
204 
205 void JobPrivate::slotSubJobAboutToStart(Job *job)
206 {
207  Q_ASSERT(mCurrentSubJob == nullptr);
208  mCurrentSubJob = job;
209 }
210 
211 void JobPrivate::startNext()
212 {
213  Q_Q(Job);
214 
215  if (mStarted && !mCurrentSubJob && q->hasSubjobs()) {
216  Job *job = qobject_cast<Akonadi::Job *>(q->subjobs().at(0));
217  Q_ASSERT(job);
218  job->d_ptr->startQueued();
219  } else if (mFinishPending && !q->hasSubjobs()) {
220  // The last subjob we've been waiting for has finished, emitResult() finally
221  QTimer::singleShot(0, q, [this]() {delayedEmitResult(); });
222  }
223 }
224 
225 qint64 JobPrivate::newTag()
226 {
227  if (mParentJob) {
228  mTag = mParentJob->d_ptr->newTag();
229  } else {
230  mTag = mSession->d->nextTag();
231  }
232  return mTag;
233 }
234 
235 qint64 JobPrivate::tag() const
236 {
237  return mTag;
238 }
239 
240 void JobPrivate::sendCommand(qint64 tag, const Protocol::CommandPtr &cmd)
241 {
242  if (mParentJob) {
243  mParentJob->d_ptr->sendCommand(tag, cmd);
244  } else {
245  mSession->d->sendCommand(tag, cmd);
246  }
247 }
248 
249 void JobPrivate::sendCommand(const Protocol::CommandPtr &cmd)
250 {
251  sendCommand(newTag(), cmd);
252 }
253 
254 void JobPrivate::itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
255 {
256  mSession->d->itemRevisionChanged(itemId, oldRevision, newRevision);
257 }
258 
259 void JobPrivate::updateItemRevision(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
260 {
261  Q_Q(Job);
262  const auto &subjobs = q->subjobs();
263  for (KJob *j : subjobs) {
264  Akonadi::Job *job = qobject_cast<Akonadi::Job *>(j);
265  if (job) {
266  job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision);
267  }
268  }
269  doUpdateItemRevision(itemId, oldRevision, newRevision);
270 }
271 
272 void JobPrivate::doUpdateItemRevision(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
273 {
274  Q_UNUSED(itemId);
275  Q_UNUSED(oldRevision);
276  Q_UNUSED(newRevision);
277 }
278 
279 int JobPrivate::protocolVersion() const
280 {
281  return mSession->d->protocolVersion;
282 }
283 //@endcond
284 
286  : KCompositeJob(parent)
287  , d_ptr(new JobPrivate(this))
288 {
289  d_ptr->init(parent);
290 }
291 
292 Job::Job(JobPrivate *dd, QObject *parent)
293  : KCompositeJob(parent)
294  , d_ptr(dd)
295 {
296  d_ptr->init(parent);
297 }
298 
300 {
301  // if there is a job tracer listening, tell it the job is done now
302  if (s_jobtracker) {
303  const QList<QVariant> argumentList = {QString::number(reinterpret_cast<quintptr>(this), 16), errorString()};
304  s_jobtracker->callWithArgumentList(QDBus::NoBlock, QStringLiteral("jobEnded"), argumentList);
305  }
306 
307  delete d_ptr;
308 }
309 
311 {
312 }
313 
315 {
316  Q_D(Job);
317  if (d->mStarted) {
318  // the only way to cancel an already started job is reconnecting to the server
319  d->mSession->d->forceReconnect();
320  }
321  d->mStarted = false;
322  return true;
323 }
324 
326 {
327  QString str;
328  switch (error()) {
329  case NoError:
330  break;
331  case ConnectionFailed:
332  str = i18n("Cannot connect to the Akonadi service.");
333  break;
335  str = i18n("The protocol version of the Akonadi server is incompatible. Make sure you have a compatible version installed.");
336  break;
337  case UserCanceled:
338  str = i18n("User canceled operation.");
339  break;
340  case Unknown:
341  return errorText();
342  case UserError:
343  str = i18n("Unknown error.");
344  break;
345  }
346  if (!errorText().isEmpty()) {
347  str += QStringLiteral(" (%1)").arg(errorText());
348  }
349  return str;
350 }
351 
353 {
354  bool rv = KCompositeJob::addSubjob(job);
355  if (rv) {
356  connect(qobject_cast<Job*>(job), &Job::aboutToStart, this, [this](Job *job) { d_ptr->slotSubJobAboutToStart(job); });
357  QTimer::singleShot(0, this, [this]() { d_ptr->startNext(); });
358  }
359  return rv;
360 }
361 
363 {
364  bool rv = KCompositeJob::removeSubjob(job);
365  if (job == d_ptr->mCurrentSubJob) {
366  d_ptr->mCurrentSubJob = nullptr;
367  QTimer::singleShot(0, this, [this]() { d_ptr->startNext(); });
368  }
369  return rv;
370 }
371 
373 {
374  qCDebug(AKONADICORE_LOG) << this << "Unhandled response: " << tag << Protocol::debugString(response);
375  setError(Unknown);
376  setErrorText(i18n("Unexpected response"));
377  emitResult();
378  return true;
379 }
380 
381 void Job::slotResult(KJob *job)
382 {
383  if (d_ptr->mCurrentSubJob == job) {
384  // current job finished, start the next one
385  d_ptr->mCurrentSubJob = nullptr;
387  if (!job->error()) {
388  QTimer::singleShot(0, this, [this]() { d_ptr->startNext(); });
389  }
390  } else {
391  // job that was still waiting for execution finished, probably canceled,
392  // so just remove it from the queue and move on without caring about
393  // its error code
395  }
396 }
397 
399 {
400  d_ptr->mWriteFinished = true;
401  Q_EMIT writeFinished(this);
402 }
403 
404 #include "moc_job.cpp"
virtual bool addSubjob(KJob *job)
virtual bool doHandleResponse(qint64 tag, const Protocol::CommandPtr &response)
This method should be reimplemented in the concrete jobs in case you want to handle incoming data...
Definition: job.cpp:372
void emitResult()
The server protocol version is too old or too new.
Definition: job.h:100
Unknown error.
Definition: job.h:102
qint64 restart()
void finished(QDBusPendingCallWatcher *self)
QString name() const
void setError(int errorCode)
QString message() const
QDBusConnection sessionBus()
Base class for all actions in the Akonadi storage.
Definition: job.h:80
The connection to the Akonadi server failed.
Definition: job.h:99
KCompositeJob(QObject *parent=nullptr)
void start() override
Jobs are started automatically once entering the event loop again, no need to explicitly call this...
Definition: job.cpp:310
static Session * defaultSession()
Returns the default session for this thread.
bool removeSubjob(KJob *job) override
Removes the given subjob of this job.
Definition: job.cpp:362
void writeFinished(Akonadi::Job *job)
This signal is emitted if the job has finished all write operations, ie.
~Job() override
Destroys the job.
Definition: job.cpp:299
void setErrorText(const QString &errorText)
QString number(int n, int base)
virtual void slotResult(KJob *job)
bool isEmpty() const const
void emitWriteFinished()
Call this method to indicate that this job will not call writeData() again.
Definition: job.cpp:398
A communication session with the Akonadi storage.
Definition: core/session.h:54
void deleteLater()
QDBusError error() const const
QString errorString() const final
Returns the error string, if there has been an error, an empty string otherwise.
Definition: job.cpp:325
bool invokeMethod(QObject *obj, const char *member, Qt::ConnectionType type, QGenericReturnArgument ret, QGenericArgument val0, QGenericArgument val1, QGenericArgument val2, QGenericArgument val3, QGenericArgument val4, QGenericArgument val5, QGenericArgument val6, QGenericArgument val7, QGenericArgument val8, QGenericArgument val9)
Job(QObject *parent=nullptr)
Creates a new job.
Definition: job.cpp:285
QString i18n(const char *text, const TYPE &arg...)
Starting point for error codes defined by sub-classes.
Definition: job.h:103
QDBusPendingCall asyncCallWithArgumentList(const QString &method, const QList< QVariant > &args)
Helper integration between Akonadi and Qt.
QString arg(qlonglong a, int fieldWidth, int base, QChar fillChar) const const
bool isValid() const const
void aboutToStart(Akonadi::Job *job)
This signal is emitted directly before the job will be started.
The user canceled this job.
Definition: job.h:101
QueuedConnection
QMetaObject::Connection connect(const QObject *sender, const char *signal, const QObject *receiver, const char *method, Qt::ConnectionType type)
QDBusMessage callWithArgumentList(QDBus::CallMode mode, const QString &method, const QList< QVariant > &args)
qint64 elapsed() const const
bool addSubjob(KJob *job) override
Adds the given job as a subjob to this job.
Definition: job.cpp:352
Q_EMIT
QString errorText() const
bool isError() const const
int error() const
bool doKill() override
Kills the execution of the job.
Definition: job.cpp:314
virtual bool removeSubjob(KJob *job)
This file is part of the KDE documentation.
Documentation copyright © 1996-2020 The KDE developers.
Generated on Wed Aug 12 2020 23:16:10 by doxygen 1.8.11 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.