Akonadi

resourcebase.cpp
1/*
2 SPDX-FileCopyrightText: 2006 Till Adam <adam@kde.org>
3 SPDX-FileCopyrightText: 2007 Volker Krause <vkrause@kde.org>
4
5 SPDX-License-Identifier: LGPL-2.0-or-later
6*/
7
8#include "resourcebase.h"
9#include "agentbase_p.h"
10
11#include "akonadifull-version.h"
12#include "collectiondeletejob.h"
13#include "collectionsync_p.h"
14#include "resourceadaptor.h"
15#include "resourcescheduler_p.h"
16#include "tagsync.h"
17#include "tracerinterface.h"
18#include <QDBusConnection>
19
20#include "changerecorder.h"
21#include "collectionfetchjob.h"
22#include "collectionfetchscope.h"
23#include "collectionmodifyjob.h"
24#include "favoritecollectionattribute.h"
25#include "invalidatecachejob_p.h"
26#include "itemcreatejob.h"
27#include "itemfetchjob.h"
28#include "itemfetchscope.h"
29#include "itemmodifyjob.h"
30#include "itemmodifyjob_p.h"
31#include "monitor_p.h"
32#include "recursivemover_p.h"
33#include "resourceselectjob_p.h"
34#include "servermanager_p.h"
35#include "session.h"
36#include "specialcollectionattribute.h"
37#include "tagmodifyjob.h"
38
39#include "akonadiagentbase_debug.h"
40
41#include "shared/akranges.h"
42#include <cstdlib>
43#include <iterator>
44
45#include <KAboutData>
46#include <KCrash>
47#include <KLocalizedString>
48
49#include <QApplication>
50#include <QHash>
51#include <QTimer>
52
53using namespace Akonadi;
54using namespace AkRanges;
55using namespace std::chrono_literals;
56class Akonadi::ResourceBasePrivate : public AgentBasePrivate
57{
58 Q_OBJECT
59 Q_CLASSINFO("D-Bus Interface", "org.kde.dfaure")
60
61public:
62 explicit ResourceBasePrivate(ResourceBase *parent)
63 : AgentBasePrivate(parent)
64 , scheduler(nullptr)
65 , mItemSyncer(nullptr)
66 , mItemTransactionMode(ItemSync::SingleTransaction)
67 , mItemMergeMode(ItemSync::RIDMerge)
68 , mCollectionSyncer(nullptr)
69 , mTagSyncer(nullptr)
70 , mHierarchicalRid(false)
71 , mUnemittedProgress(0)
72 , mAutomaticProgressReporting(true)
73 , mDisableAutomaticItemDeliveryDone(false)
74 , mItemSyncBatchSize(10)
75 , mCurrentCollectionFetchJob(nullptr)
76 , mScheduleAttributeSyncBeforeCollectionSync(false)
77 {
78 Internal::setClientType(Internal::Resource);
79 mStatusMessage = defaultReadyMessage();
80 mProgressEmissionCompressor.setInterval(1000);
81 mProgressEmissionCompressor.setSingleShot(true);
82 // HACK: skip local changes of the EntityDisplayAttribute by default. Remove this for KDE5 and adjust resource implementations accordingly.
83 mKeepLocalCollectionChanges << "ENTITYDISPLAY";
84 }
85
86 ~ResourceBasePrivate() override = default;
87
88 Q_DECLARE_PUBLIC(ResourceBase)
89
90 void delayedInit() override
91 {
92 const QString serviceId = ServerManager::agentServiceName(ServerManager::Resource, mId);
93 if (!QDBusConnection::sessionBus().registerService(serviceId)) {
95 if (reason.isEmpty()) {
96 reason = QStringLiteral("this service is probably running already.");
97 }
98 qCCritical(AKONADIAGENTBASE_LOG) << "Unable to register service" << serviceId << "at D-Bus:" << reason;
99
102 }
103 } else {
104 AgentBasePrivate::delayedInit();
105 }
106 }
107
108 void changeProcessed() override
109 {
110 if (m_recursiveMover) {
111 m_recursiveMover->changeProcessed();
112 QTimer::singleShot(0s, m_recursiveMover.data(), &RecursiveMover::replayNext);
113 return;
114 }
115
116 mChangeRecorder->changeProcessed();
117 if (!mChangeRecorder->isEmpty()) {
118 scheduler->scheduleChangeReplay();
119 }
120 scheduler->taskDone();
121 }
122
123 void slotAbortRequested();
124
125 void slotDeliveryDone(KJob *job);
126 void slotCollectionSyncDone(KJob *job);
127 void slotLocalListDone(KJob *job);
128 void slotSynchronizeCollection(const Collection &col);
129 void slotItemRetrievalCollectionFetchDone(KJob *job);
130 void slotCollectionListDone(KJob *job);
131 void slotSynchronizeCollectionAttributes(const Collection &col);
132 void slotCollectionListForAttributesDone(KJob *job);
133 void slotCollectionAttributesSyncDone(KJob *job);
134 void slotSynchronizeTags();
135 void slotAttributeRetrievalCollectionFetchDone(KJob *job);
136
137 void slotItemSyncDone(KJob *job);
138
139 void slotPercent(KJob *job, quint64 percent);
140 void slotDelayedEmitProgress();
141 void slotDeleteResourceCollection();
142 void slotDeleteResourceCollectionDone(KJob *job);
143 void slotCollectionDeletionDone(KJob *job);
144
145 void slotInvalidateCache(const Akonadi::Collection &collection);
146
147 void slotPrepareItemRetrieval(const Akonadi::Item &item);
148 void slotPrepareItemRetrievalResult(KJob *job);
149
150 void slotPrepareItemsRetrieval(const QList<Akonadi::Item> &item);
151 void slotPrepareItemsRetrievalResult(KJob *job);
152
153 void changeCommittedResult(KJob *job);
154
155 void slotRecursiveMoveReplay(RecursiveMover *mover);
156 void slotRecursiveMoveReplayResult(KJob *job);
157
158 void slotTagSyncDone(KJob *job);
159
160 void slotSessionReconnected()
161 {
162 Q_Q(ResourceBase);
163
164 new ResourceSelectJob(q->identifier());
165 }
166
167 void createItemSyncInstanceIfMissing()
168 {
169 Q_Q(ResourceBase);
170 Q_ASSERT_X(scheduler->currentTask().type == ResourceScheduler::SyncCollection,
171 "createItemSyncInstance",
172 "Calling items retrieval methods although no item retrieval is in progress");
173 if (!mItemSyncer) {
174 mItemSyncer = new ItemSync(q->currentCollection(), mCollectionSyncTimestamp);
175 mItemSyncer->setTransactionMode(mItemTransactionMode);
176 mItemSyncer->setBatchSize(mItemSyncBatchSize);
177 mItemSyncer->setMergeMode(mItemMergeMode);
178 mItemSyncer->setDisableAutomaticDeliveryDone(mDisableAutomaticItemDeliveryDone);
179 mItemSyncer->setProperty("collection", QVariant::fromValue(q->currentCollection()));
180 connect(mItemSyncer, &KJob::percentChanged, this,
181 &ResourceBasePrivate::slotPercent); // NOLINT(google-runtime-int): ulong comes from KJob
182
183 connect(mItemSyncer, &KJob::result, this, &ResourceBasePrivate::slotItemSyncDone);
185 }
186 Q_ASSERT(mItemSyncer);
187 }
188
189public Q_SLOTS:
190 // Dump the state of the scheduler
191 Q_SCRIPTABLE QString dumpToString() const
192 {
193 Q_Q(const ResourceBase);
194 return scheduler->dumpToString() + QLatin1Char('\n') + q->dumpResourceToString();
195 }
196
197 Q_SCRIPTABLE void dump()
198 {
199 scheduler->dump();
200 }
201
202 Q_SCRIPTABLE void clear()
203 {
204 scheduler->clear();
205 }
206
207protected Q_SLOTS:
208 // reimplementations from AgentbBasePrivate, containing sanity checks that only apply to resources
209 // such as making sure that RIDs are present as well as translations of cross-resource moves
210 // TODO: we could possibly add recovery code for no-RID notifications by re-enquing those to the change recorder
211 // as the corresponding Add notifications, although that contains a risk of endless fail/retry loops
212
213 void itemAdded(const Akonadi::Item &item, const Akonadi::Collection &collection) override
214 {
215 if (collection.remoteId().isEmpty()) {
216 changeProcessed();
217 return;
218 }
219 AgentBasePrivate::itemAdded(item, collection);
220 }
221
222 void itemChanged(const Akonadi::Item &item, const QSet<QByteArray> &partIdentifiers) override
223 {
224 if (item.remoteId().isEmpty()) {
225 changeProcessed();
226 return;
227 }
228 AgentBasePrivate::itemChanged(item, partIdentifiers);
229 }
230
231 void itemsFlagsChanged(const Akonadi::Item::List &items, const QSet<QByteArray> &addedFlags, const QSet<QByteArray> &removedFlags) override
232 {
233 if (addedFlags.isEmpty() && removedFlags.isEmpty()) {
234 changeProcessed();
235 return;
236 }
237
238 const Item::List validItems = filterValidItems(items);
239 if (validItems.isEmpty()) {
240 changeProcessed();
241 return;
242 }
243
244 AgentBasePrivate::itemsFlagsChanged(validItems, addedFlags, removedFlags);
245 }
246
247 void itemsTagsChanged(const Akonadi::Item::List &items, const QSet<Akonadi::Tag> &addedTags, const QSet<Akonadi::Tag> &removedTags) override
248 {
249 if (addedTags.isEmpty() && removedTags.isEmpty()) {
250 changeProcessed();
251 return;
252 }
253
254 const Item::List validItems = filterValidItems(items);
255 if (validItems.isEmpty()) {
256 changeProcessed();
257 return;
258 }
259
260 AgentBasePrivate::itemsTagsChanged(validItems, addedTags, removedTags);
261 }
262
263 // TODO move the move translation code from AgentBasePrivate here, it's wrong for agents
264 void itemMoved(const Akonadi::Item &item, const Akonadi::Collection &source, const Akonadi::Collection &destination) override
265 {
266 if (item.remoteId().isEmpty() || destination.remoteId().isEmpty() || destination == source) {
267 changeProcessed();
268 return;
269 }
270 AgentBasePrivate::itemMoved(item, source, destination);
271 }
272
273 void itemsMoved(const Akonadi::Item::List &items, const Akonadi::Collection &source, const Akonadi::Collection &destination) override
274 {
275 if (destination.remoteId().isEmpty() || destination == source) {
276 changeProcessed();
277 return;
278 }
279
280 const Item::List validItems = filterValidItems(items);
281 if (validItems.isEmpty()) {
282 changeProcessed();
283 return;
284 }
285
286 AgentBasePrivate::itemsMoved(validItems, source, destination);
287 }
288
289 void itemRemoved(const Akonadi::Item &item) override
290 {
291 if (item.remoteId().isEmpty()) {
292 changeProcessed();
293 return;
294 }
295 AgentBasePrivate::itemRemoved(item);
296 }
297
298 void itemsRemoved(const Akonadi::Item::List &items) override
299 {
300 const Item::List validItems = filterValidItems(items);
301 if (validItems.isEmpty()) {
302 changeProcessed();
303 return;
304 }
305
306 AgentBasePrivate::itemsRemoved(validItems);
307 }
308
309 void collectionAdded(const Akonadi::Collection &collection, const Akonadi::Collection &parent) override
310 {
311 if (parent.remoteId().isEmpty()) {
312 changeProcessed();
313 return;
314 }
315 AgentBasePrivate::collectionAdded(collection, parent);
316 }
317
318 void collectionChanged(const Akonadi::Collection &collection) override
319 {
320 if (collection.remoteId().isEmpty()) {
321 changeProcessed();
322 return;
323 }
324 AgentBasePrivate::collectionChanged(collection);
325 }
326
327 void collectionChanged(const Akonadi::Collection &collection, const QSet<QByteArray> &partIdentifiers) override
328 {
329 if (collection.remoteId().isEmpty()) {
330 changeProcessed();
331 return;
332 }
333 AgentBasePrivate::collectionChanged(collection, partIdentifiers);
334 }
335
336 void collectionMoved(const Akonadi::Collection &collection, const Akonadi::Collection &source, const Akonadi::Collection &destination) override
337 {
338 // unknown destination or source == destination means we can't do/don't have to do anything
339 if (destination.remoteId().isEmpty() || source == destination) {
340 changeProcessed();
341 return;
342 }
343
344 // inter-resource moves, requires we know which resources the source and destination are in though
345 if (!source.resource().isEmpty() && !destination.resource().isEmpty() && source.resource() != destination.resource()) {
346 if (source.resource() == q_ptr->identifier()) { // moved away from us
347 AgentBasePrivate::collectionRemoved(collection);
348 } else if (destination.resource() == q_ptr->identifier()) { // moved to us
349 scheduler->taskDone(); // stop change replay for now
350 auto mover = new RecursiveMover(this);
351 mover->setCollection(collection, destination);
352 scheduler->scheduleMoveReplay(collection, mover);
353 }
354 return;
355 }
356
357 // intra-resource move, requires the moved collection to have a valid id though
358 if (collection.remoteId().isEmpty()) {
359 changeProcessed();
360 return;
361 }
362
363 // intra-resource move, ie. something we can handle internally
364 AgentBasePrivate::collectionMoved(collection, source, destination);
365 }
366
367 void collectionRemoved(const Akonadi::Collection &collection) override
368 {
369 if (collection.remoteId().isEmpty()) {
370 changeProcessed();
371 return;
372 }
373 AgentBasePrivate::collectionRemoved(collection);
374 }
375
376 void tagAdded(const Akonadi::Tag &tag) override
377 {
378 if (!tag.isValid()) {
379 changeProcessed();
380 return;
381 }
382
383 AgentBasePrivate::tagAdded(tag);
384 }
385
386 void tagChanged(const Akonadi::Tag &tag) override
387 {
388 if (tag.remoteId().isEmpty()) {
389 changeProcessed();
390 return;
391 }
392
393 AgentBasePrivate::tagChanged(tag);
394 }
395
396 void tagRemoved(const Akonadi::Tag &tag) override
397 {
398 if (tag.remoteId().isEmpty()) {
399 changeProcessed();
400 return;
401 }
402
403 AgentBasePrivate::tagRemoved(tag);
404 }
405
406private:
407 static Item::List filterValidItems(Item::List items)
408 {
409 items.erase(std::remove_if(items.begin(),
410 items.end(),
411 [](const auto &item) {
412 return item.remoteId().isEmpty();
413 }),
414 items.end());
415 return items;
416 }
417
418public:
419 // synchronize states
420 Collection currentCollection;
421
422 ResourceScheduler *scheduler = nullptr;
423 ItemSync *mItemSyncer = nullptr;
424 ItemSync::TransactionMode mItemTransactionMode;
425 ItemSync::MergeMode mItemMergeMode;
426 CollectionSync *mCollectionSyncer = nullptr;
427 TagSync *mTagSyncer = nullptr;
428 bool mHierarchicalRid;
429 QTimer mProgressEmissionCompressor;
430 int mUnemittedProgress;
431 QMap<Akonadi::Collection::Id, QVariantMap> mUnemittedAdvancedStatus;
432 bool mAutomaticProgressReporting;
433 bool mDisableAutomaticItemDeliveryDone;
434 QPointer<RecursiveMover> m_recursiveMover;
435 int mItemSyncBatchSize;
436 QSet<QByteArray> mKeepLocalCollectionChanges;
437 KJob *mCurrentCollectionFetchJob = nullptr;
438 bool mScheduleAttributeSyncBeforeCollectionSync;
439 QDateTime mCollectionSyncTimestamp;
440};
441
443 : AgentBase(new ResourceBasePrivate(this), id)
444{
446
447 qDBusRegisterMetaType<QByteArrayList>();
448
449 new Akonadi__ResourceAdaptor(this);
450
451 d->scheduler = new ResourceScheduler(this);
452
453 d->mChangeRecorder->setChangeRecordingEnabled(true);
454 d->mChangeRecorder->setCollectionMoveTranslationEnabled(false); // we deal with this ourselves
455 connect(d->mChangeRecorder, &ChangeRecorder::changesAdded, d->scheduler, &ResourceScheduler::scheduleChangeReplay);
456
457 d->mChangeRecorder->setResourceMonitored(d->mId.toLatin1());
458 d->mChangeRecorder->fetchCollection(true);
459
460 connect(d->scheduler, &ResourceScheduler::executeFullSync, this, &ResourceBase::retrieveCollections);
461 connect(d->scheduler, &ResourceScheduler::executeCollectionTreeSync, this, &ResourceBase::retrieveCollections);
462 connect(d->scheduler, &ResourceScheduler::executeCollectionSync, d, &ResourceBasePrivate::slotSynchronizeCollection);
463 connect(d->scheduler, &ResourceScheduler::executeCollectionAttributesSync, d, &ResourceBasePrivate::slotSynchronizeCollectionAttributes);
464 connect(d->scheduler, &ResourceScheduler::executeTagSync, d, &ResourceBasePrivate::slotSynchronizeTags);
465 connect(d->scheduler, &ResourceScheduler::executeItemFetch, d, &ResourceBasePrivate::slotPrepareItemRetrieval);
466 connect(d->scheduler, &ResourceScheduler::executeItemsFetch, d, &ResourceBasePrivate::slotPrepareItemsRetrieval);
467 connect(d->scheduler, &ResourceScheduler::executeResourceCollectionDeletion, d, &ResourceBasePrivate::slotDeleteResourceCollection);
468 connect(d->scheduler, &ResourceScheduler::executeCacheInvalidation, d, &ResourceBasePrivate::slotInvalidateCache);
469 connect(d->scheduler, &ResourceScheduler::status, this, qOverload<int, const QString &>(&ResourceBase::status));
470 connect(d->scheduler, &ResourceScheduler::executeChangeReplay, d->mChangeRecorder, &ChangeRecorder::replayNext);
471 connect(d->scheduler, &ResourceScheduler::executeRecursiveMoveReplay, d, &ResourceBasePrivate::slotRecursiveMoveReplay);
472 connect(d->scheduler, &ResourceScheduler::fullSyncComplete, this, &ResourceBase::synchronized);
473 connect(d->scheduler, &ResourceScheduler::collectionTreeSyncComplete, this, &ResourceBase::collectionTreeSynchronized);
474 connect(d->mChangeRecorder, &ChangeRecorder::nothingToReplay, d->scheduler, &ResourceScheduler::taskDone);
475 connect(d->mChangeRecorder, &Monitor::collectionRemoved, d->scheduler, &ResourceScheduler::collectionRemoved);
476 connect(this, &ResourceBase::abortRequested, d, &ResourceBasePrivate::slotAbortRequested);
477 connect(this, &ResourceBase::synchronized, d->scheduler, &ResourceScheduler::taskDone);
478 connect(this, &ResourceBase::collectionTreeSynchronized, d->scheduler, &ResourceScheduler::taskDone);
480 connect(&d->mProgressEmissionCompressor, &QTimer::timeout, d, &ResourceBasePrivate::slotDelayedEmitProgress);
481
482 d->scheduler->setOnline(d->mOnline);
483 if (!d->mChangeRecorder->isEmpty()) {
484 d->scheduler->scheduleChangeReplay();
485 }
486
487 new ResourceSelectJob(identifier());
488
489 connect(d->mChangeRecorder->session(), &Session::reconnected, d, &ResourceBasePrivate::slotSessionReconnected);
490}
491
493
495{
496 d_func()->scheduler->scheduleFullSync();
497}
498
503
505{
506 return AgentBase::agentName();
507}
508
513
518
523
528
529QString ResourceBase::parseArguments(int argc, char **argv)
530{
531 Q_UNUSED(argc)
532
533 QCommandLineOption identifierOption(QStringLiteral("identifier"), i18nc("@label command line option", "Resource identifier"), QStringLiteral("argument"));
534 QCommandLineParser parser;
535 parser.addOption(identifierOption);
536 parser.addHelpOption();
537 parser.addVersionOption();
538 parser.process(*qApp);
539 parser.setApplicationDescription(i18n("Akonadi Resource"));
540
541 if (!parser.isSet(identifierOption)) {
542 qCDebug(AKONADIAGENTBASE_LOG) << "Identifier argument missing";
543 exit(1);
544 }
545
546 const QString identifier = parser.value(identifierOption);
547
548 if (identifier.isEmpty()) {
549 qCDebug(AKONADIAGENTBASE_LOG) << "Identifier is empty";
550 exit(1);
551 }
552
554 QCoreApplication::setApplicationVersion(QStringLiteral(AKONADI_FULL_VERSION));
555
556 const QFileInfo fi(QString::fromLocal8Bit(argv[0]));
557 // strip off full path and possible .exe suffix
558 const QString catalog = fi.baseName();
559
560 auto translator = new QTranslator(qApp);
561 translator->load(catalog);
563
564 return identifier;
565}
566
567void ResourceBasePrivate::slotAbortRequested()
568{
569 Q_Q(ResourceBase);
570
571 scheduler->cancelQueues();
572 q->abortActivity();
573}
574
576{
578 Q_ASSERT(d->scheduler->currentTask().type == ResourceScheduler::FetchItem);
579 if (!item.isValid()) {
580 d->scheduler->itemFetchDone(i18nc("@info", "Invalid item retrieved"));
581 return;
582 }
583
584 const QSet<QByteArray> requestedParts = d->scheduler->currentTask().itemParts;
585 for (const QByteArray &part : requestedParts) {
586 if (!item.loadedPayloadParts().contains(part)) {
587 qCWarning(AKONADIAGENTBASE_LOG) << "Item does not provide part" << part;
588 }
589 }
590
591 auto job = new ItemModifyJob(item);
592 job->d_func()->setSilent(true);
593 // FIXME: remove once the item with which we call retrieveItem() has a revision number
594 job->disableRevisionCheck();
595 connect(job, &KJob::result, d, &ResourceBasePrivate::slotDeliveryDone);
596}
597
598void ResourceBasePrivate::slotDeliveryDone(KJob *job)
599{
600 Q_Q(ResourceBase);
601 Q_ASSERT(scheduler->currentTask().type == ResourceScheduler::FetchItem);
602 if (job->error()) {
603 Q_EMIT q->error(i18nc("@info", "Error while creating item: %1", job->errorString()));
604 }
605 scheduler->itemFetchDone(QString());
606}
607
609{
611 Q_ASSERT(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes);
612 if (!collection.isValid()) {
613 Q_EMIT attributesSynchronized(d->scheduler->currentTask().collection.id());
614 d->scheduler->taskDone();
615 return;
616 }
617
618 auto job = new CollectionModifyJob(collection);
619 connect(job, &KJob::result, d, &ResourceBasePrivate::slotCollectionAttributesSyncDone);
620}
621
622void ResourceBasePrivate::slotCollectionAttributesSyncDone(KJob *job)
623{
624 Q_Q(ResourceBase);
625 Q_ASSERT(scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes);
626 if (job->error()) {
627 Q_EMIT q->error(i18nc("@info", "Error while updating collection: %1", job->errorString()));
628 }
629 Q_EMIT q->attributesSynchronized(scheduler->currentTask().collection.id());
630 scheduler->taskDone();
631}
632
633void ResourceBasePrivate::slotDeleteResourceCollection()
634{
635 Q_Q(ResourceBase);
636
638 job->fetchScope().setResource(q->identifier());
639 connect(job, &KJob::result, this, &ResourceBasePrivate::slotDeleteResourceCollectionDone);
640}
641
642void ResourceBasePrivate::slotDeleteResourceCollectionDone(KJob *job)
643{
644 Q_Q(ResourceBase);
645 if (job->error()) {
646 Q_EMIT q->error(job->errorString());
647 scheduler->taskDone();
648 } else {
649 const auto fetchJob = static_cast<const CollectionFetchJob *>(job);
650
651 if (!fetchJob->collections().isEmpty()) {
652 auto job = new CollectionDeleteJob(fetchJob->collections().at(0));
653 connect(job, &KJob::result, this, &ResourceBasePrivate::slotCollectionDeletionDone);
654 } else {
655 // there is no resource collection, so just ignore the request
656 scheduler->taskDone();
657 }
658 }
659}
660
661void ResourceBasePrivate::slotCollectionDeletionDone(KJob *job)
662{
663 Q_Q(ResourceBase);
664 if (job->error()) {
665 Q_EMIT q->error(job->errorString());
666 }
667
668 scheduler->taskDone();
669}
670
671void ResourceBasePrivate::slotInvalidateCache(const Akonadi::Collection &collection)
672{
673 Q_Q(ResourceBase);
674 auto job = new InvalidateCacheJob(collection, q);
675 connect(job, &KJob::result, scheduler, &ResourceScheduler::taskDone);
676}
677
679{
680 changesCommitted(Item::List() << item);
681}
682
684{
686 auto transaction = new TransactionSequence(this);
687 connect(transaction, &KJob::finished, d, &ResourceBasePrivate::changeCommittedResult);
688
689 // Modify the items one-by-one, because STORE does not support mass RID change
690 for (const Item &item : items) {
691 auto job = new ItemModifyJob(item, transaction);
692 job->d_func()->setClean();
693 job->disableRevisionCheck(); // TODO: remove, but where/how do we handle the error?
694 job->setIgnorePayload(true); // we only want to reset the dirty flag and update the remote id
695 }
696}
697
699{
701 auto job = new CollectionModifyJob(collection);
702 connect(job, &KJob::result, d, &ResourceBasePrivate::changeCommittedResult);
703}
704
705void ResourceBasePrivate::changeCommittedResult(KJob *job)
706{
707 if (job->error()) {
708 qCWarning(AKONADIAGENTBASE_LOG) << job->errorText();
709 }
710
711 Q_Q(ResourceBase);
712 if (qobject_cast<CollectionModifyJob *>(job)) {
713 if (job->error()) {
714 Q_EMIT q->error(i18nc("@info", "Updating local collection failed: %1.", job->errorText()));
715 }
716 mChangeRecorder->d_ptr->invalidateCache(static_cast<CollectionModifyJob *>(job)->collection());
717 } else {
718 if (job->error()) {
719 Q_EMIT q->error(i18nc("@info", "Updating local items failed: %1.", job->errorText()));
720 }
721 // Item and tag cache is invalidated by modify job
722 }
723
724 changeProcessed();
725}
726
728{
730 auto job = new TagModifyJob(tag);
731 connect(job, &KJob::result, d, &ResourceBasePrivate::changeCommittedResult);
732}
733
734void ResourceBase::requestItemDelivery(const QList<qint64> &uids, const QByteArrayList &parts)
735{
737 if (!isOnline()) {
738 const QString errorMsg = i18nc("@info", "Cannot fetch item in offline mode.");
740 Q_EMIT error(errorMsg);
741 return;
742 }
743
744 setDelayedReply(true);
745
746 const auto items = uids | Views::transform([](const auto uid) {
747 return Item{uid};
748 })
749 | Actions::toQVector;
750
751 const QSet<QByteArray> partSet = QSet<QByteArray>(parts.begin(), parts.end());
752 d->scheduler->scheduleItemsFetch(items, partSet, message());
753}
754
756{
758 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
759 "ResourceBase::collectionsRetrieved()",
760 "Calling collectionsRetrieved() although no collection retrieval is in progress");
761 if (!d->mCollectionSyncer) {
762 d->mCollectionSyncer = new CollectionSync(identifier());
763 d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);
764 d->mCollectionSyncer->setKeepLocalChanges(d->mKeepLocalCollectionChanges);
765 connect(d->mCollectionSyncer, &KJob::percentChanged, d,
766 &ResourceBasePrivate::slotPercent); // NOLINT(google-runtime-int): ulong comes from KJob
767 connect(d->mCollectionSyncer, &KJob::result, d, &ResourceBasePrivate::slotCollectionSyncDone);
768 }
769 d->mCollectionSyncer->setRemoteCollections(collections);
770}
771
772void ResourceBase::collectionsRetrievedIncremental(const Collection::List &changedCollections, const Collection::List &removedCollections)
773{
775 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
776 "ResourceBase::collectionsRetrievedIncremental()",
777 "Calling collectionsRetrievedIncremental() although no collection retrieval is in progress");
778 if (!d->mCollectionSyncer) {
779 d->mCollectionSyncer = new CollectionSync(identifier());
780 d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);
781 d->mCollectionSyncer->setKeepLocalChanges(d->mKeepLocalCollectionChanges);
782 connect(d->mCollectionSyncer, &KJob::percentChanged, d,
783 &ResourceBasePrivate::slotPercent); // NOLINT(google-runtime-int): ulong comes from KJob
784 connect(d->mCollectionSyncer, &KJob::result, d, &ResourceBasePrivate::slotCollectionSyncDone);
785 }
786 d->mCollectionSyncer->setRemoteCollections(changedCollections, removedCollections);
787}
788
790{
792 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
793 "ResourceBase::setCollectionStreamingEnabled()",
794 "Calling setCollectionStreamingEnabled() although no collection retrieval is in progress");
795 if (!d->mCollectionSyncer) {
796 d->mCollectionSyncer = new CollectionSync(identifier());
797 d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);
798 connect(d->mCollectionSyncer, &KJob::percentChanged, d,
799 &ResourceBasePrivate::slotPercent); // NOLINT(google-runtime-int): ulong comes from KJob
800 connect(d->mCollectionSyncer, &KJob::result, d, &ResourceBasePrivate::slotCollectionSyncDone);
801 }
802 d->mCollectionSyncer->setStreamingEnabled(enable);
803}
804
806{
808 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
809 "ResourceBase::collectionsRetrievalDone()",
810 "Calling collectionsRetrievalDone() although no collection retrieval is in progress");
811 // streaming enabled, so finalize the sync
812 if (d->mCollectionSyncer) {
813 d->mCollectionSyncer->retrievalDone();
814 } else {
815 // user did the sync himself, we are done now
816 // FIXME: we need the same special case for SyncAll as in slotCollectionSyncDone here!
817 d->scheduler->taskDone();
818 }
819}
820
822{
824 d->mKeepLocalCollectionChanges = parts;
825}
826
827void ResourceBasePrivate::slotCollectionSyncDone(KJob *job)
828{
829 Q_Q(ResourceBase);
830 mCollectionSyncer = nullptr;
831 if (job->error()) {
832 if (job->error() != Job::UserCanceled) {
833 Q_EMIT q->error(job->errorString());
834 }
835 } else {
836 if (scheduler->currentTask().type == ResourceScheduler::SyncAll) {
838 list->setFetchScope(q->changeRecorder()->collectionFetchScope());
839 list->fetchScope().fetchAttribute<SpecialCollectionAttribute>();
840 list->fetchScope().fetchAttribute<FavoriteCollectionAttribute>();
841 list->fetchScope().setResource(mId);
842 list->fetchScope().setListFilter(CollectionFetchScope::Sync);
843 connect(list, &KJob::result, this, &ResourceBasePrivate::slotLocalListDone);
844 return;
845 } else if (scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree) {
846 scheduler->scheduleCollectionTreeSyncCompletion();
847 }
848 }
849 scheduler->taskDone();
850}
851
852namespace
853{
854bool sortCollectionsForSync(const Collection &l, const Collection &r)
855{
857 const bool lInbox = (lType == "inbox") || (QStringView(l.remoteId()).mid(1).compare(QLatin1StringView("inbox"), Qt::CaseInsensitive) == 0);
858 const bool lFav = l.hasAttribute<FavoriteCollectionAttribute>();
859
861 const bool rInbox = (rType == "inbox") || (QStringView(r.remoteId()).mid(1).compare(QLatin1StringView("inbox"), Qt::CaseInsensitive) == 0);
862 const bool rFav = r.hasAttribute<FavoriteCollectionAttribute>();
863
864 // inbox is always first
865 if (lInbox) {
866 return true;
867 } else if (rInbox) {
868 return false;
869 }
870
871 // favorites right after inbox
872 if (lFav) {
873 return !rInbox;
874 } else if (rFav) {
875 return lInbox;
876 }
877
878 // trash is always last (unless it's favorite)
879 if (lType == "trash") {
880 return false;
881 } else if (rType == "trash") {
882 return true;
883 }
884
885 // Fallback to sorting by id
886 return l.id() < r.id();
887}
888
889} // namespace
890
891void ResourceBasePrivate::slotLocalListDone(KJob *job)
892{
893 Q_Q(ResourceBase);
894 if (job->error()) {
895 Q_EMIT q->error(job->errorString());
896 } else {
897 Collection::List cols = static_cast<CollectionFetchJob *>(job)->collections();
898 std::sort(cols.begin(), cols.end(), sortCollectionsForSync);
899 for (const Collection &col : std::as_const(cols)) {
900 scheduler->scheduleSync(col);
901 }
902 scheduler->scheduleFullSyncCompletion();
903 }
904 scheduler->taskDone();
905}
906
907void ResourceBasePrivate::slotSynchronizeCollection(const Collection &col)
908{
909 Q_Q(ResourceBase);
910 currentCollection = col;
911 // This can happen due to FetchHelper::triggerOnDemandFetch() in the akonadi server (not an error).
912 if (!col.remoteId().isEmpty()) {
913 // check if this collection actually can contain anything
914 QStringList contentTypes = currentCollection.contentMimeTypes();
915 contentTypes.removeAll(Collection::mimeType());
917 if (!contentTypes.isEmpty() || col.isVirtual()) {
918 if (mAutomaticProgressReporting) {
919 Q_EMIT q->status(AgentBase::Running, i18nc("@info:status", "Syncing folder '%1'", currentCollection.displayName()));
920 }
921
922 qCDebug(AKONADIAGENTBASE_LOG) << "Preparing collection sync of collection" << currentCollection.id() << currentCollection.displayName();
923 auto fetchJob = new Akonadi::CollectionFetchJob(col, CollectionFetchJob::Base, this);
924 fetchJob->setFetchScope(q->changeRecorder()->collectionFetchScope());
925 connect(fetchJob, &KJob::result, this, &ResourceBasePrivate::slotItemRetrievalCollectionFetchDone);
926 mCurrentCollectionFetchJob = fetchJob;
927 return;
928 }
929 }
930 scheduler->taskDone();
931}
932
933void ResourceBasePrivate::slotItemRetrievalCollectionFetchDone(KJob *job)
934{
935 Q_Q(ResourceBase);
936 mCurrentCollectionFetchJob = nullptr;
937 if (job->error()) {
938 qCWarning(AKONADIAGENTBASE_LOG) << "Failed to retrieve collection for sync: " << job->errorString();
939 q->cancelTask(i18n("Failed to retrieve collection for sync."));
940 return;
941 }
942 auto fetchJob = static_cast<Akonadi::CollectionFetchJob *>(job);
943 const Collection::List collections = fetchJob->collections();
944 if (collections.isEmpty()) {
945 qCWarning(AKONADIAGENTBASE_LOG) << "The fetch job returned empty collection set. This is unexpected.";
946 q->cancelTask(i18n("Failed to retrieve collection for sync."));
947 return;
948 }
949 mCollectionSyncTimestamp = QDateTime::currentDateTimeUtc();
950 q->retrieveItems(collections.at(0));
951}
952
954{
955 Q_D(const ResourceBase);
956 return d->mItemSyncBatchSize;
957}
958
960{
962 d->mItemSyncBatchSize = batchSize;
963}
964
966{
968 d->mScheduleAttributeSyncBeforeCollectionSync = enable;
969}
970
971void ResourceBasePrivate::slotSynchronizeCollectionAttributes(const Collection &col)
972{
973 Q_Q(ResourceBase);
974 auto fetchJob = new Akonadi::CollectionFetchJob(col, CollectionFetchJob::Base, this);
975 fetchJob->setFetchScope(q->changeRecorder()->collectionFetchScope());
976 connect(fetchJob, &KJob::result, this, &ResourceBasePrivate::slotAttributeRetrievalCollectionFetchDone);
977 Q_ASSERT(!mCurrentCollectionFetchJob);
978 mCurrentCollectionFetchJob = fetchJob;
979}
980
981void ResourceBasePrivate::slotAttributeRetrievalCollectionFetchDone(KJob *job)
982{
983 mCurrentCollectionFetchJob = nullptr;
984 Q_Q(ResourceBase);
985 if (job->error()) {
986 qCWarning(AKONADIAGENTBASE_LOG) << "Failed to retrieve collection for attribute sync: " << job->errorString();
987 q->cancelTask(i18n("Failed to retrieve collection for attribute sync."));
988 return;
989 }
990 auto fetchJob = static_cast<Akonadi::CollectionFetchJob *>(job);
991 // FIXME: Why not call q-> directly?
992 QMetaObject::invokeMethod(q, "retrieveCollectionAttributes", Q_ARG(Akonadi::Collection, fetchJob->collections().at(0)));
993}
994
995void ResourceBasePrivate::slotSynchronizeTags()
996{
997 Q_Q(ResourceBase);
998 QMetaObject::invokeMethod(this, [q] {
999 q->retrieveTags();
1000 });
1001}
1002
1003void ResourceBasePrivate::slotPrepareItemRetrieval(const Item &item)
1004{
1005 Q_Q(ResourceBase);
1006 auto fetch = new ItemFetchJob(item, this);
1007 // we always need at least parent so we can use ItemCreateJob to merge
1008 fetch->fetchScope().setAncestorRetrieval(qMax(ItemFetchScope::Parent, q->changeRecorder()->itemFetchScope().ancestorRetrieval()));
1009 fetch->fetchScope().setCacheOnly(true);
1010 fetch->fetchScope().setFetchRemoteIdentification(true);
1011
1012 // copy list of attributes to fetch
1013 const QSet<QByteArray> attributes = q->changeRecorder()->itemFetchScope().attributes();
1014 for (const auto &attribute : attributes) {
1015 fetch->fetchScope().fetchAttribute(attribute);
1016 }
1017
1018 connect(fetch, &KJob::result, this, &ResourceBasePrivate::slotPrepareItemRetrievalResult);
1019}
1020
1021void ResourceBasePrivate::slotPrepareItemRetrievalResult(KJob *job)
1022{
1023 Q_Q(ResourceBase);
1024 Q_ASSERT_X(scheduler->currentTask().type == ResourceScheduler::FetchItem,
1025 "ResourceBasePrivate::slotPrepareItemRetrievalResult()",
1026 "Preparing item retrieval although no item retrieval is in progress");
1027 if (job->error()) {
1028 q->cancelTask(job->errorText());
1029 return;
1030 }
1031 auto fetch = qobject_cast<ItemFetchJob *>(job);
1032 if (fetch->items().count() != 1) {
1033 q->cancelTask(i18n("The requested item no longer exists"));
1034 return;
1035 }
1036 const QSet<QByteArray> parts = scheduler->currentTask().itemParts;
1037 if (!q->retrieveItem(fetch->items().at(0), parts)) {
1038 q->cancelTask();
1039 }
1040}
1041
1042void ResourceBasePrivate::slotPrepareItemsRetrieval(const QList<Item> &items)
1043{
1044 Q_Q(ResourceBase);
1045 auto fetch = new ItemFetchJob(items, this);
1046 // we always need at least parent so we can use ItemCreateJob to merge
1047 fetch->fetchScope().setAncestorRetrieval(qMax(ItemFetchScope::Parent, q->changeRecorder()->itemFetchScope().ancestorRetrieval()));
1048 fetch->fetchScope().setCacheOnly(true);
1049 fetch->fetchScope().setFetchRemoteIdentification(true);
1050 // It's possible that one or more items were removed before this task was
1051 // executed, so ignore it and just handle the rest.
1052 fetch->fetchScope().setIgnoreRetrievalErrors(true);
1053
1054 // copy list of attributes to fetch
1055 const QSet<QByteArray> attributes = q->changeRecorder()->itemFetchScope().attributes();
1056 for (const auto &attribute : attributes) {
1057 fetch->fetchScope().fetchAttribute(attribute);
1058 }
1059
1060 connect(fetch, &KJob::result, this, &ResourceBasePrivate::slotPrepareItemsRetrievalResult);
1061}
1062
1063void ResourceBasePrivate::slotPrepareItemsRetrievalResult(KJob *job)
1064{
1065 Q_Q(ResourceBase);
1066 Q_ASSERT_X(scheduler->currentTask().type == ResourceScheduler::FetchItems,
1067 "ResourceBasePrivate::slotPrepareItemsRetrievalResult()",
1068 "Preparing items retrieval although no items retrieval is in progress");
1069 if (job->error()) {
1070 q->cancelTask(job->errorText());
1071 return;
1072 }
1073 auto fetch = qobject_cast<ItemFetchJob *>(job);
1074 const auto items = fetch->items();
1075 if (items.isEmpty()) {
1076 q->cancelTask();
1077 return;
1078 }
1079
1080 const QSet<QByteArray> parts = scheduler->currentTask().itemParts;
1081 Q_ASSERT(items.first().parentCollection().isValid());
1082 if (!q->retrieveItems(items, parts)) {
1083 q->cancelTask();
1084 }
1085}
1086
1087void ResourceBasePrivate::slotRecursiveMoveReplay(RecursiveMover *mover)
1088{
1089 Q_ASSERT(mover);
1090 Q_ASSERT(!m_recursiveMover);
1091 m_recursiveMover = mover;
1092 connect(mover, &KJob::result, this, &ResourceBasePrivate::slotRecursiveMoveReplayResult);
1093 mover->start();
1094}
1095
1096void ResourceBasePrivate::slotRecursiveMoveReplayResult(KJob *job)
1097{
1098 Q_Q(ResourceBase);
1099 m_recursiveMover = nullptr;
1100
1101 if (job->error()) {
1102 q->deferTask();
1103 return;
1104 }
1105
1106 changeProcessed();
1107}
1108
1110{
1112 // streaming enabled, so finalize the sync
1113 if (d->mItemSyncer) {
1114 d->mItemSyncer->deliveryDone();
1115 } else {
1116 if (d->scheduler->currentTask().type == ResourceScheduler::FetchItems) {
1117 d->scheduler->currentTask().sendDBusReplies(QString());
1118 }
1119 // user did the sync himself, we are done now
1120 d->scheduler->taskDone();
1121 }
1122}
1123
1125{
1127 d->scheduler->scheduleResourceCollectionDeletion();
1128}
1129
1131{
1133 d->scheduler->scheduleCacheInvalidation(collection);
1134}
1135
1137{
1138 Q_D(const ResourceBase);
1139 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollection,
1140 "ResourceBase::currentCollection()",
1141 "Trying to access current collection although no item retrieval is in progress");
1142 return d->currentCollection;
1143}
1144
1146{
1147 Q_D(const ResourceBase);
1148 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::FetchItem,
1149 "ResourceBase::currentItem()",
1150 "Trying to access current item although no item retrieval is in progress");
1151 return d->scheduler->currentTask().items[0];
1152}
1153
1155{
1156 Q_D(const ResourceBase);
1157 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::FetchItems,
1158 "ResourceBase::currentItems()",
1159 "Trying to access current items although no items retrieval is in progress");
1160 return d->scheduler->currentTask().items;
1161}
1162
1164{
1165 d_func()->scheduler->scheduleCollectionTreeSync();
1166}
1167
1169{
1170 d_func()->scheduler->scheduleTagSync();
1171}
1172
1174{
1176 if (d->mCurrentCollectionFetchJob) {
1177 d->mCurrentCollectionFetchJob->kill();
1178 d->mCurrentCollectionFetchJob = nullptr;
1179 }
1180 switch (d->scheduler->currentTask().type) {
1181 case ResourceScheduler::FetchItem:
1182 itemRetrieved(Item()); // sends the error reply and
1183 break;
1184 case ResourceScheduler::FetchItems:
1186 break;
1187 case ResourceScheduler::ChangeReplay:
1188 d->changeProcessed();
1189 break;
1190 case ResourceScheduler::SyncCollectionTree:
1191 case ResourceScheduler::SyncAll:
1192 if (d->mCollectionSyncer) {
1193 d->mCollectionSyncer->rollback();
1194 } else {
1195 d->scheduler->taskDone();
1196 }
1197 break;
1198 case ResourceScheduler::SyncCollection:
1199 if (d->mItemSyncer) {
1200 d->mItemSyncer->rollback();
1201 } else {
1202 d->scheduler->taskDone();
1203 }
1204 break;
1205 default:
1206 d->scheduler->taskDone();
1207 }
1208}
1209
1211{
1212 cancelTask();
1213
1214 Q_EMIT error(msg);
1215}
1216
1218{
1220 qCDebug(AKONADIAGENTBASE_LOG) << "Deferring task" << d->scheduler->currentTask();
1221 // Deferring a CollectionSync is just not implemented.
1222 // We'd need to d->mItemSyncer->rollback() but also to NOT call taskDone in slotItemSyncDone() here...
1223 Q_ASSERT(!d->mItemSyncer);
1224 d->scheduler->deferTask();
1225}
1226
1228{
1229 d_func()->scheduler->setOnline(state);
1230}
1231
1233{
1234 synchronizeCollection(collectionId, false);
1235}
1236
1237void ResourceBase::synchronizeCollection(qint64 collectionId, bool recursive)
1238{
1240 auto job = new CollectionFetchJob(Collection(collectionId), recursive ? CollectionFetchJob::Recursive : CollectionFetchJob::Base);
1241 job->setFetchScope(changeRecorder()->collectionFetchScope());
1242 job->fetchScope().setResource(identifier());
1243 job->fetchScope().setListFilter(CollectionFetchScope::Sync);
1244 connect(job, &KJob::result, d, &ResourceBasePrivate::slotCollectionListDone);
1245}
1246
1247void ResourceBasePrivate::slotCollectionListDone(KJob *job)
1248{
1249 if (!job->error()) {
1250 const Collection::List list = static_cast<CollectionFetchJob *>(job)->collections();
1251 for (const Collection &collection : list) {
1252 // We also get collections that should not be synced but are part of the tree.
1253 if (collection.shouldList(Collection::ListSync)) {
1254 if (mScheduleAttributeSyncBeforeCollectionSync) {
1255 scheduler->scheduleAttributesSync(collection);
1256 }
1257 scheduler->scheduleSync(collection);
1258 }
1259 }
1260 } else {
1261 qCWarning(AKONADIAGENTBASE_LOG) << "Failed to fetch collection for collection sync: " << job->errorString();
1262 }
1263}
1264
1266{
1268 d->scheduler->scheduleAttributesSync(col);
1269}
1270
1272{
1274 auto job = new CollectionFetchJob(Collection(collectionId), CollectionFetchJob::Base);
1275 job->setFetchScope(changeRecorder()->collectionFetchScope());
1276 job->fetchScope().setResource(identifier());
1277 connect(job, &KJob::result, d, &ResourceBasePrivate::slotCollectionListForAttributesDone);
1278}
1279
1280void ResourceBasePrivate::slotCollectionListForAttributesDone(KJob *job)
1281{
1282 if (!job->error()) {
1283 const Collection::List list = static_cast<CollectionFetchJob *>(job)->collections();
1284 if (!list.isEmpty()) {
1285 const Collection &col = list.first();
1286 scheduler->scheduleAttributesSync(col);
1287 }
1288 }
1289 // TODO: error handling
1290}
1291
1293{
1294 qCDebug(AKONADIAGENTBASE_LOG) << amount;
1297 if (d->mItemSyncer) {
1298 d->mItemSyncer->setTotalItems(amount);
1299 }
1300}
1301
1303{
1305 if (d->mItemSyncer) {
1306 d->mItemSyncer->setDisableAutomaticDeliveryDone(disable);
1307 }
1308 d->mDisableAutomaticItemDeliveryDone = disable;
1309}
1310
1312{
1314 d->createItemSyncInstanceIfMissing();
1315 if (d->mItemSyncer) {
1316 d->mItemSyncer->setStreamingEnabled(enable);
1317 }
1318}
1319
1321{
1323 if (d->scheduler->currentTask().type == ResourceScheduler::FetchItems) {
1324 auto trx = new TransactionSequence(this);
1325 connect(trx, &KJob::result, d, &ResourceBasePrivate::slotItemSyncDone);
1326 for (const Item &item : items) {
1327 Q_ASSERT(item.parentCollection().isValid());
1328 if (item.isValid()) { // NOLINT(bugprone-branch-clone)
1329 new ItemModifyJob(item, trx);
1330 } else if (!item.remoteId().isEmpty()) {
1331 auto job = new ItemCreateJob(item, item.parentCollection(), trx);
1332 job->setMerge(ItemCreateJob::RID);
1333 } else {
1334 // This should not happen, but just to be sure...
1335 new ItemModifyJob(item, trx);
1336 }
1337 }
1338 trx->commit();
1339 } else {
1340 d->createItemSyncInstanceIfMissing();
1341 if (d->mItemSyncer) {
1342 d->mItemSyncer->setFullSyncItems(items);
1343 }
1344 }
1345}
1346
1347void ResourceBase::itemsRetrievedIncremental(const Item::List &changedItems, const Item::List &removedItems)
1348{
1350 d->createItemSyncInstanceIfMissing();
1351 if (d->mItemSyncer) {
1352 d->mItemSyncer->setIncrementalSyncItems(changedItems, removedItems);
1353 }
1354}
1355
1356void ResourceBasePrivate::slotItemSyncDone(KJob *job)
1357{
1358 mItemSyncer = nullptr;
1359 Q_Q(ResourceBase);
1360 if (job->error() && job->error() != Job::UserCanceled) {
1361 Q_EMIT q->error(job->errorString());
1362 }
1363 if (scheduler->currentTask().type == ResourceScheduler::FetchItems) {
1364 scheduler->currentTask().sendDBusReplies((job->error() && job->error() != Job::UserCanceled) ? job->errorString() : QString());
1365 }
1366 scheduler->taskDone();
1367}
1368
1369void ResourceBasePrivate::slotDelayedEmitProgress()
1370{
1371 Q_Q(ResourceBase);
1372 if (mAutomaticProgressReporting) {
1373 Q_EMIT q->percent(mUnemittedProgress);
1374
1375 for (const QVariantMap &statusMap : std::as_const(mUnemittedAdvancedStatus)) {
1376 Q_EMIT q->advancedStatus(statusMap);
1377 }
1378 }
1379 mUnemittedProgress = 0;
1380 mUnemittedAdvancedStatus.clear();
1381}
1382
1383void ResourceBasePrivate::slotPercent(KJob *job, quint64 percent)
1384{
1385 mUnemittedProgress = static_cast<int>(percent);
1386
1387 const auto collection = job->property("collection").value<Collection>();
1388 if (collection.isValid()) {
1389 QVariantMap statusMap;
1390 statusMap.insert(QStringLiteral("key"), QStringLiteral("collectionSyncProgress"));
1391 statusMap.insert(QStringLiteral("collectionId"), collection.id());
1392 statusMap.insert(QStringLiteral("percent"), static_cast<unsigned int>(percent));
1393
1394 mUnemittedAdvancedStatus[collection.id()] = statusMap;
1395 }
1396 // deliver completion right away, intermediate progress at 1s intervals
1397 if (percent == 100U) {
1398 mProgressEmissionCompressor.stop();
1399 slotDelayedEmitProgress();
1400 } else if (!mProgressEmissionCompressor.isActive()) {
1401 mProgressEmissionCompressor.start();
1402 }
1403}
1404
1406{
1408 d->mHierarchicalRid = enable;
1409}
1410
1411void ResourceBase::scheduleCustomTask(QObject *receiver, const char *method, const QVariant &argument, SchedulePriority priority)
1412{
1414 d->scheduler->scheduleCustomTask(receiver, method, argument, priority);
1415}
1416
1418{
1420 d->scheduler->taskDone();
1421}
1422
1427
1429{
1431 d->scheduler->taskDone();
1432}
1433
1435{
1436 Q_UNUSED(item)
1437 Q_UNUSED(parts)
1438 // retrieveItem() can no longer be pure virtual, because then we could not mark
1439 // it as deprecated (i.e. implementations would still be forced to implement it),
1440 // so instead we assert here.
1441 // NOTE: Don't change to Q_ASSERT_X here: while the macro can be disabled at
1442 // compile time, we want to hit this assert *ALWAYS*.
1443 qt_assert_x("Akonadi::ResourceBase::retrieveItem()",
1444 "The base implementation of retrieveItem() must never be reached. "
1445 "You must implement either retrieveItem() or retrieveItems(Akonadi::Item::List, QSet<QByteArray>) overload "
1446 "to handle item retrieval requests.",
1447 __FILE__,
1448 __LINE__);
1449 return false;
1450}
1451
1453{
1455
1456 // If we reach this implementation of retrieveItems() then it means that the
1457 // resource is still using the deprecated retrieveItem() method, so we explode
1458 // this to a myriad of tasks in scheduler and let them be processed one by one
1459
1460 const qint64 id = d->scheduler->currentTask().serial;
1461 for (const auto &item : items) {
1462 d->scheduler->scheduleItemFetch(item, parts, d->scheduler->currentTask().dbusMsgs, id);
1463 }
1464 taskDone();
1465 return true;
1466}
1467
1471
1473{
1475 d->mItemTransactionMode = mode;
1476}
1477
1478void ResourceBase::setItemMergingMode(ItemSync::MergeMode mode)
1479{
1481 d->mItemMergeMode = mode;
1482}
1483
1485{
1487 d->mAutomaticProgressReporting = enabled;
1488}
1489
1491{
1492 Q_D(const ResourceBase);
1493 return d->dumpNotificationListToString();
1494}
1495
1497{
1498 Q_D(const ResourceBase);
1499 return d->dumpToString();
1500}
1501
1503{
1504 Q_D(const ResourceBase);
1505 d->dumpMemoryInfo();
1506}
1507
1509{
1510 Q_D(const ResourceBase);
1511 return d->dumpMemoryInfoToString();
1512}
1513
1514void ResourceBase::tagsRetrieved(const Tag::List &tags, const QHash<QString, Item::List> &tagMembers)
1515{
1517 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncTags || d->scheduler->currentTask().type == ResourceScheduler::SyncAll
1518 || d->scheduler->currentTask().type == ResourceScheduler::Custom,
1519 "ResourceBase::tagsRetrieved()",
1520 "Calling tagsRetrieved() although no tag retrieval is in progress");
1521 if (!d->mTagSyncer) {
1522 d->mTagSyncer = new TagSync(this);
1523 connect(d->mTagSyncer, &KJob::percentChanged, d,
1524 &ResourceBasePrivate::slotPercent); // NOLINT(google-runtime-int): ulong comes from KJob
1525 connect(d->mTagSyncer, &KJob::result, d, &ResourceBasePrivate::slotTagSyncDone);
1526 }
1527 d->mTagSyncer->setFullTagList(tags);
1528 d->mTagSyncer->setTagMembers(tagMembers);
1529}
1530
1531void ResourceBasePrivate::slotTagSyncDone(KJob *job)
1532{
1533 Q_Q(ResourceBase);
1534 mTagSyncer = nullptr;
1535 if (job->error()) {
1536 if (job->error() != Job::UserCanceled) {
1537 qCWarning(AKONADIAGENTBASE_LOG) << "TagSync failed: " << job->errorString();
1538 Q_EMIT q->error(job->errorString());
1539 }
1540 }
1541
1542 scheduler->taskDone();
1543}
1544
1545#include "moc_resourcebase.cpp"
1546#include "resourcebase.moc"
The base class for all Akonadi agents and resources.
Definition agentbase.h:73
QStringList activities() const
Returns the activities of the agent.
virtual int status() const
This method returns the current status code of the agent.
void setActivities(const QStringList &activities)
This method is used to set the activities of the agent.
void setActivitiesEnabled(bool enabled)
This method is used to enabled the activities of the agent.
void setAgentName(const QString &name)
This method is used to set the name of the agent.
ChangeRecorder * changeRecorder() const
Returns the Akonadi::ChangeRecorder object used for monitoring.
QString agentName() const
Returns the name of the agent.
@ Running
The agent is working on something.
Definition agentbase.h:398
void abortRequested()
Emitted when another application has remotely asked the agent to abort its current operation.
bool isOnline() const
Returns whether the agent is currently online.
bool activitiesEnabled() const
Returns the activities status of the agent.
QString identifier() const
Returns the instance identifier of this agent.
void agentNameChanged(const QString &name)
This signal is emitted whenever the name of the agent has changed.
void error(const QString &message)
This signal shall be used to report errors.
void changesAdded()
Emitted when new changes are recorded.
void replayNext()
Replay the next change notification and erase the previous one from the record.
void nothingToReplay()
Emitted when replayNext() was called, but there was no valid change to replay.
Job that deletes a collection in the Akonadi storage.
Job that fetches collections from the Akonadi storage.
@ Recursive
List all sub-collections.
@ FirstLevel
Only list direct sub-collections of the base collection.
@ Base
Only fetch the base collection.
@ Sync
Only retrieve collections for synchronization, taking the local preference and enabled into account.
Job that modifies a collection in the Akonadi storage.
Represents a collection of PIM items.
Definition collection.h:62
static QString mimeType()
Returns the mimetype used for collections.
@ ListSync
Listing for synchronization.
Definition collection.h:480
bool hasAttribute(const QByteArray &name) const
Returns true if the collection has an attribute of the given type name, false otherwise.
static Collection root()
Returns the root collection.
static QString virtualMimeType()
Returns the mimetype used for virtual collections.
bool shouldList(ListPurpose purpose) const
Returns whether the collection should be listed or not for the specified purpose Takes enabled state ...
Attribute * attribute(const QByteArray &name)
Returns the attribute of the given type name if available, 0 otherwise.
QString remoteId() const
Returns the remote id of the collection.
Job that creates a new item in the Akonadi storage.
@ RID
Merge by remote id.
Job that fetches items from the Akonadi storage.
@ Parent
Only retrieve the immediate parent collection.
Job that modifies an existing item in the Akonadi storage.
Syncs between items known to a client (usually a resource) and the Akonadi storage.
Definition itemsync.h:41
void readyForNextBatch(int remainingBatchSize)
Signals the resource that new items can be delivered.
TransactionMode
Transaction mode used by ItemSync.
Definition itemsync.h:129
void setDisableAutomaticDeliveryDone(bool disable)
Disables the automatic completion of the item sync, based on the number of delivered items.
Definition itemsync.cpp:238
void setBatchSize(int)
Set the batch size.
Definition itemsync.cpp:539
void setMergeMode(MergeMode mergeMode)
Set what merge method should be used for next ItemSync run.
Definition itemsync.cpp:551
void setTransactionMode(TransactionMode mode)
Set the transaction mode to use for this sync.
Definition itemsync.cpp:527
Represents a PIM item stored in Akonadi storage.
Definition item.h:100
QString remoteId() const
Returns the remote id of the item.
Definition item.cpp:73
bool isValid() const
Returns whether the item is valid.
Definition item.cpp:88
QSet< QByteArray > loadedPayloadParts() const
Returns the list of loaded payload parts.
Definition item.cpp:283
@ UserCanceled
The user canceled this job.
Definition job.h:101
void collectionRemoved(const Akonadi::Collection &collection)
This signal is emitted if a monitored collection has been removed from the Akonadi storage.
The base class for all Akonadi resources.
void setScheduleAttributeSyncBeforeItemSync(bool)
Set to true to schedule an attribute sync before every item sync.
void setItemTransactionMode(ItemSync::TransactionMode mode)
Set transaction mode for item sync'ing.
QString dumpNotificationListToString() const
Dump the contents of the current ChangeReplay.
void dumpMemoryInfo() const
Dumps memory usage information to stdout.
void taskDone()
Indicate that the current task is finished.
void invalidateCache(const Collection &collection)
Call this method to invalidate all cached content in collection.
void setActivitiesEnabled(bool enable)
This method enables or not activities support.
void collectionAttributesRetrieved(const Collection &collection)
Call this method from retrieveCollectionAttributes() once the result is available.
bool activitiesEnabled() const
Returns true if activities is enabled.
int itemSyncBatchSize() const
Returns the batch size used during the item sync.
void setItemSyncBatchSize(int batchSize)
Set the batch size used during the item sync.
void setDisableAutomaticItemDeliveryDone(bool disable)
Disables the automatic completion of the item sync, based on the number of delivered items.
void synchronizeCollection(qint64 id)
This method is called whenever the collection with the given id shall be synchronized.
virtual void retrieveTags()
Retrieve all tags from the backend.
void synchronized()
Emitted when a full synchronization has been completed.
virtual void retrieveCollections()=0
Retrieve the collection tree from the remote server and supply it via collectionsRetrieved() or colle...
QString dumpSchedulerToString() const
Dump the state of the scheduler.
QString dumpMemoryInfoToString() const
Returns a string with memory usage information.
SchedulePriority
Describes the scheduling priority of a task that has been queued for execution.
void retrieveNextItemSyncBatch(int remainingBatchSize)
Emitted when the item synchronization processed the current batch and is ready for a new one.
void setKeepLocalCollectionChanges(const QSet< QByteArray > &parts)
Allows to keep locally changed collection parts during the collection sync.
void attributesSynchronized(qlonglong collectionId)
Emitted when a collection attributes synchronization has been completed.
void itemsRetrievalDone()
Call this method to indicate you finished synchronizing the current collection.
QStringList activities() const
return list of activities.
Item::List currentItems() const
Returns the items that are currently retrieved.
void setTotalItems(int amount)
Call this method when you want to use the itemsRetrieved() method in streaming mode and indicate the ...
void setActivities(const QStringList &activities)
This method sets list of activities.
void setAutomaticProgressReporting(bool enabled)
Enable or disable automatic progress reporting.
void collectionsRetrieved(const Collection::List &collections)
Call this to supply the full folder tree retrieved from the remote server.
void deferTask()
Suspends the execution of the current task and tries again to execute it.
void setItemStreamingEnabled(bool enable)
Enable item streaming, which is disabled by default.
QString name() const
Returns the name of the resource.
void collectionsRetrievalDone()
Call this method to indicate you finished synchronizing the collection tree.
void clearCache()
Call this method to remove all items and collections of the resource from the server cache.
void scheduleCustomTask(QObject *receiver, const char *method, const QVariant &argument, SchedulePriority priority=Append)
Schedules a custom task in the internal scheduler.
void collectionsRetrievedIncremental(const Collection::List &changedCollections, const Collection::List &removedCollections)
Call this to supply incrementally retrieved collections from the remote server.
void itemRetrieved(const Item &item)
Call this method from retrieveItem() once the result is available.
virtual AKONADIAGENTBASE_DEPRECATED bool retrieveItem(const Akonadi::Item &item, const QSet< QByteArray > &parts)
Retrieve a single item from the backend.
void synchronize()
This method is called whenever the resource should start synchronize all data.
void changeCommitted(const Item &item)
Resets the dirty flag of the given item and updates the remote id.
void itemsRetrieved(const Item::List &items)
Call this method to supply the full collection listing from the remote server.
void doSetOnline(bool online) override
Inherited from AgentBase.
Collection currentCollection() const
Returns the collection that is currently synchronized.
void nameChanged(const QString &name)
This signal is emitted whenever the name of the resource has changed.
virtual void retrieveItems(const Akonadi::Collection &collection)=0
Retrieve all (new/changed) items in collection collection.
void cancelTask()
Stops the execution of the current task and continues with the next one.
void itemsRetrievedIncremental(const Item::List &changedItems, const Item::List &removedItems)
Call this method to supply incrementally retrieved items from the remote server.
void setItemMergingMode(ItemSync::MergeMode mode)
Set merge mode for item sync'ing.
void setHierarchicalRemoteIdentifiersEnabled(bool enable)
Indicate the use of hierarchical remote identifiers.
ResourceBase(const QString &id)
Creates a base resource.
virtual void retrieveCollectionAttributes(const Akonadi::Collection &collection)
Retrieve the attributes of a single collection from the backend.
void collectionTreeSynchronized()
Emitted when a collection tree synchronization has been completed.
~ResourceBase() override
Destroys the base resource.
void setName(const QString &name)
This method is used to set the name of the resource.
void synchronizeCollectionTree()
Refetches the Collections.
void synchronizeTags()
Refetches Tags.
void setCollectionStreamingEnabled(bool enable)
Enable collection streaming, that is collections don't have to be delivered at once as result of a re...
void changesCommitted(const Item::List &items)
Resets the dirty flag of all given items and updates remote ids.
virtual void abortActivity()
Abort any activity in progress in the backend.
void synchronizeCollectionAttributes(qint64 id)
This method is called whenever the collection with the given id shall have its attributes synchronize...
AKONADIAGENTBASE_DEPRECATED Item currentItem() const
Returns the item that is currently retrieved.
static QString agentServiceName(ServiceAgentType agentType, const QString &identifier)
Returns the namespaced D-Bus service name for an agent of type agentType with agent identifier identi...
static QString addNamespace(const QString &string)
Adds the multi-instance namespace to string if required (with '_' as separator).
void reconnected()
This signal is emitted whenever the session has been reconnected to the server (e....
An Attribute that stores the special collection type of a collection.
QByteArray collectionType() const
Returns the special collections type of the collection.
Job that modifies a tag in the Akonadi storage.
An Akonadi Tag.
Definition tag.h:26
Base class for jobs that need to run a sequence of sub-jobs in a transaction.
virtual QString errorString() const
int error() const
void result(KJob *job)
void finished(KJob *job)
QString errorText() const
void percentChanged(KJob *job, unsigned long percent)
QString i18nc(const char *context, const char *text, const TYPE &arg...)
QString i18n(const char *text, const TYPE &arg...)
Helper integration between Akonadi and Qt.
KIOCORE_EXPORT QStringList list(const QString &fileClass)
bool isEmpty() const const
QCommandLineOption addHelpOption()
bool addOption(const QCommandLineOption &option)
QCommandLineOption addVersionOption()
bool isSet(const QCommandLineOption &option) const const
void process(const QCoreApplication &app)
void setApplicationDescription(const QString &description)
QString value(const QCommandLineOption &option) const const
void setApplicationName(const QString &application)
void setApplicationVersion(const QString &version)
void exit(int returnCode)
bool installTranslator(QTranslator *translationFile)
QCoreApplication * instance()
QDateTime currentDateTimeUtc()
QDBusError lastError() const const
QDBusConnection sessionBus()
const QDBusMessage & message() const const
void sendErrorReply(QDBusError::ErrorType type, const QString &msg) const const
void setDelayedReply(bool enable) const const
QString message() const const
const_reference at(qsizetype i) const const
iterator begin()
iterator end()
iterator erase(const_iterator begin, const_iterator end)
T & first()
bool isEmpty() const const
qsizetype removeAll(const AT &t)
iterator insert(const Key &key, const T &value)
bool invokeMethod(QObject *context, Functor &&function, FunctorReturnType *ret)
Q_EMITQ_EMIT
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
QVariant property(const char *name) const const
bool setProperty(const char *name, QVariant &&value)
T * data() const const
bool contains(const QSet< T > &other) const const
bool isEmpty() const const
QString fromLocal8Bit(QByteArrayView str)
bool isEmpty() const const
QStringView mid(qsizetype start, qsizetype length) const const
int compare(QChar ch) const const
CaseInsensitive
QFuture< ArgsType< Signal > > connect(Sender *sender, Signal signal)
QThread * currentThread()
void setInterval(int msec)
bool isActive() const const
void setSingleShot(bool singleShot)
void start()
void stop()
void timeout()
QVariant fromValue(T &&value)
T value() const const
Q_D(Todo)
This file is part of the KDE documentation.
Documentation copyright © 1996-2024 The KDE developers.
Generated on Fri Nov 29 2024 11:49:11 by doxygen 1.12.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.