Akonadi

resourcebase.cpp
1/*
2 SPDX-FileCopyrightText: 2006 Till Adam <adam@kde.org>
3 SPDX-FileCopyrightText: 2007 Volker Krause <vkrause@kde.org>
4
5 SPDX-License-Identifier: LGPL-2.0-or-later
6*/
7
8#include "resourcebase.h"
9#include "agentbase_p.h"
10
11#include "akonadifull-version.h"
12#include "collectiondeletejob.h"
13#include "collectionsync_p.h"
14#include "resourceadaptor.h"
15#include "resourcescheduler_p.h"
16#include "tagsync.h"
17#include "tracerinterface.h"
18#include <QDBusConnection>
19
20#include "changerecorder.h"
21#include "collectionfetchjob.h"
22#include "collectionfetchscope.h"
23#include "collectionmodifyjob.h"
24#include "favoritecollectionattribute.h"
25#include "invalidatecachejob_p.h"
26#include "itemcreatejob.h"
27#include "itemfetchjob.h"
28#include "itemfetchscope.h"
29#include "itemmodifyjob.h"
30#include "itemmodifyjob_p.h"
31#include "monitor_p.h"
32#include "recursivemover_p.h"
33#include "resourceselectjob_p.h"
34#include "servermanager_p.h"
35#include "session.h"
36#include "specialcollectionattribute.h"
37#include "tagmodifyjob.h"
38
39#include "akonadiagentbase_debug.h"
40
41#include "shared/akranges.h"
42#include <cstdlib>
43#include <iterator>
44
45#include <KAboutData>
46#include <KCrash>
47#include <KLocalizedString>
48
49#include <QApplication>
50#include <QHash>
51#include <QTimer>
52
53using namespace Akonadi;
54using namespace AkRanges;
55using namespace std::chrono_literals;
56class Akonadi::ResourceBasePrivate : public AgentBasePrivate
57{
58 Q_OBJECT
59 Q_CLASSINFO("D-Bus Interface", "org.kde.dfaure")
60
61public:
62 explicit ResourceBasePrivate(ResourceBase *parent)
63 : AgentBasePrivate(parent)
64 , scheduler(nullptr)
65 , mItemSyncer(nullptr)
66 , mItemTransactionMode(ItemSync::SingleTransaction)
67 , mItemMergeMode(ItemSync::RIDMerge)
68 , mCollectionSyncer(nullptr)
69 , mTagSyncer(nullptr)
70 , mHierarchicalRid(false)
71 , mUnemittedProgress(0)
72 , mAutomaticProgressReporting(true)
73 , mDisableAutomaticItemDeliveryDone(false)
74 , mItemSyncBatchSize(10)
75 , mCurrentCollectionFetchJob(nullptr)
76 , mScheduleAttributeSyncBeforeCollectionSync(false)
77 {
78 Internal::setClientType(Internal::Resource);
79 mStatusMessage = defaultReadyMessage();
80 mProgressEmissionCompressor.setInterval(1000);
81 mProgressEmissionCompressor.setSingleShot(true);
82 // HACK: skip local changes of the EntityDisplayAttribute by default. Remove this for KDE5 and adjust resource implementations accordingly.
83 mKeepLocalCollectionChanges << "ENTITYDISPLAY";
84 }
85
86 ~ResourceBasePrivate() override = default;
87
88 Q_DECLARE_PUBLIC(ResourceBase)
89
90 void delayedInit() override
91 {
92 const QString serviceId = ServerManager::agentServiceName(ServerManager::Resource, mId);
93 if (!QDBusConnection::sessionBus().registerService(serviceId)) {
95 if (reason.isEmpty()) {
96 reason = QStringLiteral("this service is probably running already.");
97 }
98 qCCritical(AKONADIAGENTBASE_LOG) << "Unable to register service" << serviceId << "at D-Bus:" << reason;
99
102 }
103 } else {
104 AgentBasePrivate::delayedInit();
105 }
106 }
107
108 void changeProcessed() override
109 {
110 if (m_recursiveMover) {
111 m_recursiveMover->changeProcessed();
112 QTimer::singleShot(0s, m_recursiveMover.data(), &RecursiveMover::replayNext);
113 return;
114 }
115
116 mChangeRecorder->changeProcessed();
117 if (!mChangeRecorder->isEmpty()) {
118 scheduler->scheduleChangeReplay();
119 }
120 scheduler->taskDone();
121 }
122
123 void slotAbortRequested();
124
125 void slotDeliveryDone(KJob *job);
126 void slotCollectionSyncDone(KJob *job);
127 void slotLocalListDone(KJob *job);
128 void slotSynchronizeCollection(const Collection &col);
129 void slotItemRetrievalCollectionFetchDone(KJob *job);
130 void slotCollectionListDone(KJob *job);
131 void slotSynchronizeCollectionAttributes(const Collection &col);
132 void slotCollectionListForAttributesDone(KJob *job);
133 void slotCollectionAttributesSyncDone(KJob *job);
134 void slotSynchronizeTags();
135 void slotAttributeRetrievalCollectionFetchDone(KJob *job);
136
137 void slotItemSyncDone(KJob *job);
138
139 void slotPercent(KJob *job, quint64 percent);
140 void slotDelayedEmitProgress();
141 void slotDeleteResourceCollection();
142 void slotDeleteResourceCollectionDone(KJob *job);
143 void slotCollectionDeletionDone(KJob *job);
144
145 void slotInvalidateCache(const Akonadi::Collection &collection);
146
147 void slotPrepareItemRetrieval(const Akonadi::Item &item);
148 void slotPrepareItemRetrievalResult(KJob *job);
149
150 void slotPrepareItemsRetrieval(const QList<Akonadi::Item> &item);
151 void slotPrepareItemsRetrievalResult(KJob *job);
152
153 void changeCommittedResult(KJob *job);
154
155 void slotRecursiveMoveReplay(RecursiveMover *mover);
156 void slotRecursiveMoveReplayResult(KJob *job);
157
158 void slotTagSyncDone(KJob *job);
159
160 void slotSessionReconnected()
161 {
162 Q_Q(ResourceBase);
163
164 new ResourceSelectJob(q->identifier());
165 }
166
167 void createItemSyncInstanceIfMissing()
168 {
169 Q_Q(ResourceBase);
170 Q_ASSERT_X(scheduler->currentTask().type == ResourceScheduler::SyncCollection,
171 "createItemSyncInstance",
172 "Calling items retrieval methods although no item retrieval is in progress");
173 if (!mItemSyncer) {
174 mItemSyncer = new ItemSync(q->currentCollection(), mCollectionSyncTimestamp);
175 mItemSyncer->setTransactionMode(mItemTransactionMode);
176 mItemSyncer->setBatchSize(mItemSyncBatchSize);
177 mItemSyncer->setMergeMode(mItemMergeMode);
178 mItemSyncer->setDisableAutomaticDeliveryDone(mDisableAutomaticItemDeliveryDone);
179 mItemSyncer->setProperty("collection", QVariant::fromValue(q->currentCollection()));
180 connect(mItemSyncer, &KJob::percentChanged, this,
181 &ResourceBasePrivate::slotPercent); // NOLINT(google-runtime-int): ulong comes from KJob
182
183 connect(mItemSyncer, &KJob::result, this, &ResourceBasePrivate::slotItemSyncDone);
185 }
186 Q_ASSERT(mItemSyncer);
187 }
188
189public Q_SLOTS:
190 // Dump the state of the scheduler
191 Q_SCRIPTABLE QString dumpToString() const
192 {
193 Q_Q(const ResourceBase);
194 return scheduler->dumpToString() + QLatin1Char('\n') + q->dumpResourceToString();
195 }
196
197 Q_SCRIPTABLE void dump()
198 {
199 scheduler->dump();
200 }
201
202 Q_SCRIPTABLE void clear()
203 {
204 scheduler->clear();
205 }
206
207protected Q_SLOTS:
208 // reimplementations from AgentbBasePrivate, containing sanity checks that only apply to resources
209 // such as making sure that RIDs are present as well as translations of cross-resource moves
210 // TODO: we could possibly add recovery code for no-RID notifications by re-enquing those to the change recorder
211 // as the corresponding Add notifications, although that contains a risk of endless fail/retry loops
212
213 void itemAdded(const Akonadi::Item &item, const Akonadi::Collection &collection) override
214 {
215 if (collection.remoteId().isEmpty()) {
216 changeProcessed();
217 return;
218 }
219 AgentBasePrivate::itemAdded(item, collection);
220 }
221
222 void itemChanged(const Akonadi::Item &item, const QSet<QByteArray> &partIdentifiers) override
223 {
224 if (item.remoteId().isEmpty()) {
225 changeProcessed();
226 return;
227 }
228 AgentBasePrivate::itemChanged(item, partIdentifiers);
229 }
230
231 void itemsFlagsChanged(const Akonadi::Item::List &items, const QSet<QByteArray> &addedFlags, const QSet<QByteArray> &removedFlags) override
232 {
233 if (addedFlags.isEmpty() && removedFlags.isEmpty()) {
234 changeProcessed();
235 return;
236 }
237
238 const Item::List validItems = filterValidItems(items);
239 if (validItems.isEmpty()) {
240 changeProcessed();
241 return;
242 }
243
244 AgentBasePrivate::itemsFlagsChanged(validItems, addedFlags, removedFlags);
245 }
246
247 void itemsTagsChanged(const Akonadi::Item::List &items, const QSet<Akonadi::Tag> &addedTags, const QSet<Akonadi::Tag> &removedTags) override
248 {
249 if (addedTags.isEmpty() && removedTags.isEmpty()) {
250 changeProcessed();
251 return;
252 }
253
254 const Item::List validItems = filterValidItems(items);
255 if (validItems.isEmpty()) {
256 changeProcessed();
257 return;
258 }
259
260 AgentBasePrivate::itemsTagsChanged(validItems, addedTags, removedTags);
261 }
262
263 // TODO move the move translation code from AgentBasePrivate here, it's wrong for agents
264 void itemMoved(const Akonadi::Item &item, const Akonadi::Collection &source, const Akonadi::Collection &destination) override
265 {
266 if (item.remoteId().isEmpty() || destination.remoteId().isEmpty() || destination == source) {
267 changeProcessed();
268 return;
269 }
270 AgentBasePrivate::itemMoved(item, source, destination);
271 }
272
273 void itemsMoved(const Akonadi::Item::List &items, const Akonadi::Collection &source, const Akonadi::Collection &destination) override
274 {
275 if (destination.remoteId().isEmpty() || destination == source) {
276 changeProcessed();
277 return;
278 }
279
280 const Item::List validItems = filterValidItems(items);
281 if (validItems.isEmpty()) {
282 changeProcessed();
283 return;
284 }
285
286 AgentBasePrivate::itemsMoved(validItems, source, destination);
287 }
288
289 void itemRemoved(const Akonadi::Item &item) override
290 {
291 if (item.remoteId().isEmpty()) {
292 changeProcessed();
293 return;
294 }
295 AgentBasePrivate::itemRemoved(item);
296 }
297
298 void itemsRemoved(const Akonadi::Item::List &items) override
299 {
300 const Item::List validItems = filterValidItems(items);
301 if (validItems.isEmpty()) {
302 changeProcessed();
303 return;
304 }
305
306 AgentBasePrivate::itemsRemoved(validItems);
307 }
308
309 void collectionAdded(const Akonadi::Collection &collection, const Akonadi::Collection &parent) override
310 {
311 if (parent.remoteId().isEmpty()) {
312 changeProcessed();
313 return;
314 }
315 AgentBasePrivate::collectionAdded(collection, parent);
316 }
317
318 void collectionChanged(const Akonadi::Collection &collection) override
319 {
320 if (collection.remoteId().isEmpty()) {
321 changeProcessed();
322 return;
323 }
324 AgentBasePrivate::collectionChanged(collection);
325 }
326
327 void collectionChanged(const Akonadi::Collection &collection, const QSet<QByteArray> &partIdentifiers) override
328 {
329 if (collection.remoteId().isEmpty()) {
330 changeProcessed();
331 return;
332 }
333 AgentBasePrivate::collectionChanged(collection, partIdentifiers);
334 }
335
336 void collectionMoved(const Akonadi::Collection &collection, const Akonadi::Collection &source, const Akonadi::Collection &destination) override
337 {
338 // unknown destination or source == destination means we can't do/don't have to do anything
339 if (destination.remoteId().isEmpty() || source == destination) {
340 changeProcessed();
341 return;
342 }
343
344 // inter-resource moves, requires we know which resources the source and destination are in though
345 if (!source.resource().isEmpty() && !destination.resource().isEmpty() && source.resource() != destination.resource()) {
346 if (source.resource() == q_ptr->identifier()) { // moved away from us
347 AgentBasePrivate::collectionRemoved(collection);
348 } else if (destination.resource() == q_ptr->identifier()) { // moved to us
349 scheduler->taskDone(); // stop change replay for now
350 auto mover = new RecursiveMover(this);
351 mover->setCollection(collection, destination);
352 scheduler->scheduleMoveReplay(collection, mover);
353 }
354 return;
355 }
356
357 // intra-resource move, requires the moved collection to have a valid id though
358 if (collection.remoteId().isEmpty()) {
359 changeProcessed();
360 return;
361 }
362
363 // intra-resource move, ie. something we can handle internally
364 AgentBasePrivate::collectionMoved(collection, source, destination);
365 }
366
367 void collectionRemoved(const Akonadi::Collection &collection) override
368 {
369 if (collection.remoteId().isEmpty()) {
370 changeProcessed();
371 return;
372 }
373 AgentBasePrivate::collectionRemoved(collection);
374 }
375
376 void tagAdded(const Akonadi::Tag &tag) override
377 {
378 if (!tag.isValid()) {
379 changeProcessed();
380 return;
381 }
382
383 AgentBasePrivate::tagAdded(tag);
384 }
385
386 void tagChanged(const Akonadi::Tag &tag) override
387 {
388 if (tag.remoteId().isEmpty()) {
389 changeProcessed();
390 return;
391 }
392
393 AgentBasePrivate::tagChanged(tag);
394 }
395
396 void tagRemoved(const Akonadi::Tag &tag) override
397 {
398 if (tag.remoteId().isEmpty()) {
399 changeProcessed();
400 return;
401 }
402
403 AgentBasePrivate::tagRemoved(tag);
404 }
405
406private:
407 static Item::List filterValidItems(Item::List items)
408 {
409 items.erase(std::remove_if(items.begin(),
410 items.end(),
411 [](const auto &item) {
412 return item.remoteId().isEmpty();
413 }),
414 items.end());
415 return items;
416 }
417
418public:
419 // synchronize states
420 Collection currentCollection;
421
422 ResourceScheduler *scheduler = nullptr;
423 ItemSync *mItemSyncer = nullptr;
424 ItemSync::TransactionMode mItemTransactionMode;
425 ItemSync::MergeMode mItemMergeMode;
426 CollectionSync *mCollectionSyncer = nullptr;
427 TagSync *mTagSyncer = nullptr;
428 bool mHierarchicalRid;
429 QTimer mProgressEmissionCompressor;
430 int mUnemittedProgress;
431 QMap<Akonadi::Collection::Id, QVariantMap> mUnemittedAdvancedStatus;
432 bool mAutomaticProgressReporting;
433 bool mDisableAutomaticItemDeliveryDone;
434 QPointer<RecursiveMover> m_recursiveMover;
435 int mItemSyncBatchSize;
436 QSet<QByteArray> mKeepLocalCollectionChanges;
437 KJob *mCurrentCollectionFetchJob = nullptr;
438 bool mScheduleAttributeSyncBeforeCollectionSync;
439 QDateTime mCollectionSyncTimestamp;
440};
441
443 : AgentBase(new ResourceBasePrivate(this), id)
444{
446
447 qDBusRegisterMetaType<QByteArrayList>();
448
449 new Akonadi__ResourceAdaptor(this);
450
451 d->scheduler = new ResourceScheduler(this);
452
453 d->mChangeRecorder->setChangeRecordingEnabled(true);
454 d->mChangeRecorder->setCollectionMoveTranslationEnabled(false); // we deal with this ourselves
455 connect(d->mChangeRecorder, &ChangeRecorder::changesAdded, d->scheduler, &ResourceScheduler::scheduleChangeReplay);
456
457 d->mChangeRecorder->setResourceMonitored(d->mId.toLatin1());
458 d->mChangeRecorder->fetchCollection(true);
459
460 connect(d->scheduler, &ResourceScheduler::executeFullSync, this, &ResourceBase::retrieveCollections);
461 connect(d->scheduler, &ResourceScheduler::executeCollectionTreeSync, this, &ResourceBase::retrieveCollections);
462 connect(d->scheduler, &ResourceScheduler::executeCollectionSync, d, &ResourceBasePrivate::slotSynchronizeCollection);
463 connect(d->scheduler, &ResourceScheduler::executeCollectionAttributesSync, d, &ResourceBasePrivate::slotSynchronizeCollectionAttributes);
464 connect(d->scheduler, &ResourceScheduler::executeTagSync, d, &ResourceBasePrivate::slotSynchronizeTags);
465 connect(d->scheduler, &ResourceScheduler::executeItemFetch, d, &ResourceBasePrivate::slotPrepareItemRetrieval);
466 connect(d->scheduler, &ResourceScheduler::executeItemsFetch, d, &ResourceBasePrivate::slotPrepareItemsRetrieval);
467 connect(d->scheduler, &ResourceScheduler::executeResourceCollectionDeletion, d, &ResourceBasePrivate::slotDeleteResourceCollection);
468 connect(d->scheduler, &ResourceScheduler::executeCacheInvalidation, d, &ResourceBasePrivate::slotInvalidateCache);
469 connect(d->scheduler, &ResourceScheduler::status, this, qOverload<int, const QString &>(&ResourceBase::status));
470 connect(d->scheduler, &ResourceScheduler::executeChangeReplay, d->mChangeRecorder, &ChangeRecorder::replayNext);
471 connect(d->scheduler, &ResourceScheduler::executeRecursiveMoveReplay, d, &ResourceBasePrivate::slotRecursiveMoveReplay);
472 connect(d->scheduler, &ResourceScheduler::fullSyncComplete, this, &ResourceBase::synchronized);
473 connect(d->scheduler, &ResourceScheduler::collectionTreeSyncComplete, this, &ResourceBase::collectionTreeSynchronized);
474 connect(d->mChangeRecorder, &ChangeRecorder::nothingToReplay, d->scheduler, &ResourceScheduler::taskDone);
475 connect(d->mChangeRecorder, &Monitor::collectionRemoved, d->scheduler, &ResourceScheduler::collectionRemoved);
476 connect(this, &ResourceBase::abortRequested, d, &ResourceBasePrivate::slotAbortRequested);
477 connect(this, &ResourceBase::synchronized, d->scheduler, &ResourceScheduler::taskDone);
478 connect(this, &ResourceBase::collectionTreeSynchronized, d->scheduler, &ResourceScheduler::taskDone);
480 connect(&d->mProgressEmissionCompressor, &QTimer::timeout, d, &ResourceBasePrivate::slotDelayedEmitProgress);
481
482 d->scheduler->setOnline(d->mOnline);
483 if (!d->mChangeRecorder->isEmpty()) {
484 d->scheduler->scheduleChangeReplay();
485 }
486
487 new ResourceSelectJob(identifier());
488
489 connect(d->mChangeRecorder->session(), &Session::reconnected, d, &ResourceBasePrivate::slotSessionReconnected);
490}
491
493
495{
496 d_func()->scheduler->scheduleFullSync();
497}
498
503
505{
506 return AgentBase::agentName();
507}
508
513
518
523
528
529QString ResourceBase::parseArguments(int argc, char **argv)
530{
531 Q_UNUSED(argc)
532
533 QCommandLineOption identifierOption(QStringLiteral("identifier"), i18nc("@label command line option", "Resource identifier"), QStringLiteral("argument"));
534 QCommandLineParser parser;
535 parser.addOption(identifierOption);
536 parser.addHelpOption();
537 parser.addVersionOption();
538 parser.process(*qApp);
539 parser.setApplicationDescription(i18n("Akonadi Resource"));
540
541 if (!parser.isSet(identifierOption)) {
542 qCDebug(AKONADIAGENTBASE_LOG) << "Identifier argument missing";
543 exit(1);
544 }
545
546 const QString identifier = parser.value(identifierOption);
547
548 if (identifier.isEmpty()) {
549 qCDebug(AKONADIAGENTBASE_LOG) << "Identifier is empty";
550 exit(1);
551 }
552
554 QCoreApplication::setApplicationVersion(QStringLiteral(AKONADI_FULL_VERSION));
555
556 const QFileInfo fi(QString::fromLocal8Bit(argv[0]));
557 // strip off full path and possible .exe suffix
558 const QString catalog = fi.baseName();
559
560 auto translator = new QTranslator(qApp);
561 translator->load(catalog);
563
564 return identifier;
565}
566
568{
569 KLocalizedString::setApplicationDomain(QByteArrayLiteral("libakonadi6"));
570 KAboutData::setApplicationData(r.aboutData());
571
573
574 return qApp->exec();
575}
576
577void ResourceBasePrivate::slotAbortRequested()
578{
579 Q_Q(ResourceBase);
580
581 scheduler->cancelQueues();
582 q->abortActivity();
583}
584
586{
588 Q_ASSERT(d->scheduler->currentTask().type == ResourceScheduler::FetchItem);
589 if (!item.isValid()) {
590 d->scheduler->itemFetchDone(i18nc("@info", "Invalid item retrieved"));
591 return;
592 }
593
594 const QSet<QByteArray> requestedParts = d->scheduler->currentTask().itemParts;
595 for (const QByteArray &part : requestedParts) {
596 if (!item.loadedPayloadParts().contains(part)) {
597 qCWarning(AKONADIAGENTBASE_LOG) << "Item does not provide part" << part;
598 }
599 }
600
601 auto job = new ItemModifyJob(item);
602 job->d_func()->setSilent(true);
603 // FIXME: remove once the item with which we call retrieveItem() has a revision number
604 job->disableRevisionCheck();
605 connect(job, &KJob::result, d, &ResourceBasePrivate::slotDeliveryDone);
606}
607
608void ResourceBasePrivate::slotDeliveryDone(KJob *job)
609{
610 Q_Q(ResourceBase);
611 Q_ASSERT(scheduler->currentTask().type == ResourceScheduler::FetchItem);
612 if (job->error()) {
613 Q_EMIT q->error(i18nc("@info", "Error while creating item: %1", job->errorString()));
614 }
615 scheduler->itemFetchDone(QString());
616}
617
619{
621 Q_ASSERT(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes);
622 if (!collection.isValid()) {
623 Q_EMIT attributesSynchronized(d->scheduler->currentTask().collection.id());
624 d->scheduler->taskDone();
625 return;
626 }
627
628 auto job = new CollectionModifyJob(collection);
629 connect(job, &KJob::result, d, &ResourceBasePrivate::slotCollectionAttributesSyncDone);
630}
631
632void ResourceBasePrivate::slotCollectionAttributesSyncDone(KJob *job)
633{
634 Q_Q(ResourceBase);
635 Q_ASSERT(scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes);
636 if (job->error()) {
637 Q_EMIT q->error(i18nc("@info", "Error while updating collection: %1", job->errorString()));
638 }
639 Q_EMIT q->attributesSynchronized(scheduler->currentTask().collection.id());
640 scheduler->taskDone();
641}
642
643void ResourceBasePrivate::slotDeleteResourceCollection()
644{
645 Q_Q(ResourceBase);
646
648 job->fetchScope().setResource(q->identifier());
649 connect(job, &KJob::result, this, &ResourceBasePrivate::slotDeleteResourceCollectionDone);
650}
651
652void ResourceBasePrivate::slotDeleteResourceCollectionDone(KJob *job)
653{
654 Q_Q(ResourceBase);
655 if (job->error()) {
656 Q_EMIT q->error(job->errorString());
657 scheduler->taskDone();
658 } else {
659 const auto fetchJob = static_cast<const CollectionFetchJob *>(job);
660
661 if (!fetchJob->collections().isEmpty()) {
662 auto job = new CollectionDeleteJob(fetchJob->collections().at(0));
663 connect(job, &KJob::result, this, &ResourceBasePrivate::slotCollectionDeletionDone);
664 } else {
665 // there is no resource collection, so just ignore the request
666 scheduler->taskDone();
667 }
668 }
669}
670
671void ResourceBasePrivate::slotCollectionDeletionDone(KJob *job)
672{
673 Q_Q(ResourceBase);
674 if (job->error()) {
675 Q_EMIT q->error(job->errorString());
676 }
677
678 scheduler->taskDone();
679}
680
681void ResourceBasePrivate::slotInvalidateCache(const Akonadi::Collection &collection)
682{
683 Q_Q(ResourceBase);
684 auto job = new InvalidateCacheJob(collection, q);
685 connect(job, &KJob::result, scheduler, &ResourceScheduler::taskDone);
686}
687
689{
690 changesCommitted(Item::List() << item);
691}
692
694{
696 auto transaction = new TransactionSequence(this);
697 connect(transaction, &KJob::finished, d, &ResourceBasePrivate::changeCommittedResult);
698
699 // Modify the items one-by-one, because STORE does not support mass RID change
700 for (const Item &item : items) {
701 auto job = new ItemModifyJob(item, transaction);
702 job->d_func()->setClean();
703 job->disableRevisionCheck(); // TODO: remove, but where/how do we handle the error?
704 job->setIgnorePayload(true); // we only want to reset the dirty flag and update the remote id
705 }
706}
707
709{
711 auto job = new CollectionModifyJob(collection);
712 connect(job, &KJob::result, d, &ResourceBasePrivate::changeCommittedResult);
713}
714
715void ResourceBasePrivate::changeCommittedResult(KJob *job)
716{
717 if (job->error()) {
718 qCWarning(AKONADIAGENTBASE_LOG) << job->errorText();
719 }
720
721 Q_Q(ResourceBase);
722 if (qobject_cast<CollectionModifyJob *>(job)) {
723 if (job->error()) {
724 Q_EMIT q->error(i18nc("@info", "Updating local collection failed: %1.", job->errorText()));
725 }
726 mChangeRecorder->d_ptr->invalidateCache(static_cast<CollectionModifyJob *>(job)->collection());
727 } else {
728 if (job->error()) {
729 Q_EMIT q->error(i18nc("@info", "Updating local items failed: %1.", job->errorText()));
730 }
731 // Item and tag cache is invalidated by modify job
732 }
733
734 changeProcessed();
735}
736
738{
740 auto job = new TagModifyJob(tag);
741 connect(job, &KJob::result, d, &ResourceBasePrivate::changeCommittedResult);
742}
743
744void ResourceBase::requestItemDelivery(const QList<qint64> &uids, const QByteArrayList &parts)
745{
747 if (!isOnline()) {
748 const QString errorMsg = i18nc("@info", "Cannot fetch item in offline mode.");
750 Q_EMIT error(errorMsg);
751 return;
752 }
753
754 setDelayedReply(true);
755
756 const auto items = uids | Views::transform([](const auto uid) {
757 return Item{uid};
758 })
759 | Actions::toQVector;
760
761 const QSet<QByteArray> partSet = QSet<QByteArray>(parts.begin(), parts.end());
762 d->scheduler->scheduleItemsFetch(items, partSet, message());
763}
764
766{
768 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
769 "ResourceBase::collectionsRetrieved()",
770 "Calling collectionsRetrieved() although no collection retrieval is in progress");
771 if (!d->mCollectionSyncer) {
772 d->mCollectionSyncer = new CollectionSync(identifier());
773 d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);
774 d->mCollectionSyncer->setKeepLocalChanges(d->mKeepLocalCollectionChanges);
775 connect(d->mCollectionSyncer, &KJob::percentChanged, d,
776 &ResourceBasePrivate::slotPercent); // NOLINT(google-runtime-int): ulong comes from KJob
777 connect(d->mCollectionSyncer, &KJob::result, d, &ResourceBasePrivate::slotCollectionSyncDone);
778 }
779 d->mCollectionSyncer->setRemoteCollections(collections);
780}
781
782void ResourceBase::collectionsRetrievedIncremental(const Collection::List &changedCollections, const Collection::List &removedCollections)
783{
785 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
786 "ResourceBase::collectionsRetrievedIncremental()",
787 "Calling collectionsRetrievedIncremental() although no collection retrieval is in progress");
788 if (!d->mCollectionSyncer) {
789 d->mCollectionSyncer = new CollectionSync(identifier());
790 d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);
791 d->mCollectionSyncer->setKeepLocalChanges(d->mKeepLocalCollectionChanges);
792 connect(d->mCollectionSyncer, &KJob::percentChanged, d,
793 &ResourceBasePrivate::slotPercent); // NOLINT(google-runtime-int): ulong comes from KJob
794 connect(d->mCollectionSyncer, &KJob::result, d, &ResourceBasePrivate::slotCollectionSyncDone);
795 }
796 d->mCollectionSyncer->setRemoteCollections(changedCollections, removedCollections);
797}
798
800{
802 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
803 "ResourceBase::setCollectionStreamingEnabled()",
804 "Calling setCollectionStreamingEnabled() although no collection retrieval is in progress");
805 if (!d->mCollectionSyncer) {
806 d->mCollectionSyncer = new CollectionSync(identifier());
807 d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);
808 connect(d->mCollectionSyncer, &KJob::percentChanged, d,
809 &ResourceBasePrivate::slotPercent); // NOLINT(google-runtime-int): ulong comes from KJob
810 connect(d->mCollectionSyncer, &KJob::result, d, &ResourceBasePrivate::slotCollectionSyncDone);
811 }
812 d->mCollectionSyncer->setStreamingEnabled(enable);
813}
814
816{
818 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
819 "ResourceBase::collectionsRetrievalDone()",
820 "Calling collectionsRetrievalDone() although no collection retrieval is in progress");
821 // streaming enabled, so finalize the sync
822 if (d->mCollectionSyncer) {
823 d->mCollectionSyncer->retrievalDone();
824 } else {
825 // user did the sync himself, we are done now
826 // FIXME: we need the same special case for SyncAll as in slotCollectionSyncDone here!
827 d->scheduler->taskDone();
828 }
829}
830
832{
834 d->mKeepLocalCollectionChanges = parts;
835}
836
837void ResourceBasePrivate::slotCollectionSyncDone(KJob *job)
838{
839 Q_Q(ResourceBase);
840 mCollectionSyncer = nullptr;
841 if (job->error()) {
842 if (job->error() != Job::UserCanceled) {
843 Q_EMIT q->error(job->errorString());
844 }
845 } else {
846 if (scheduler->currentTask().type == ResourceScheduler::SyncAll) {
848 list->setFetchScope(q->changeRecorder()->collectionFetchScope());
849 list->fetchScope().fetchAttribute<SpecialCollectionAttribute>();
850 list->fetchScope().fetchAttribute<FavoriteCollectionAttribute>();
851 list->fetchScope().setResource(mId);
852 list->fetchScope().setListFilter(CollectionFetchScope::Sync);
853 connect(list, &KJob::result, this, &ResourceBasePrivate::slotLocalListDone);
854 return;
855 } else if (scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree) {
856 scheduler->scheduleCollectionTreeSyncCompletion();
857 }
858 }
859 scheduler->taskDone();
860}
861
862namespace
863{
864bool sortCollectionsForSync(const Collection &l, const Collection &r)
865{
867 const bool lInbox = (lType == "inbox") || (QStringView(l.remoteId()).mid(1).compare(QLatin1StringView("inbox"), Qt::CaseInsensitive) == 0);
868 const bool lFav = l.hasAttribute<FavoriteCollectionAttribute>();
869
871 const bool rInbox = (rType == "inbox") || (QStringView(r.remoteId()).mid(1).compare(QLatin1StringView("inbox"), Qt::CaseInsensitive) == 0);
872 const bool rFav = r.hasAttribute<FavoriteCollectionAttribute>();
873
874 // inbox is always first
875 if (lInbox) {
876 return true;
877 } else if (rInbox) {
878 return false;
879 }
880
881 // favorites right after inbox
882 if (lFav) {
883 return !rInbox;
884 } else if (rFav) {
885 return lInbox;
886 }
887
888 // trash is always last (unless it's favorite)
889 if (lType == "trash") {
890 return false;
891 } else if (rType == "trash") {
892 return true;
893 }
894
895 // Fallback to sorting by id
896 return l.id() < r.id();
897}
898
899} // namespace
900
901void ResourceBasePrivate::slotLocalListDone(KJob *job)
902{
903 Q_Q(ResourceBase);
904 if (job->error()) {
905 Q_EMIT q->error(job->errorString());
906 } else {
907 Collection::List cols = static_cast<CollectionFetchJob *>(job)->collections();
908 std::sort(cols.begin(), cols.end(), sortCollectionsForSync);
909 for (const Collection &col : std::as_const(cols)) {
910 scheduler->scheduleSync(col);
911 }
912 scheduler->scheduleFullSyncCompletion();
913 }
914 scheduler->taskDone();
915}
916
917void ResourceBasePrivate::slotSynchronizeCollection(const Collection &col)
918{
919 Q_Q(ResourceBase);
920 currentCollection = col;
921 // This can happen due to FetchHelper::triggerOnDemandFetch() in the akonadi server (not an error).
922 if (!col.remoteId().isEmpty()) {
923 // check if this collection actually can contain anything
924 QStringList contentTypes = currentCollection.contentMimeTypes();
925 contentTypes.removeAll(Collection::mimeType());
927 if (!contentTypes.isEmpty() || col.isVirtual()) {
928 if (mAutomaticProgressReporting) {
929 Q_EMIT q->status(AgentBase::Running, i18nc("@info:status", "Syncing folder '%1'", currentCollection.displayName()));
930 }
931
932 qCDebug(AKONADIAGENTBASE_LOG) << "Preparing collection sync of collection" << currentCollection.id() << currentCollection.displayName();
933 auto fetchJob = new Akonadi::CollectionFetchJob(col, CollectionFetchJob::Base, this);
934 fetchJob->setFetchScope(q->changeRecorder()->collectionFetchScope());
935 connect(fetchJob, &KJob::result, this, &ResourceBasePrivate::slotItemRetrievalCollectionFetchDone);
936 mCurrentCollectionFetchJob = fetchJob;
937 return;
938 }
939 }
940 scheduler->taskDone();
941}
942
943void ResourceBasePrivate::slotItemRetrievalCollectionFetchDone(KJob *job)
944{
945 Q_Q(ResourceBase);
946 mCurrentCollectionFetchJob = nullptr;
947 if (job->error()) {
948 qCWarning(AKONADIAGENTBASE_LOG) << "Failed to retrieve collection for sync: " << job->errorString();
949 q->cancelTask(i18n("Failed to retrieve collection for sync."));
950 return;
951 }
952 auto fetchJob = static_cast<Akonadi::CollectionFetchJob *>(job);
953 const Collection::List collections = fetchJob->collections();
954 if (collections.isEmpty()) {
955 qCWarning(AKONADIAGENTBASE_LOG) << "The fetch job returned empty collection set. This is unexpected.";
956 q->cancelTask(i18n("Failed to retrieve collection for sync."));
957 return;
958 }
959 mCollectionSyncTimestamp = QDateTime::currentDateTimeUtc();
960 q->retrieveItems(collections.at(0));
961}
962
964{
965 Q_D(const ResourceBase);
966 return d->mItemSyncBatchSize;
967}
968
970{
972 d->mItemSyncBatchSize = batchSize;
973}
974
976{
978 d->mScheduleAttributeSyncBeforeCollectionSync = enable;
979}
980
981void ResourceBasePrivate::slotSynchronizeCollectionAttributes(const Collection &col)
982{
983 Q_Q(ResourceBase);
984 auto fetchJob = new Akonadi::CollectionFetchJob(col, CollectionFetchJob::Base, this);
985 fetchJob->setFetchScope(q->changeRecorder()->collectionFetchScope());
986 connect(fetchJob, &KJob::result, this, &ResourceBasePrivate::slotAttributeRetrievalCollectionFetchDone);
987 Q_ASSERT(!mCurrentCollectionFetchJob);
988 mCurrentCollectionFetchJob = fetchJob;
989}
990
991void ResourceBasePrivate::slotAttributeRetrievalCollectionFetchDone(KJob *job)
992{
993 mCurrentCollectionFetchJob = nullptr;
994 Q_Q(ResourceBase);
995 if (job->error()) {
996 qCWarning(AKONADIAGENTBASE_LOG) << "Failed to retrieve collection for attribute sync: " << job->errorString();
997 q->cancelTask(i18n("Failed to retrieve collection for attribute sync."));
998 return;
999 }
1000 auto fetchJob = static_cast<Akonadi::CollectionFetchJob *>(job);
1001 // FIXME: Why not call q-> directly?
1002 QMetaObject::invokeMethod(q, "retrieveCollectionAttributes", Q_ARG(Akonadi::Collection, fetchJob->collections().at(0)));
1003}
1004
1005void ResourceBasePrivate::slotSynchronizeTags()
1006{
1007 Q_Q(ResourceBase);
1008 QMetaObject::invokeMethod(this, [q] {
1009 q->retrieveTags();
1010 });
1011}
1012
1013void ResourceBasePrivate::slotPrepareItemRetrieval(const Item &item)
1014{
1015 Q_Q(ResourceBase);
1016 auto fetch = new ItemFetchJob(item, this);
1017 // we always need at least parent so we can use ItemCreateJob to merge
1018 fetch->fetchScope().setAncestorRetrieval(qMax(ItemFetchScope::Parent, q->changeRecorder()->itemFetchScope().ancestorRetrieval()));
1019 fetch->fetchScope().setCacheOnly(true);
1020 fetch->fetchScope().setFetchRemoteIdentification(true);
1021
1022 // copy list of attributes to fetch
1023 const QSet<QByteArray> attributes = q->changeRecorder()->itemFetchScope().attributes();
1024 for (const auto &attribute : attributes) {
1025 fetch->fetchScope().fetchAttribute(attribute);
1026 }
1027
1028 connect(fetch, &KJob::result, this, &ResourceBasePrivate::slotPrepareItemRetrievalResult);
1029}
1030
1031void ResourceBasePrivate::slotPrepareItemRetrievalResult(KJob *job)
1032{
1033 Q_Q(ResourceBase);
1034 Q_ASSERT_X(scheduler->currentTask().type == ResourceScheduler::FetchItem,
1035 "ResourceBasePrivate::slotPrepareItemRetrievalResult()",
1036 "Preparing item retrieval although no item retrieval is in progress");
1037 if (job->error()) {
1038 q->cancelTask(job->errorText());
1039 return;
1040 }
1041 auto fetch = qobject_cast<ItemFetchJob *>(job);
1042 if (fetch->items().count() != 1) {
1043 q->cancelTask(i18n("The requested item no longer exists"));
1044 return;
1045 }
1046 const QSet<QByteArray> parts = scheduler->currentTask().itemParts;
1047 if (!q->retrieveItem(fetch->items().at(0), parts)) {
1048 q->cancelTask();
1049 }
1050}
1051
1052void ResourceBasePrivate::slotPrepareItemsRetrieval(const QList<Item> &items)
1053{
1054 Q_Q(ResourceBase);
1055 auto fetch = new ItemFetchJob(items, this);
1056 // we always need at least parent so we can use ItemCreateJob to merge
1057 fetch->fetchScope().setAncestorRetrieval(qMax(ItemFetchScope::Parent, q->changeRecorder()->itemFetchScope().ancestorRetrieval()));
1058 fetch->fetchScope().setCacheOnly(true);
1059 fetch->fetchScope().setFetchRemoteIdentification(true);
1060 // It's possible that one or more items were removed before this task was
1061 // executed, so ignore it and just handle the rest.
1062 fetch->fetchScope().setIgnoreRetrievalErrors(true);
1063
1064 // copy list of attributes to fetch
1065 const QSet<QByteArray> attributes = q->changeRecorder()->itemFetchScope().attributes();
1066 for (const auto &attribute : attributes) {
1067 fetch->fetchScope().fetchAttribute(attribute);
1068 }
1069
1070 connect(fetch, &KJob::result, this, &ResourceBasePrivate::slotPrepareItemsRetrievalResult);
1071}
1072
1073void ResourceBasePrivate::slotPrepareItemsRetrievalResult(KJob *job)
1074{
1075 Q_Q(ResourceBase);
1076 Q_ASSERT_X(scheduler->currentTask().type == ResourceScheduler::FetchItems,
1077 "ResourceBasePrivate::slotPrepareItemsRetrievalResult()",
1078 "Preparing items retrieval although no items retrieval is in progress");
1079 if (job->error()) {
1080 q->cancelTask(job->errorText());
1081 return;
1082 }
1083 auto fetch = qobject_cast<ItemFetchJob *>(job);
1084 const auto items = fetch->items();
1085 if (items.isEmpty()) {
1086 q->cancelTask();
1087 return;
1088 }
1089
1090 const QSet<QByteArray> parts = scheduler->currentTask().itemParts;
1091 Q_ASSERT(items.first().parentCollection().isValid());
1092 if (!q->retrieveItems(items, parts)) {
1093 q->cancelTask();
1094 }
1095}
1096
1097void ResourceBasePrivate::slotRecursiveMoveReplay(RecursiveMover *mover)
1098{
1099 Q_ASSERT(mover);
1100 Q_ASSERT(!m_recursiveMover);
1101 m_recursiveMover = mover;
1102 connect(mover, &KJob::result, this, &ResourceBasePrivate::slotRecursiveMoveReplayResult);
1103 mover->start();
1104}
1105
1106void ResourceBasePrivate::slotRecursiveMoveReplayResult(KJob *job)
1107{
1108 Q_Q(ResourceBase);
1109 m_recursiveMover = nullptr;
1110
1111 if (job->error()) {
1112 q->deferTask();
1113 return;
1114 }
1115
1116 changeProcessed();
1117}
1118
1120{
1122 // streaming enabled, so finalize the sync
1123 if (d->mItemSyncer) {
1124 d->mItemSyncer->deliveryDone();
1125 } else {
1126 if (d->scheduler->currentTask().type == ResourceScheduler::FetchItems) {
1127 d->scheduler->currentTask().sendDBusReplies(QString());
1128 }
1129 // user did the sync himself, we are done now
1130 d->scheduler->taskDone();
1131 }
1132}
1133
1135{
1137 d->scheduler->scheduleResourceCollectionDeletion();
1138}
1139
1141{
1143 d->scheduler->scheduleCacheInvalidation(collection);
1144}
1145
1147{
1148 Q_D(const ResourceBase);
1149 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollection,
1150 "ResourceBase::currentCollection()",
1151 "Trying to access current collection although no item retrieval is in progress");
1152 return d->currentCollection;
1153}
1154
1156{
1157 Q_D(const ResourceBase);
1158 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::FetchItem,
1159 "ResourceBase::currentItem()",
1160 "Trying to access current item although no item retrieval is in progress");
1161 return d->scheduler->currentTask().items[0];
1162}
1163
1165{
1166 Q_D(const ResourceBase);
1167 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::FetchItems,
1168 "ResourceBase::currentItems()",
1169 "Trying to access current items although no items retrieval is in progress");
1170 return d->scheduler->currentTask().items;
1171}
1172
1174{
1175 d_func()->scheduler->scheduleCollectionTreeSync();
1176}
1177
1179{
1180 d_func()->scheduler->scheduleTagSync();
1181}
1182
1184{
1186 if (d->mCurrentCollectionFetchJob) {
1187 d->mCurrentCollectionFetchJob->kill();
1188 d->mCurrentCollectionFetchJob = nullptr;
1189 }
1190 switch (d->scheduler->currentTask().type) {
1191 case ResourceScheduler::FetchItem:
1192 itemRetrieved(Item()); // sends the error reply and
1193 break;
1194 case ResourceScheduler::FetchItems:
1196 break;
1197 case ResourceScheduler::ChangeReplay:
1198 d->changeProcessed();
1199 break;
1200 case ResourceScheduler::SyncCollectionTree:
1201 case ResourceScheduler::SyncAll:
1202 if (d->mCollectionSyncer) {
1203 d->mCollectionSyncer->rollback();
1204 } else {
1205 d->scheduler->taskDone();
1206 }
1207 break;
1208 case ResourceScheduler::SyncCollection:
1209 if (d->mItemSyncer) {
1210 d->mItemSyncer->rollback();
1211 } else {
1212 d->scheduler->taskDone();
1213 }
1214 break;
1215 default:
1216 d->scheduler->taskDone();
1217 }
1218}
1219
1221{
1222 cancelTask();
1223
1224 Q_EMIT error(msg);
1225}
1226
1228{
1230 qCDebug(AKONADIAGENTBASE_LOG) << "Deferring task" << d->scheduler->currentTask();
1231 // Deferring a CollectionSync is just not implemented.
1232 // We'd need to d->mItemSyncer->rollback() but also to NOT call taskDone in slotItemSyncDone() here...
1233 Q_ASSERT(!d->mItemSyncer);
1234 d->scheduler->deferTask();
1235}
1236
1238{
1239 d_func()->scheduler->setOnline(state);
1240}
1241
1243{
1244 synchronizeCollection(collectionId, false);
1245}
1246
1247void ResourceBase::synchronizeCollection(qint64 collectionId, bool recursive)
1248{
1250 auto job = new CollectionFetchJob(Collection(collectionId), recursive ? CollectionFetchJob::Recursive : CollectionFetchJob::Base);
1251 job->setFetchScope(changeRecorder()->collectionFetchScope());
1252 job->fetchScope().setResource(identifier());
1253 job->fetchScope().setListFilter(CollectionFetchScope::Sync);
1254 connect(job, &KJob::result, d, &ResourceBasePrivate::slotCollectionListDone);
1255}
1256
1257void ResourceBasePrivate::slotCollectionListDone(KJob *job)
1258{
1259 if (!job->error()) {
1260 const Collection::List list = static_cast<CollectionFetchJob *>(job)->collections();
1261 for (const Collection &collection : list) {
1262 // We also get collections that should not be synced but are part of the tree.
1263 if (collection.shouldList(Collection::ListSync)) {
1264 if (mScheduleAttributeSyncBeforeCollectionSync) {
1265 scheduler->scheduleAttributesSync(collection);
1266 }
1267 scheduler->scheduleSync(collection);
1268 }
1269 }
1270 } else {
1271 qCWarning(AKONADIAGENTBASE_LOG) << "Failed to fetch collection for collection sync: " << job->errorString();
1272 }
1273}
1274
1276{
1278 d->scheduler->scheduleAttributesSync(col);
1279}
1280
1282{
1284 auto job = new CollectionFetchJob(Collection(collectionId), CollectionFetchJob::Base);
1285 job->setFetchScope(changeRecorder()->collectionFetchScope());
1286 job->fetchScope().setResource(identifier());
1287 connect(job, &KJob::result, d, &ResourceBasePrivate::slotCollectionListForAttributesDone);
1288}
1289
1290void ResourceBasePrivate::slotCollectionListForAttributesDone(KJob *job)
1291{
1292 if (!job->error()) {
1293 const Collection::List list = static_cast<CollectionFetchJob *>(job)->collections();
1294 if (!list.isEmpty()) {
1295 const Collection &col = list.first();
1296 scheduler->scheduleAttributesSync(col);
1297 }
1298 }
1299 // TODO: error handling
1300}
1301
1303{
1304 qCDebug(AKONADIAGENTBASE_LOG) << amount;
1307 if (d->mItemSyncer) {
1308 d->mItemSyncer->setTotalItems(amount);
1309 }
1310}
1311
1313{
1315 if (d->mItemSyncer) {
1316 d->mItemSyncer->setDisableAutomaticDeliveryDone(disable);
1317 }
1318 d->mDisableAutomaticItemDeliveryDone = disable;
1319}
1320
1322{
1324 d->createItemSyncInstanceIfMissing();
1325 if (d->mItemSyncer) {
1326 d->mItemSyncer->setStreamingEnabled(enable);
1327 }
1328}
1329
1331{
1333 if (d->scheduler->currentTask().type == ResourceScheduler::FetchItems) {
1334 auto trx = new TransactionSequence(this);
1335 connect(trx, &KJob::result, d, &ResourceBasePrivate::slotItemSyncDone);
1336 for (const Item &item : items) {
1337 Q_ASSERT(item.parentCollection().isValid());
1338 if (item.isValid()) { // NOLINT(bugprone-branch-clone)
1339 new ItemModifyJob(item, trx);
1340 } else if (!item.remoteId().isEmpty()) {
1341 auto job = new ItemCreateJob(item, item.parentCollection(), trx);
1342 job->setMerge(ItemCreateJob::RID);
1343 } else {
1344 // This should not happen, but just to be sure...
1345 new ItemModifyJob(item, trx);
1346 }
1347 }
1348 trx->commit();
1349 } else {
1350 d->createItemSyncInstanceIfMissing();
1351 if (d->mItemSyncer) {
1352 d->mItemSyncer->setFullSyncItems(items);
1353 }
1354 }
1355}
1356
1357void ResourceBase::itemsRetrievedIncremental(const Item::List &changedItems, const Item::List &removedItems)
1358{
1360 d->createItemSyncInstanceIfMissing();
1361 if (d->mItemSyncer) {
1362 d->mItemSyncer->setIncrementalSyncItems(changedItems, removedItems);
1363 }
1364}
1365
1366void ResourceBasePrivate::slotItemSyncDone(KJob *job)
1367{
1368 mItemSyncer = nullptr;
1369 Q_Q(ResourceBase);
1370 if (job->error() && job->error() != Job::UserCanceled) {
1371 Q_EMIT q->error(job->errorString());
1372 }
1373 if (scheduler->currentTask().type == ResourceScheduler::FetchItems) {
1374 scheduler->currentTask().sendDBusReplies((job->error() && job->error() != Job::UserCanceled) ? job->errorString() : QString());
1375 }
1376 scheduler->taskDone();
1377}
1378
1379void ResourceBasePrivate::slotDelayedEmitProgress()
1380{
1381 Q_Q(ResourceBase);
1382 if (mAutomaticProgressReporting) {
1383 Q_EMIT q->percent(mUnemittedProgress);
1384
1385 for (const QVariantMap &statusMap : std::as_const(mUnemittedAdvancedStatus)) {
1386 Q_EMIT q->advancedStatus(statusMap);
1387 }
1388 }
1389 mUnemittedProgress = 0;
1390 mUnemittedAdvancedStatus.clear();
1391}
1392
1393void ResourceBasePrivate::slotPercent(KJob *job, quint64 percent)
1394{
1395 mUnemittedProgress = static_cast<int>(percent);
1396
1397 const auto collection = job->property("collection").value<Collection>();
1398 if (collection.isValid()) {
1399 QVariantMap statusMap;
1400 statusMap.insert(QStringLiteral("key"), QStringLiteral("collectionSyncProgress"));
1401 statusMap.insert(QStringLiteral("collectionId"), collection.id());
1402 statusMap.insert(QStringLiteral("percent"), static_cast<unsigned int>(percent));
1403
1404 mUnemittedAdvancedStatus[collection.id()] = statusMap;
1405 }
1406 // deliver completion right away, intermediate progress at 1s intervals
1407 if (percent == 100U) {
1408 mProgressEmissionCompressor.stop();
1409 slotDelayedEmitProgress();
1410 } else if (!mProgressEmissionCompressor.isActive()) {
1411 mProgressEmissionCompressor.start();
1412 }
1413}
1414
1416{
1418 d->mHierarchicalRid = enable;
1419}
1420
1421void ResourceBase::scheduleCustomTask(QObject *receiver, const char *method, const QVariant &argument, SchedulePriority priority)
1422{
1424 d->scheduler->scheduleCustomTask(receiver, method, argument, priority);
1425}
1426
1428{
1430 d->scheduler->taskDone();
1431}
1432
1437
1439{
1441 d->scheduler->taskDone();
1442}
1443
1445{
1446 Q_UNUSED(item)
1447 Q_UNUSED(parts)
1448 // retrieveItem() can no longer be pure virtual, because then we could not mark
1449 // it as deprecated (i.e. implementations would still be forced to implement it),
1450 // so instead we assert here.
1451 // NOTE: Don't change to Q_ASSERT_X here: while the macro can be disabled at
1452 // compile time, we want to hit this assert *ALWAYS*.
1453 qt_assert_x("Akonadi::ResourceBase::retrieveItem()",
1454 "The base implementation of retrieveItem() must never be reached. "
1455 "You must implement either retrieveItem() or retrieveItems(Akonadi::Item::List, QSet<QByteArray>) overload "
1456 "to handle item retrieval requests.",
1457 __FILE__,
1458 __LINE__);
1459 return false;
1460}
1461
1463{
1465
1466 // If we reach this implementation of retrieveItems() then it means that the
1467 // resource is still using the deprecated retrieveItem() method, so we explode
1468 // this to a myriad of tasks in scheduler and let them be processed one by one
1469
1470 const qint64 id = d->scheduler->currentTask().serial;
1471 for (const auto &item : items) {
1472 d->scheduler->scheduleItemFetch(item, parts, d->scheduler->currentTask().dbusMsgs, id);
1473 }
1474 taskDone();
1475 return true;
1476}
1477
1481
1483{
1485 d->mItemTransactionMode = mode;
1486}
1487
1488void ResourceBase::setItemMergingMode(ItemSync::MergeMode mode)
1489{
1491 d->mItemMergeMode = mode;
1492}
1493
1495{
1497 d->mAutomaticProgressReporting = enabled;
1498}
1499
1501{
1502 Q_D(const ResourceBase);
1503 return d->dumpNotificationListToString();
1504}
1505
1507{
1508 Q_D(const ResourceBase);
1509 return d->dumpToString();
1510}
1511
1513{
1514 Q_D(const ResourceBase);
1515 d->dumpMemoryInfo();
1516}
1517
1519{
1520 Q_D(const ResourceBase);
1521 return d->dumpMemoryInfoToString();
1522}
1523
1524void ResourceBase::tagsRetrieved(const Tag::List &tags, const QHash<QString, Item::List> &tagMembers)
1525{
1527 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncTags || d->scheduler->currentTask().type == ResourceScheduler::SyncAll
1528 || d->scheduler->currentTask().type == ResourceScheduler::Custom,
1529 "ResourceBase::tagsRetrieved()",
1530 "Calling tagsRetrieved() although no tag retrieval is in progress");
1531 if (!d->mTagSyncer) {
1532 d->mTagSyncer = new TagSync(this);
1533 connect(d->mTagSyncer, &KJob::percentChanged, d,
1534 &ResourceBasePrivate::slotPercent); // NOLINT(google-runtime-int): ulong comes from KJob
1535 connect(d->mTagSyncer, &KJob::result, d, &ResourceBasePrivate::slotTagSyncDone);
1536 }
1537 d->mTagSyncer->setFullTagList(tags);
1538 d->mTagSyncer->setTagMembers(tagMembers);
1539}
1540
1541void ResourceBasePrivate::slotTagSyncDone(KJob *job)
1542{
1543 Q_Q(ResourceBase);
1544 mTagSyncer = nullptr;
1545 if (job->error()) {
1546 if (job->error() != Job::UserCanceled) {
1547 qCWarning(AKONADIAGENTBASE_LOG) << "TagSync failed: " << job->errorString();
1548 Q_EMIT q->error(job->errorString());
1549 }
1550 }
1551
1552 scheduler->taskDone();
1553}
1554
1555#include "moc_resourcebase.cpp"
1556#include "resourcebase.moc"
The base class for all Akonadi agents and resources.
Definition agentbase.h:73
QStringList activities() const
Returns the activities of the agent.
virtual int status() const
This method returns the current status code of the agent.
void setActivities(const QStringList &activities)
This method is used to set the activities of the agent.
void setActivitiesEnabled(bool enabled)
This method is used to enabled the activities of the agent.
void setAgentName(const QString &name)
This method is used to set the name of the agent.
ChangeRecorder * changeRecorder() const
Returns the Akonadi::ChangeRecorder object used for monitoring.
QString agentName() const
Returns the name of the agent.
@ Running
The agent is working on something.
Definition agentbase.h:398
void abortRequested()
Emitted when another application has remotely asked the agent to abort its current operation.
bool isOnline() const
Returns whether the agent is currently online.
bool activitiesEnabled() const
Returns the activities status of the agent.
QString identifier() const
Returns the instance identifier of this agent.
void agentNameChanged(const QString &name)
This signal is emitted whenever the name of the agent has changed.
void error(const QString &message)
This signal shall be used to report errors.
void changesAdded()
Emitted when new changes are recorded.
void replayNext()
Replay the next change notification and erase the previous one from the record.
void nothingToReplay()
Emitted when replayNext() was called, but there was no valid change to replay.
Job that deletes a collection in the Akonadi storage.
Job that fetches collections from the Akonadi storage.
@ Recursive
List all sub-collections.
@ FirstLevel
Only list direct sub-collections of the base collection.
@ Base
Only fetch the base collection.
@ Sync
Only retrieve collections for synchronization, taking the local preference and enabled into account.
Job that modifies a collection in the Akonadi storage.
Represents a collection of PIM items.
Definition collection.h:62
static QString mimeType()
Returns the mimetype used for collections.
@ ListSync
Listing for synchronization.
Definition collection.h:480
bool hasAttribute(const QByteArray &name) const
Returns true if the collection has an attribute of the given type name, false otherwise.
static Collection root()
Returns the root collection.
static QString virtualMimeType()
Returns the mimetype used for virtual collections.
bool shouldList(ListPurpose purpose) const
Returns whether the collection should be listed or not for the specified purpose Takes enabled state ...
Attribute * attribute(const QByteArray &name)
Returns the attribute of the given type name if available, 0 otherwise.
QString remoteId() const
Returns the remote id of the collection.
Job that creates a new item in the Akonadi storage.
@ RID
Merge by remote id.
Job that fetches items from the Akonadi storage.
@ Parent
Only retrieve the immediate parent collection.
Job that modifies an existing item in the Akonadi storage.
Syncs between items known to a client (usually a resource) and the Akonadi storage.
Definition itemsync.h:41
void readyForNextBatch(int remainingBatchSize)
Signals the resource that new items can be delivered.
TransactionMode
Transaction mode used by ItemSync.
Definition itemsync.h:129
void setDisableAutomaticDeliveryDone(bool disable)
Disables the automatic completion of the item sync, based on the number of delivered items.
Definition itemsync.cpp:238
void setBatchSize(int)
Set the batch size.
Definition itemsync.cpp:539
void setMergeMode(MergeMode mergeMode)
Set what merge method should be used for next ItemSync run.
Definition itemsync.cpp:551
void setTransactionMode(TransactionMode mode)
Set the transaction mode to use for this sync.
Definition itemsync.cpp:527
Represents a PIM item stored in Akonadi storage.
Definition item.h:100
QString remoteId() const
Returns the remote id of the item.
Definition item.cpp:73
bool isValid() const
Returns whether the item is valid.
Definition item.cpp:88
QSet< QByteArray > loadedPayloadParts() const
Returns the list of loaded payload parts.
Definition item.cpp:283
@ UserCanceled
The user canceled this job.
Definition job.h:101
void collectionRemoved(const Akonadi::Collection &collection)
This signal is emitted if a monitored collection has been removed from the Akonadi storage.
The base class for all Akonadi resources.
void setScheduleAttributeSyncBeforeItemSync(bool)
Set to true to schedule an attribute sync before every item sync.
void setItemTransactionMode(ItemSync::TransactionMode mode)
Set transaction mode for item sync'ing.
QString dumpNotificationListToString() const
Dump the contents of the current ChangeReplay.
void dumpMemoryInfo() const
Dumps memory usage information to stdout.
void taskDone()
Indicate that the current task is finished.
void invalidateCache(const Collection &collection)
Call this method to invalidate all cached content in collection.
void setActivitiesEnabled(bool enable)
This method enables or not activities support.
void collectionAttributesRetrieved(const Collection &collection)
Call this method from retrieveCollectionAttributes() once the result is available.
bool activitiesEnabled() const
Returns true if activities is enabled.
int itemSyncBatchSize() const
Returns the batch size used during the item sync.
void setItemSyncBatchSize(int batchSize)
Set the batch size used during the item sync.
void setDisableAutomaticItemDeliveryDone(bool disable)
Disables the automatic completion of the item sync, based on the number of delivered items.
static int init(int argc, char **argv)
Use this method in the main function of your resource application to initialize your resource subclas...
void synchronizeCollection(qint64 id)
This method is called whenever the collection with the given id shall be synchronized.
virtual void retrieveTags()
Retrieve all tags from the backend.
void synchronized()
Emitted when a full synchronization has been completed.
virtual void retrieveCollections()=0
Retrieve the collection tree from the remote server and supply it via collectionsRetrieved() or colle...
QString dumpSchedulerToString() const
Dump the state of the scheduler.
QString dumpMemoryInfoToString() const
Returns a string with memory usage information.
SchedulePriority
Describes the scheduling priority of a task that has been queued for execution.
void retrieveNextItemSyncBatch(int remainingBatchSize)
Emitted when the item synchronization processed the current batch and is ready for a new one.
void setKeepLocalCollectionChanges(const QSet< QByteArray > &parts)
Allows to keep locally changed collection parts during the collection sync.
void attributesSynchronized(qlonglong collectionId)
Emitted when a collection attributes synchronization has been completed.
void itemsRetrievalDone()
Call this method to indicate you finished synchronizing the current collection.
QStringList activities() const
return list of activities.
Item::List currentItems() const
Returns the items that are currently retrieved.
void setTotalItems(int amount)
Call this method when you want to use the itemsRetrieved() method in streaming mode and indicate the ...
void setActivities(const QStringList &activities)
This method sets list of activities.
void setAutomaticProgressReporting(bool enabled)
Enable or disable automatic progress reporting.
void collectionsRetrieved(const Collection::List &collections)
Call this to supply the full folder tree retrieved from the remote server.
void deferTask()
Suspends the execution of the current task and tries again to execute it.
void setItemStreamingEnabled(bool enable)
Enable item streaming, which is disabled by default.
QString name() const
Returns the name of the resource.
void collectionsRetrievalDone()
Call this method to indicate you finished synchronizing the collection tree.
void clearCache()
Call this method to remove all items and collections of the resource from the server cache.
void scheduleCustomTask(QObject *receiver, const char *method, const QVariant &argument, SchedulePriority priority=Append)
Schedules a custom task in the internal scheduler.
void collectionsRetrievedIncremental(const Collection::List &changedCollections, const Collection::List &removedCollections)
Call this to supply incrementally retrieved collections from the remote server.
void itemRetrieved(const Item &item)
Call this method from retrieveItem() once the result is available.
virtual AKONADIAGENTBASE_DEPRECATED bool retrieveItem(const Akonadi::Item &item, const QSet< QByteArray > &parts)
Retrieve a single item from the backend.
void synchronize()
This method is called whenever the resource should start synchronize all data.
void changeCommitted(const Item &item)
Resets the dirty flag of the given item and updates the remote id.
void itemsRetrieved(const Item::List &items)
Call this method to supply the full collection listing from the remote server.
void doSetOnline(bool online) override
Inherited from AgentBase.
Collection currentCollection() const
Returns the collection that is currently synchronized.
void nameChanged(const QString &name)
This signal is emitted whenever the name of the resource has changed.
virtual void retrieveItems(const Akonadi::Collection &collection)=0
Retrieve all (new/changed) items in collection collection.
void cancelTask()
Stops the execution of the current task and continues with the next one.
void itemsRetrievedIncremental(const Item::List &changedItems, const Item::List &removedItems)
Call this method to supply incrementally retrieved items from the remote server.
void setItemMergingMode(ItemSync::MergeMode mode)
Set merge mode for item sync'ing.
void setHierarchicalRemoteIdentifiersEnabled(bool enable)
Indicate the use of hierarchical remote identifiers.
ResourceBase(const QString &id)
Creates a base resource.
virtual void retrieveCollectionAttributes(const Akonadi::Collection &collection)
Retrieve the attributes of a single collection from the backend.
void collectionTreeSynchronized()
Emitted when a collection tree synchronization has been completed.
~ResourceBase() override
Destroys the base resource.
void setName(const QString &name)
This method is used to set the name of the resource.
void synchronizeCollectionTree()
Refetches the Collections.
void synchronizeTags()
Refetches Tags.
void setCollectionStreamingEnabled(bool enable)
Enable collection streaming, that is collections don't have to be delivered at once as result of a re...
void changesCommitted(const Item::List &items)
Resets the dirty flag of all given items and updates remote ids.
virtual void abortActivity()
Abort any activity in progress in the backend.
void synchronizeCollectionAttributes(qint64 id)
This method is called whenever the collection with the given id shall have its attributes synchronize...
AKONADIAGENTBASE_DEPRECATED Item currentItem() const
Returns the item that is currently retrieved.
static QString agentServiceName(ServiceAgentType agentType, const QString &identifier)
Returns the namespaced D-Bus service name for an agent of type agentType with agent identifier identi...
static QString addNamespace(const QString &string)
Adds the multi-instance namespace to string if required (with '_' as separator).
void reconnected()
This signal is emitted whenever the session has been reconnected to the server (e....
An Attribute that stores the special collection type of a collection.
QByteArray collectionType() const
Returns the special collections type of the collection.
Job that modifies a tag in the Akonadi storage.
An Akonadi Tag.
Definition tag.h:26
Base class for jobs that need to run a sequence of sub-jobs in a transaction.
static void setApplicationData(const KAboutData &aboutData)
virtual QString errorString() const
int error() const
void result(KJob *job)
void finished(KJob *job)
QString errorText() const
void percentChanged(KJob *job, unsigned long percent)
static void setApplicationDomain(const QByteArray &domain)
QString i18nc(const char *context, const char *text, const TYPE &arg...)
QString i18n(const char *text, const TYPE &arg...)
Helper integration between Akonadi and Qt.
KCRASH_EXPORT void initialize()
KIOCORE_EXPORT QStringList list(const QString &fileClass)
bool isEmpty() const const
QCommandLineOption addHelpOption()
bool addOption(const QCommandLineOption &option)
QCommandLineOption addVersionOption()
bool isSet(const QCommandLineOption &option) const const
void process(const QCoreApplication &app)
void setApplicationDescription(const QString &description)
QString value(const QCommandLineOption &option) const const
void setApplicationName(const QString &application)
void setApplicationVersion(const QString &version)
void exit(int returnCode)
bool installTranslator(QTranslator *translationFile)
QCoreApplication * instance()
QDateTime currentDateTimeUtc()
QDBusError lastError() const const
QDBusConnection sessionBus()
const QDBusMessage & message() const const
void sendErrorReply(QDBusError::ErrorType type, const QString &msg) const const
void setDelayedReply(bool enable) const const
QString message() const const
const_reference at(qsizetype i) const const
iterator begin()
iterator end()
iterator erase(const_iterator begin, const_iterator end)
T & first()
bool isEmpty() const const
qsizetype removeAll(const AT &t)
iterator insert(const Key &key, const T &value)
bool invokeMethod(QObject *context, Functor &&function, FunctorReturnType *ret)
Q_EMITQ_EMIT
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
QVariant property(const char *name) const const
bool setProperty(const char *name, QVariant &&value)
T * data() const const
bool contains(const QSet< T > &other) const const
bool isEmpty() const const
QString fromLocal8Bit(QByteArrayView str)
bool isEmpty() const const
QStringView mid(qsizetype start, qsizetype length) const const
int compare(QChar ch) const const
CaseInsensitive
QFuture< ArgsType< Signal > > connect(Sender *sender, Signal signal)
QThread * currentThread()
void setInterval(int msec)
bool isActive() const const
void setSingleShot(bool singleShot)
void start()
void stop()
void timeout()
QVariant fromValue(T &&value)
T value() const const
Q_D(Todo)
This file is part of the KDE documentation.
Documentation copyright © 1996-2024 The KDE developers.
Generated on Mon Nov 4 2024 16:31:58 by doxygen 1.12.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.