Akonadi

job.cpp
1/*
2 SPDX-FileCopyrightText: 2006 Tobias Koenig <tokoe@kde.org>
3 2006 Marc Mutz <mutz@kde.org>
4 2006 - 2007 Volker Krause <vkrause@kde.org>
5
6 SPDX-License-Identifier: LGPL-2.0-or-later
7*/
8
9#include "job.h"
10#include "akonadicore_debug.h"
11#include "job_p.h"
12#include "private/instance_p.h"
13#include "private/protocol_p.h"
14#include "session.h"
15#include "session_p.h"
16#include <QDBusConnection>
17#include <QTime>
18
19#include <KLocalizedString>
20
21#include <QDBusConnectionInterface>
22#include <QDBusInterface>
23#include <QElapsedTimer>
24#include <QTimer>
25
26using namespace Akonadi;
27
28static QDBusAbstractInterface *s_jobtracker = nullptr;
29
30/// @cond PRIVATE
31void JobPrivate::handleResponse(qint64 tag, const Protocol::CommandPtr &response)
32{
33 Q_Q(Job);
34
35 if (mCurrentSubJob) {
36 mCurrentSubJob->d_ptr->handleResponse(tag, response);
37 return;
38 }
39
40 if (tag == mTag) {
41 if (response->isResponse()) {
42 const auto &resp = Protocol::cmdCast<Protocol::Response>(response);
43 if (resp.isError()) {
44 q->setError(Job::Unknown);
45 q->setErrorText(resp.errorMessage());
46 q->emitResult();
47 return;
48 }
49 }
50 }
51
52 if (mTag != tag) {
53 qCWarning(AKONADICORE_LOG) << "Received response with a different tag!";
54 qCDebug(AKONADICORE_LOG) << "Response tag:" << tag << ", response type:" << response->type();
55 qCDebug(AKONADICORE_LOG) << "Job tag:" << mTag << ", job:" << q;
56 return;
57 }
58
59 if (mStarted) {
60 if (mReadingFinished) {
61 qCWarning(AKONADICORE_LOG) << "Received response for a job that does not expect any more data, ignoring";
62 qCDebug(AKONADICORE_LOG) << "Response tag:" << tag << ", response type:" << response->type();
63 qCDebug(AKONADICORE_LOG) << "Job tag:" << mTag << ", job:" << q;
64 Q_ASSERT(!mReadingFinished);
65 return;
66 }
67
68 if (q->doHandleResponse(tag, response)) {
69 mReadingFinished = true;
70 QTimer::singleShot(0, q, [this]() {
71 delayedEmitResult();
72 });
73 }
74 }
75}
76
77void JobPrivate::init(QObject *parent)
78{
79 Q_Q(Job);
80
81 mParentJob = qobject_cast<Job *>(parent);
82 mSession = qobject_cast<Session *>(parent);
83
84 if (!mSession) {
85 if (!mParentJob) {
86 mSession = Session::defaultSession();
87 } else {
88 mSession = mParentJob->d_ptr->mSession;
89 }
90 }
91
92 if (!mParentJob) {
93 mSession->d->addJob(q);
94 } else {
95 mParentJob->addSubjob(q);
96 }
97 publishJob();
98}
99
100void JobPrivate::publishJob()
101{
102 Q_Q(Job);
103 // if there's a job tracker running, tell it about the new job
104 if (!s_jobtracker) {
105 // Let's only check for the debugging console every 3 seconds, otherwise every single job
106 // makes a dbus call to the dbus daemon, doesn't help performance.
107 static QElapsedTimer s_lastTime;
108 if (!s_lastTime.isValid() || s_lastTime.elapsed() > 3000) {
109 if (!s_lastTime.isValid()) {
110 s_lastTime.start();
111 }
112 const QString suffix = Akonadi::Instance::identifier().isEmpty() ? QString() : QLatin1Char('-') + Akonadi::Instance::identifier();
113 if (QDBusConnection::sessionBus().interface()->isServiceRegistered(QStringLiteral("org.kde.akonadiconsole") + suffix)) {
114 s_jobtracker = new QDBusInterface(QLatin1StringView("org.kde.akonadiconsole") + suffix,
115 QStringLiteral("/jobtracker"),
116 QStringLiteral("org.freedesktop.Akonadi.JobTracker"),
118 nullptr);
119 mSession->d->publishOtherJobs(q);
120 } else {
121 s_lastTime.restart();
122 }
123 }
124 // Note: we never reset s_jobtracker to 0 when a call fails; but if we did
125 // then we should restart s_lastTime.
126 }
127 QMetaObject::invokeMethod(q, "signalCreationToJobTracker", Qt::QueuedConnection);
128}
129
130void JobPrivate::signalCreationToJobTracker()
131{
132 Q_Q(Job);
133 if (s_jobtracker) {
134 // We do these dbus calls manually, so as to avoid having to install (or copy) the console's
135 // xml interface document. Since this is purely a debugging aid, that seems preferable to
136 // publishing something not intended for public consumption.
137 // WARNING: for any signature change here, apply it to resourcescheduler.cpp too
138 const QList<QVariant> argumentList = QList<QVariant>() << QLatin1StringView(mSession->sessionId()) << QString::number(reinterpret_cast<quintptr>(q), 16)
139 << (mParentJob ? QString::number(reinterpret_cast<quintptr>(mParentJob), 16) : QString())
140 << QString::fromLatin1(q->metaObject()->className()) << jobDebuggingString();
141 QDBusPendingCall call = s_jobtracker->asyncCallWithArgumentList(QStringLiteral("jobCreated"), argumentList);
142
143 auto watcher = new QDBusPendingCallWatcher(call, s_jobtracker);
145 QDBusPendingReply<void> reply = *w;
146 if (reply.isError() && s_jobtracker) {
147 qDebug() << reply.error().name() << reply.error().message();
148 s_jobtracker->deleteLater();
149 s_jobtracker = nullptr;
150 }
151 w->deleteLater();
152 });
153 }
154}
155
156void JobPrivate::signalStartedToJobTracker()
157{
158 Q_Q(Job);
159 if (s_jobtracker) {
160 // if there's a job tracker running, tell it a job started
161 const QList<QVariant> argumentList = {QString::number(reinterpret_cast<quintptr>(q), 16)};
162 s_jobtracker->callWithArgumentList(QDBus::NoBlock, QStringLiteral("jobStarted"), argumentList);
163 }
164}
165
166void JobPrivate::aboutToFinish()
167{
168 // Dummy
169}
170
171void JobPrivate::delayedEmitResult()
172{
173 Q_Q(Job);
174 if (q->hasSubjobs()) {
175 // We still have subjobs, wait for them to finish
176 mFinishPending = true;
177 } else {
178 aboutToFinish();
179 q->emitResult();
180 }
181}
182
183void JobPrivate::startQueued()
184{
185 Q_Q(Job);
186 mStarted = true;
187
188 Q_EMIT q->aboutToStart(q);
189 q->doStart();
190 QTimer::singleShot(0, q, [this]() {
191 startNext();
192 });
193 QMetaObject::invokeMethod(q, "signalStartedToJobTracker", Qt::QueuedConnection);
194}
195
196void JobPrivate::lostConnection()
197{
198 Q_Q(Job);
199
200 if (mCurrentSubJob) {
201 mCurrentSubJob->d_ptr->lostConnection();
202 } else {
203 q->setError(Job::ConnectionFailed);
204 q->emitResult();
205 }
206}
207
208void JobPrivate::slotSubJobAboutToStart(Job *job)
209{
210 Q_ASSERT(mCurrentSubJob == nullptr);
211 mCurrentSubJob = job;
212}
213
214void JobPrivate::startNext()
215{
216 Q_Q(Job);
217
218 if (mStarted && !mCurrentSubJob && q->hasSubjobs()) {
219 Job *job = qobject_cast<Akonadi::Job *>(q->subjobs().at(0));
220 Q_ASSERT(job);
221 job->d_ptr->startQueued();
222 } else if (mFinishPending && !q->hasSubjobs()) {
223 // The last subjob we've been waiting for has finished, emitResult() finally
224 QTimer::singleShot(0, q, [this]() {
225 delayedEmitResult();
226 });
227 }
228}
229
230qint64 JobPrivate::newTag()
231{
232 if (mParentJob) {
233 mTag = mParentJob->d_ptr->newTag();
234 } else {
235 mTag = mSession->d->nextTag();
236 }
237 return mTag;
238}
239
240qint64 JobPrivate::tag() const
241{
242 return mTag;
243}
244
245void JobPrivate::sendCommand(qint64 tag, const Protocol::CommandPtr &cmd)
246{
247 if (mParentJob) {
248 mParentJob->d_ptr->sendCommand(tag, cmd);
249 } else {
250 mSession->d->sendCommand(tag, cmd);
251 }
252}
253
254void JobPrivate::sendCommand(const Protocol::CommandPtr &cmd)
255{
256 sendCommand(newTag(), cmd);
257}
258
259void JobPrivate::itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
260{
261 mSession->d->itemRevisionChanged(itemId, oldRevision, newRevision);
262}
263
264void JobPrivate::updateItemRevision(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
265{
266 Q_Q(Job);
267 const auto &subjobs = q->subjobs();
268 for (KJob *j : subjobs) {
269 auto job = qobject_cast<Akonadi::Job *>(j);
270 if (job) {
271 job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision);
272 }
273 }
274 doUpdateItemRevision(itemId, oldRevision, newRevision);
275}
276
277void JobPrivate::doUpdateItemRevision(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
278{
279 Q_UNUSED(itemId)
280 Q_UNUSED(oldRevision)
281 Q_UNUSED(newRevision)
282}
283
284int JobPrivate::protocolVersion() const
285{
286 return mSession->d->protocolVersion;
287}
288/// @endcond
289
291 : KCompositeJob(parent)
292 , d_ptr(new JobPrivate(this))
293{
294 d_ptr->init(parent);
295}
296
297Job::Job(JobPrivate *dd, QObject *parent)
298 : KCompositeJob(parent)
299 , d_ptr(dd)
300{
301 d_ptr->init(parent);
302}
303
305{
306 // if there is a job tracer listening, tell it the job is done now
307 if (s_jobtracker) {
308 const QList<QVariant> argumentList = {QString::number(reinterpret_cast<quintptr>(this), 16), errorString()};
309 s_jobtracker->callWithArgumentList(QDBus::NoBlock, QStringLiteral("jobEnded"), argumentList);
310 }
311}
312
314{
315}
316
318{
319 Q_D(Job);
320 if (d->mStarted) {
321 // the only way to cancel an already started job is reconnecting to the server
322 d->mSession->d->forceReconnect();
323 }
324 d->mStarted = false;
325 return true;
326}
327
329{
330 QString str;
331 switch (error()) {
332 case NoError:
333 break;
334 case ConnectionFailed:
335 str = i18n("Cannot connect to the Akonadi service.");
336 break;
338 str = i18n("The protocol version of the Akonadi server is incompatible. Make sure you have a compatible version installed.");
339 break;
340 case UserCanceled:
341 str = i18n("User canceled operation.");
342 break;
343 case Unknown:
344 return errorText();
345 case UserError:
346 str = i18n("Unknown error.");
347 break;
348 }
349 if (!errorText().isEmpty()) {
350 str += QStringLiteral(" (%1)").arg(errorText());
351 }
352 return str;
353}
354
356{
357 bool rv = KCompositeJob::addSubjob(job);
358 if (rv) {
359 connect(qobject_cast<Job *>(job), &Job::aboutToStart, this, [this](Job *job) {
360 d_ptr->slotSubJobAboutToStart(job);
361 });
362 QTimer::singleShot(0, this, [this]() {
363 d_ptr->startNext();
364 });
365 }
366 return rv;
367}
368
370{
372 if (job == d_ptr->mCurrentSubJob) {
373 d_ptr->mCurrentSubJob = nullptr;
374 QTimer::singleShot(0, this, [this]() {
375 d_ptr->startNext();
376 });
377 }
378 return rv;
379}
380
382{
383 qCDebug(AKONADICORE_LOG) << this << "Unhandled response: " << tag << Protocol::debugString(response);
384 setError(Unknown);
385 setErrorText(i18n("Unexpected response"));
386 emitResult();
387 return true;
388}
389
390void Job::slotResult(KJob *job)
391{
392 if (d_ptr->mCurrentSubJob == job) {
393 // current job finished, start the next one
394 d_ptr->mCurrentSubJob = nullptr;
396 if (!job->error()) {
397 QTimer::singleShot(0, this, [this]() {
398 d_ptr->startNext();
399 });
400 }
401 } else {
402 // job that was still waiting for execution finished, probably canceled,
403 // so just remove it from the queue and move on without caring about
404 // its error code
406 }
407}
408
410{
411 d_ptr->mWriteFinished = true;
412 Q_EMIT writeFinished(this);
413}
414
415#include "moc_job.cpp"
qint64 Id
Describes the unique id type.
Definition item.h:106
Base class for all actions in the Akonadi storage.
Definition job.h:81
QString errorString() const final
Returns the error string, if there has been an error, an empty string otherwise.
Definition job.cpp:328
void aboutToStart(Akonadi::Job *job)
This signal is emitted directly before the job will be started.
virtual bool doHandleResponse(qint64 tag, const Protocol::CommandPtr &response)
This method should be reimplemented in the concrete jobs in case you want to handle incoming data.
Definition job.cpp:381
Job(QObject *parent=nullptr)
Creates a new job.
Definition job.cpp:290
bool doKill() override
Kills the execution of the job.
Definition job.cpp:317
@ Unknown
Unknown error.
Definition job.h:102
@ ProtocolVersionMismatch
The server protocol version is too old or too new.
Definition job.h:100
@ ConnectionFailed
The connection to the Akonadi server failed.
Definition job.h:99
@ UserError
Starting point for error codes defined by sub-classes.
Definition job.h:103
@ UserCanceled
The user canceled this job.
Definition job.h:101
void emitWriteFinished()
Call this method to indicate that this job will not call writeData() again.
Definition job.cpp:409
bool removeSubjob(KJob *job) override
Removes the given subjob of this job.
Definition job.cpp:369
void writeFinished(Akonadi::Job *job)
This signal is emitted if the job has finished all write operations, ie.
bool addSubjob(KJob *job) override
Adds the given job as a subjob to this job.
Definition job.cpp:355
~Job() override
Destroys the job.
Definition job.cpp:304
void start() override
Jobs are started automatically once entering the event loop again, no need to explicitly call this.
Definition job.cpp:313
virtual bool addSubjob(KJob *job)
virtual bool removeSubjob(KJob *job)
virtual void slotResult(KJob *job)
int error() const
QString errorText() const
QString i18n(const char *text, const TYPE &arg...)
Helper integration between Akonadi and Qt.
KLEO_EXPORT std::unique_ptr< GpgME::DefaultAssuanTransaction > sendCommand(std::shared_ptr< GpgME::Context > &assuanContext, const std::string &command, GpgME::Error &err)
QDBusPendingCall asyncCallWithArgumentList(const QString &method, const QList< QVariant > &args)
QDBusMessage callWithArgumentList(QDBus::CallMode mode, const QString &method, const QList< QVariant > &args)
QDBusConnection sessionBus()
void finished(QDBusPendingCallWatcher *self)
bool isError() const const
qint64 elapsed() const const
bool isValid() const const
qint64 restart()
bool invokeMethod(QObject *context, Functor &&function, FunctorReturnType *ret)
Q_EMIT
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
void deleteLater()
QObject * parent() const const
T qobject_cast(QObject *object)
bool isEmpty() const const
QString number(double n, char format, int precision)
QueuedConnection
Q_D(Todo)
This file is part of the KDE documentation.
Documentation copyright © 1996-2024 The KDE developers.
Generated on Tue Mar 26 2024 11:13:38 by doxygen 1.10.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.