Akonadi

job.cpp
1 /*
2  Copyright (c) 2006 Tobias Koenig <[email protected]>
3  2006 Marc Mutz <[email protected]>
4  2006 - 2007 Volker Krause <[email protected]>
5 
6  This library is free software; you can redistribute it and/or modify it
7  under the terms of the GNU Library General Public License as published by
8  the Free Software Foundation; either version 2 of the License, or (at your
9  option) any later version.
10 
11  This library is distributed in the hope that it will be useful, but WITHOUT
12  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
14  License for more details.
15 
16  You should have received a copy of the GNU Library General Public License
17  along with this library; see the file COPYING.LIB. If not, write to the
18  Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  02110-1301, USA.
20 */
21 
22 #include "job.h"
23 #include "job_p.h"
24 #include "akonadicore_debug.h"
25 #include <QDBusConnection>
26 #include <QTime>
27 #include "private/protocol_p.h"
28 #include "private/instance_p.h"
29 #include "session.h"
30 #include "session_p.h"
31 
32 #include <KLocalizedString>
33 
34 #include <QTimer>
35 #include <QElapsedTimer>
36 #include <QDBusInterface>
37 #include <QDBusConnectionInterface>
38 
39 using namespace Akonadi;
40 
// File-local D-Bus proxy for the job tracker object. It stays null until
// JobPrivate::publishJob() finds the tracker service on the session bus, and is
// reset to null again when a "jobCreated" call to it reports an error.
static QDBusAbstractInterface *s_jobtracker = nullptr;
42 
43 //@cond PRIVATE
44 void JobPrivate::handleResponse(qint64 tag, const Protocol::CommandPtr &response)
45 {
46  Q_Q(Job);
47 
48  if (mCurrentSubJob) {
49  mCurrentSubJob->d_ptr->handleResponse(tag, response);
50  return;
51  }
52 
53  if (tag == mTag) {
54  if (response->isResponse()) {
55  const auto &resp = Protocol::cmdCast<Protocol::Response>(response);
56  if (resp.isError()) {
57  q->setError(Job::Unknown);
58  q->setErrorText(resp.errorMessage());
59  q->emitResult();
60  return;
61  }
62  }
63  }
64 
65  if (mTag != tag) {
66  qCWarning(AKONADICORE_LOG) << "Received response with a different tag!";
67  qCDebug(AKONADICORE_LOG) << "Response tag:" << tag << ", response type:" << response->type();
68  qCDebug(AKONADICORE_LOG) << "Job tag:" << mTag << ", job:" << q;
69  return;
70  }
71 
72  if (mStarted) {
73  if (mReadingFinished) {
74  qCWarning(AKONADICORE_LOG) << "Received response for a job that does not expect any more data, ignoring";
75  qCDebug(AKONADICORE_LOG) << "Response tag:" << tag << ", response type:" << response->type();
76  qCDebug(AKONADICORE_LOG) << "Job tag:" << mTag << ", job:" << q;
77  Q_ASSERT(!mReadingFinished);
78  return;
79  }
80 
81  if (q->doHandleResponse(tag, response)) {
82  mReadingFinished = true;
83  QTimer::singleShot(0, q, [this]() {delayedEmitResult(); });
84  }
85  }
86 }
87 
88 void JobPrivate::init(QObject *parent)
89 {
90  Q_Q(Job);
91 
92  mParentJob = qobject_cast<Job *>(parent);
93  mSession = qobject_cast<Session *>(parent);
94 
95  if (!mSession) {
96  if (!mParentJob) {
97  mSession = Session::defaultSession();
98  } else {
99  mSession = mParentJob->d_ptr->mSession;
100  }
101  }
102 
103  if (!mParentJob) {
104  mSession->d->addJob(q);
105  } else {
106  mParentJob->addSubjob(q);
107  }
108  publishJob();
109 }
110 
111 void JobPrivate::publishJob()
112 {
113  Q_Q(Job);
114  // if there's a job tracker running, tell it about the new job
115  if (!s_jobtracker) {
116  // Let's only check for the debugging console every 3 seconds, otherwise every single job
117  // makes a dbus call to the dbus daemon, doesn't help performance.
118  static QElapsedTimer s_lastTime;
119  if (!s_lastTime.isValid() || s_lastTime.elapsed() > 3000) {
120  if (!s_lastTime.isValid()) {
121  s_lastTime.start();
122  }
123  const QString suffix = Akonadi::Instance::identifier().isEmpty() ? QString() : QLatin1Char('-') + Akonadi::Instance::identifier();
124  if (QDBusConnection::sessionBus().interface()->isServiceRegistered(QStringLiteral("org.kde.akonadiconsole") + suffix)) {
125  s_jobtracker = new QDBusInterface(QLatin1String("org.kde.akonadiconsole") + suffix,
126  QStringLiteral("/jobtracker"),
127  QStringLiteral("org.freedesktop.Akonadi.JobTracker"),
128  QDBusConnection::sessionBus(), nullptr);
129  mSession->d->publishOtherJobs(q);
130  } else {
131  s_lastTime.restart();
132  }
133  }
134  // Note: we never reset s_jobtracker to 0 when a call fails; but if we did
135  // then we should restart s_lastTime.
136  }
137  QMetaObject::invokeMethod(q, "signalCreationToJobTracker", Qt::QueuedConnection);
138 }
139 
140 void JobPrivate::signalCreationToJobTracker()
141 {
142  Q_Q(Job);
143  if (s_jobtracker) {
144  // We do these dbus calls manually, so as to avoid having to install (or copy) the console's
145  // xml interface document. Since this is purely a debugging aid, that seems preferable to
146  // publishing something not intended for public consumption.
147  // WARNING: for any signature change here, apply it to resourcescheduler.cpp too
148  const QList<QVariant> argumentList = QList<QVariant>() << QLatin1String(mSession->sessionId())
149  << QString::number(reinterpret_cast<quintptr>(q), 16)
150  << (mParentJob ? QString::number(reinterpret_cast<quintptr>(mParentJob), 16) : QString())
151  << QString::fromLatin1(q->metaObject()->className())
152  << jobDebuggingString();
153  QDBusPendingCall call = s_jobtracker->asyncCallWithArgumentList(QStringLiteral("jobCreated"), argumentList);
154 
155  QDBusPendingCallWatcher *watcher = new QDBusPendingCallWatcher(call, s_jobtracker);
157  QDBusPendingReply<void> reply = *w;
158  if (reply.isError() && s_jobtracker) {
159  qDebug() << reply.error().name() << reply.error().message();
160  s_jobtracker->deleteLater();
161  s_jobtracker = nullptr;
162  }
163  w->deleteLater();
164  });
165  }
166 }
167 
168 void JobPrivate::signalStartedToJobTracker()
169 {
170  Q_Q(Job);
171  if (s_jobtracker) {
172  // if there's a job tracker running, tell it a job started
173  const QList<QVariant> argumentList = { QString::number(reinterpret_cast<quintptr>(q), 16) };
174  s_jobtracker->callWithArgumentList(QDBus::NoBlock, QStringLiteral("jobStarted"), argumentList);
175  }
176 }
177 
178 void JobPrivate::aboutToFinish()
179 {
180  // Dummy
181 }
182 
183 void JobPrivate::delayedEmitResult()
184 {
185  Q_Q(Job);
186  if (q->hasSubjobs()) {
187  // We still have subjobs, wait for them to finish
188  mFinishPending = true;
189  } else {
190  aboutToFinish();
191  q->emitResult();
192  }
193 }
194 
/**
 * Marks the job as started and runs it.
 *
 * Emits Job::aboutToStart(), calls Job::doStart(), then schedules startNext()
 * and the "started" notification for the job tracker for a later event-loop
 * iteration.
 */
void JobPrivate::startQueued()
{
    Q_Q(Job);
    mStarted = true;

    // Announce the start before doStart() runs.
    Q_EMIT q->aboutToStart(q);
    q->doStart();
    // Both follow-ups are deferred rather than executed inside this call.
    QTimer::singleShot(0, q, [this]() { startNext(); });
    QMetaObject::invokeMethod(q, "signalStartedToJobTracker", Qt::QueuedConnection);
}
205 
206 void JobPrivate::lostConnection()
207 {
208  Q_Q(Job);
209 
210  if (mCurrentSubJob) {
211  mCurrentSubJob->d_ptr->lostConnection();
212  } else {
213  q->setError(Job::ConnectionFailed);
214  q->emitResult();
215  }
216 }
217 
218 void JobPrivate::slotSubJobAboutToStart(Job *job)
219 {
220  Q_ASSERT(mCurrentSubJob == nullptr);
221  mCurrentSubJob = job;
222 }
223 
224 void JobPrivate::startNext()
225 {
226  Q_Q(Job);
227 
228  if (mStarted && !mCurrentSubJob && q->hasSubjobs()) {
229  Job *job = qobject_cast<Akonadi::Job *>(q->subjobs().at(0));
230  Q_ASSERT(job);
231  job->d_ptr->startQueued();
232  } else if (mFinishPending && !q->hasSubjobs()) {
233  // The last subjob we've been waiting for has finished, emitResult() finally
234  QTimer::singleShot(0, q, [this]() {delayedEmitResult(); });
235  }
236 }
237 
238 qint64 JobPrivate::newTag()
239 {
240  if (mParentJob) {
241  mTag = mParentJob->d_ptr->newTag();
242  } else {
243  mTag = mSession->d->nextTag();
244  }
245  return mTag;
246 }
247 
248 qint64 JobPrivate::tag() const
249 {
250  return mTag;
251 }
252 
253 void JobPrivate::sendCommand(qint64 tag, const Protocol::CommandPtr &cmd)
254 {
255  if (mParentJob) {
256  mParentJob->d_ptr->sendCommand(tag, cmd);
257  } else {
258  mSession->d->sendCommand(tag, cmd);
259  }
260 }
261 
262 void JobPrivate::sendCommand(const Protocol::CommandPtr &cmd)
263 {
264  sendCommand(newTag(), cmd);
265 }
266 
267 void JobPrivate::itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
268 {
269  mSession->d->itemRevisionChanged(itemId, oldRevision, newRevision);
270 }
271 
272 void JobPrivate::updateItemRevision(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
273 {
274  Q_Q(Job);
275  const auto subjobs = q->subjobs();
276  for (KJob *j : subjobs) {
277  Akonadi::Job *job = qobject_cast<Akonadi::Job *>(j);
278  if (job) {
279  job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision);
280  }
281  }
282  doUpdateItemRevision(itemId, oldRevision, newRevision);
283 }
284 
285 void JobPrivate::doUpdateItemRevision(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
286 {
287  Q_UNUSED(itemId);
288  Q_UNUSED(oldRevision);
289  Q_UNUSED(newRevision);
290 }
291 
292 int JobPrivate::protocolVersion() const
293 {
294  return mSession->d->protocolVersion;
295 }
296 //@endcond
297 
299  : KCompositeJob(parent)
300  , d_ptr(new JobPrivate(this))
301 {
302  d_ptr->init(parent);
303 }
304 
305 Job::Job(JobPrivate *dd, QObject *parent)
306  : KCompositeJob(parent)
307  , d_ptr(dd)
308 {
309  d_ptr->init(parent);
310 }
311 
313 {
314  delete d_ptr;
315 
316  // if there is a job tracer listening, tell it the job is done now
317  if (s_jobtracker) {
318  const QList<QVariant> argumentList = {QString::number(reinterpret_cast<quintptr>(this), 16), errorString()};
319  s_jobtracker->callWithArgumentList(QDBus::NoBlock, QStringLiteral("jobEnded"), argumentList);
320  }
321 }
322 
324 {
325 }
326 
328 {
329  Q_D(Job);
330  if (d->mStarted) {
331  // the only way to cancel an already started job is reconnecting to the server
332  d->mSession->d->forceReconnect();
333  }
334  d->mStarted = false;
335  return true;
336 }
337 
339 {
340  QString str;
341  switch (error()) {
342  case NoError:
343  break;
344  case ConnectionFailed:
345  str = i18n("Cannot connect to the Akonadi service.");
346  break;
348  str = i18n("The protocol version of the Akonadi server is incompatible. Make sure you have a compatible version installed.");
349  break;
350  case UserCanceled:
351  str = i18n("User canceled operation.");
352  break;
353  case Unknown:
354  return errorText();
355  case UserError:
356  str = i18n("Unknown error.");
357  break;
358  }
359  if (!errorText().isEmpty()) {
360  str += QStringLiteral(" (%1)").arg(errorText());
361  }
362  return str;
363 }
364 
366 {
367  bool rv = KCompositeJob::addSubjob(job);
368  if (rv) {
369  connect(qobject_cast<Job*>(job), &Job::aboutToStart, this, [this](Job *job) { d_ptr->slotSubJobAboutToStart(job); });
370  QTimer::singleShot(0, this, [this]() { d_ptr->startNext(); });
371  }
372  return rv;
373 }
374 
376 {
377  bool rv = KCompositeJob::removeSubjob(job);
378  if (job == d_ptr->mCurrentSubJob) {
379  d_ptr->mCurrentSubJob = nullptr;
380  QTimer::singleShot(0, this, [this]() { d_ptr->startNext(); });
381  }
382  return rv;
383 }
384 
386 {
387  qCDebug(AKONADICORE_LOG) << this << "Unhandled response: " << tag << Protocol::debugString(response);
388  setError(Unknown);
389  setErrorText(i18n("Unexpected response"));
390  emitResult();
391  return true;
392 }
393 
394 void Job::slotResult(KJob *job)
395 {
396  if (d_ptr->mCurrentSubJob == job) {
397  // current job finished, start the next one
398  d_ptr->mCurrentSubJob = nullptr;
400  if (!job->error()) {
401  QTimer::singleShot(0, this, [this]() { d_ptr->startNext(); });
402  }
403  } else {
404  // job that was still waiting for execution finished, probably canceled,
405  // so just remove it from the queue and move on without caring about
406  // its error code
408  }
409 }
410 
412 {
413  d_ptr->mWriteFinished = true;
414  Q_EMIT writeFinished(this);
415 }
416 
417 #include "moc_job.cpp"
virtual bool addSubjob(KJob *job)
virtual bool doHandleResponse(qint64 tag, const Protocol::CommandPtr &response)
This method should be reimplemented in the concrete jobs in case you want to handle incoming data...
Definition: job.cpp:385
void emitResult()
The server protocol version is too old or too new.
Definition: job.h:113
Unknown error.
Definition: job.h:115
QString errorString() const override
Returns the error string, if there has been an error, an empty string otherwise.
Definition: job.cpp:338
qint64 restart()
void finished(QDBusPendingCallWatcher *self)
QString name() const const
void setError(int errorCode)
QString message() const const
QDBusConnection sessionBus()
Base class for all actions in the Akonadi storage.
Definition: job.h:93
The connection to the Akonadi server failed.
Definition: job.h:112
KCompositeJob(QObject *parent=nullptr)
void start() override
Jobs are started automatically once entering the event loop again, no need to explicitly call this...
Definition: job.cpp:323
static Session * defaultSession()
Returns the default session for this thread.
bool removeSubjob(KJob *job) override
Removes the given subjob of this job.
Definition: job.cpp:375
void writeFinished(Akonadi::Job *job)
This signal is emitted if the job has finished all write operations, i.e. the job will not call writeData() again.
~Job() override
Destroys the job.
Definition: job.cpp:312
void setErrorText(const QString &errorText)
QString number(int n, int base)
virtual void slotResult(KJob *job)
bool isEmpty() const const
void emitWriteFinished()
Call this method to indicate that this job will not call writeData() again.
Definition: job.cpp:411
A communication session with the Akonadi storage.
Definition: core/session.h:67
void deleteLater()
QDBusError error() const const
bool invokeMethod(QObject *obj, const char *member, Qt::ConnectionType type, QGenericReturnArgument ret, QGenericArgument val0, QGenericArgument val1, QGenericArgument val2, QGenericArgument val3, QGenericArgument val4, QGenericArgument val5, QGenericArgument val6, QGenericArgument val7, QGenericArgument val8, QGenericArgument val9)
Job(QObject *parent=nullptr)
Creates a new job.
Definition: job.cpp:298
QString i18n(const char *text, const TYPE &arg...)
Starting point for error codes defined by sub-classes.
Definition: job.h:116
QDBusPendingCall asyncCallWithArgumentList(const QString &method, const QList< QVariant > &args)
Helper integration between Akonadi and Qt.
QString arg(qlonglong a, int fieldWidth, int base, QChar fillChar) const const
bool isValid() const const
void aboutToStart(Akonadi::Job *job)
This signal is emitted directly before the job will be started.
The user canceled this job.
Definition: job.h:114
QMetaObject::Connection connect(const QObject *sender, const char *signal, const QObject *receiver, const char *method, Qt::ConnectionType type)
QDBusMessage callWithArgumentList(QDBus::CallMode mode, const QString &method, const QList< QVariant > &args)
qint64 elapsed() const const
bool addSubjob(KJob *job) override
Adds the given job as a subjob to this job.
Definition: job.cpp:365
Q_EMIT
QString errorText() const
bool isError() const const
int error() const
bool doKill() override
Kills the execution of the job.
Definition: job.cpp:327
virtual bool removeSubjob(KJob *job)
This file is part of the KDE documentation.
Documentation copyright © 1996-2020 The KDE developers.
Generated on Thu Jun 4 2020 23:08:42 by doxygen 1.8.11 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.