Akonadi

resourcebase.cpp
1/*
2 SPDX-FileCopyrightText: 2006 Till Adam <adam@kde.org>
3 SPDX-FileCopyrightText: 2007 Volker Krause <vkrause@kde.org>
4
5 SPDX-License-Identifier: LGPL-2.0-or-later
6*/
7
8#include "resourcebase.h"
9#include "agentbase_p.h"
10
11#include "akonadifull-version.h"
12#include "collectiondeletejob.h"
13#include "collectionsync_p.h"
14#include "resourceadaptor.h"
15#include "resourcescheduler_p.h"
16#include "tagsync.h"
17#include "tracerinterface.h"
18#include <QDBusConnection>
19
20#include "changerecorder.h"
21#include "collectionfetchjob.h"
22#include "collectionfetchscope.h"
23#include "collectionmodifyjob.h"
24#include "favoritecollectionattribute.h"
25#include "invalidatecachejob_p.h"
26#include "itemcreatejob.h"
27#include "itemfetchjob.h"
28#include "itemfetchscope.h"
29#include "itemmodifyjob.h"
30#include "itemmodifyjob_p.h"
31#include "monitor_p.h"
32#include "recursivemover_p.h"
33#include "resourceselectjob_p.h"
34#include "servermanager_p.h"
35#include "session.h"
36#include "specialcollectionattribute.h"
37#include "tagmodifyjob.h"
38
39#include "akonadiagentbase_debug.h"
40
41#include "shared/akranges.h"
42#include <cstdlib>
43#include <iterator>
44
45#include <KAboutData>
46#include <KLocalizedString>
47
48#include <QApplication>
49#include <QHash>
50#include <QTimer>
51
52using namespace Akonadi;
53using namespace AkRanges;
54using namespace std::chrono_literals;
55class Akonadi::ResourceBasePrivate : public AgentBasePrivate
56{
57 Q_OBJECT
58 Q_CLASSINFO("D-Bus Interface", "org.kde.dfaure")
59
60public:
61 explicit ResourceBasePrivate(ResourceBase *parent)
62 : AgentBasePrivate(parent)
63 , scheduler(nullptr)
64 , mItemSyncer(nullptr)
65 , mItemTransactionMode(ItemSync::SingleTransaction)
66 , mItemMergeMode(ItemSync::RIDMerge)
67 , mCollectionSyncer(nullptr)
68 , mTagSyncer(nullptr)
69 , mHierarchicalRid(false)
70 , mUnemittedProgress(0)
71 , mAutomaticProgressReporting(true)
72 , mDisableAutomaticItemDeliveryDone(false)
73 , mItemSyncBatchSize(10)
74 , mCurrentCollectionFetchJob(nullptr)
75 , mScheduleAttributeSyncBeforeCollectionSync(false)
76 {
77 Internal::setClientType(Internal::Resource);
78 mStatusMessage = defaultReadyMessage();
79 mProgressEmissionCompressor.setInterval(1000);
80 mProgressEmissionCompressor.setSingleShot(true);
81 // HACK: skip local changes of the EntityDisplayAttribute by default. Remove this for KDE5 and adjust resource implementations accordingly.
82 mKeepLocalCollectionChanges << "ENTITYDISPLAY";
83 }
84
85 ~ResourceBasePrivate() override = default;
86
87 Q_DECLARE_PUBLIC(ResourceBase)
88
89 void delayedInit() override
90 {
91 const QString serviceId = ServerManager::agentServiceName(ServerManager::Resource, mId);
92 if (!QDBusConnection::sessionBus().registerService(serviceId)) {
94 if (reason.isEmpty()) {
95 reason = QStringLiteral("this service is probably running already.");
96 }
97 qCCritical(AKONADIAGENTBASE_LOG) << "Unable to register service" << serviceId << "at D-Bus:" << reason;
98
101 }
102 } else {
103 AgentBasePrivate::delayedInit();
104 }
105 }
106
107 void changeProcessed() override
108 {
109 if (m_recursiveMover) {
110 m_recursiveMover->changeProcessed();
111 QTimer::singleShot(0s, m_recursiveMover.data(), &RecursiveMover::replayNext);
112 return;
113 }
114
115 mChangeRecorder->changeProcessed();
116 if (!mChangeRecorder->isEmpty()) {
117 scheduler->scheduleChangeReplay();
118 }
119 scheduler->taskDone();
120 }
121
122 void slotAbortRequested();
123
124 void slotDeliveryDone(KJob *job);
125 void slotCollectionSyncDone(KJob *job);
126 void slotLocalListDone(KJob *job);
127 void slotSynchronizeCollection(const Collection &col);
128 void slotItemRetrievalCollectionFetchDone(KJob *job);
129 void slotCollectionListDone(KJob *job);
130 void slotSynchronizeCollectionAttributes(const Collection &col);
131 void slotCollectionListForAttributesDone(KJob *job);
132 void slotCollectionAttributesSyncDone(KJob *job);
133 void slotSynchronizeTags();
134 void slotAttributeRetrievalCollectionFetchDone(KJob *job);
135
136 void slotItemSyncDone(KJob *job);
137
138 void slotPercent(KJob *job, quint64 percent);
139 void slotDelayedEmitProgress();
140 void slotDeleteResourceCollection();
141 void slotDeleteResourceCollectionDone(KJob *job);
142 void slotCollectionDeletionDone(KJob *job);
143
144 void slotInvalidateCache(const Akonadi::Collection &collection);
145
146 void slotPrepareItemRetrieval(const Akonadi::Item &item);
147 void slotPrepareItemRetrievalResult(KJob *job);
148
149 void slotPrepareItemsRetrieval(const QList<Akonadi::Item> &item);
150 void slotPrepareItemsRetrievalResult(KJob *job);
151
152 void changeCommittedResult(KJob *job);
153
154 void slotRecursiveMoveReplay(RecursiveMover *mover);
155 void slotRecursiveMoveReplayResult(KJob *job);
156
157 void slotTagSyncDone(KJob *job);
158
159 void slotSessionReconnected()
160 {
161 Q_Q(ResourceBase);
162
163 new ResourceSelectJob(q->identifier());
164 }
165
166 void createItemSyncInstanceIfMissing()
167 {
168 Q_Q(ResourceBase);
169 Q_ASSERT_X(scheduler->currentTask().type == ResourceScheduler::SyncCollection,
170 "createItemSyncInstance",
171 "Calling items retrieval methods although no item retrieval is in progress");
172 if (!mItemSyncer) {
173 mItemSyncer = new ItemSync(q->currentCollection(), mCollectionSyncTimestamp);
174 mItemSyncer->setTransactionMode(mItemTransactionMode);
175 mItemSyncer->setBatchSize(mItemSyncBatchSize);
176 mItemSyncer->setMergeMode(mItemMergeMode);
177 mItemSyncer->setDisableAutomaticDeliveryDone(mDisableAutomaticItemDeliveryDone);
178 mItemSyncer->setProperty("collection", QVariant::fromValue(q->currentCollection()));
179 connect(mItemSyncer, &KJob::percentChanged, this,
180 &ResourceBasePrivate::slotPercent); // NOLINT(google-runtime-int): ulong comes from KJob
181
182 connect(mItemSyncer, &KJob::result, this, &ResourceBasePrivate::slotItemSyncDone);
184 }
185 Q_ASSERT(mItemSyncer);
186 }
187
188public Q_SLOTS:
189 // Dump the state of the scheduler
190 Q_SCRIPTABLE QString dumpToString() const
191 {
192 Q_Q(const ResourceBase);
193 return scheduler->dumpToString() + QLatin1Char('\n') + q->dumpResourceToString();
194 }
195
196 Q_SCRIPTABLE void dump()
197 {
198 scheduler->dump();
199 }
200
201 Q_SCRIPTABLE void clear()
202 {
203 scheduler->clear();
204 }
205
206protected Q_SLOTS:
// Reimplementations from AgentBasePrivate, containing sanity checks that only apply to resources,
// such as making sure that RIDs are present, as well as translations of cross-resource moves.
// TODO: we could possibly add recovery code for no-RID notifications by re-enqueuing those to the change recorder
// as the corresponding Add notifications, although that carries a risk of endless fail/retry loops.
211
212 void itemAdded(const Akonadi::Item &item, const Akonadi::Collection &collection) override
213 {
214 if (collection.remoteId().isEmpty()) {
215 changeProcessed();
216 return;
217 }
218 AgentBasePrivate::itemAdded(item, collection);
219 }
220
221 void itemChanged(const Akonadi::Item &item, const QSet<QByteArray> &partIdentifiers) override
222 {
223 if (item.remoteId().isEmpty()) {
224 changeProcessed();
225 return;
226 }
227 AgentBasePrivate::itemChanged(item, partIdentifiers);
228 }
229
230 void itemsFlagsChanged(const Akonadi::Item::List &items, const QSet<QByteArray> &addedFlags, const QSet<QByteArray> &removedFlags) override
231 {
232 if (addedFlags.isEmpty() && removedFlags.isEmpty()) {
233 changeProcessed();
234 return;
235 }
236
237 const Item::List validItems = filterValidItems(items);
238 if (validItems.isEmpty()) {
239 changeProcessed();
240 return;
241 }
242
243 AgentBasePrivate::itemsFlagsChanged(validItems, addedFlags, removedFlags);
244 }
245
246 void itemsTagsChanged(const Akonadi::Item::List &items, const QSet<Akonadi::Tag> &addedTags, const QSet<Akonadi::Tag> &removedTags) override
247 {
248 if (addedTags.isEmpty() && removedTags.isEmpty()) {
249 changeProcessed();
250 return;
251 }
252
253 const Item::List validItems = filterValidItems(items);
254 if (validItems.isEmpty()) {
255 changeProcessed();
256 return;
257 }
258
259 AgentBasePrivate::itemsTagsChanged(validItems, addedTags, removedTags);
260 }
261
262 // TODO move the move translation code from AgentBasePrivate here, it's wrong for agents
263 void itemMoved(const Akonadi::Item &item, const Akonadi::Collection &source, const Akonadi::Collection &destination) override
264 {
265 if (item.remoteId().isEmpty() || destination.remoteId().isEmpty() || destination == source) {
266 changeProcessed();
267 return;
268 }
269 AgentBasePrivate::itemMoved(item, source, destination);
270 }
271
272 void itemsMoved(const Akonadi::Item::List &items, const Akonadi::Collection &source, const Akonadi::Collection &destination) override
273 {
274 if (destination.remoteId().isEmpty() || destination == source) {
275 changeProcessed();
276 return;
277 }
278
279 const Item::List validItems = filterValidItems(items);
280 if (validItems.isEmpty()) {
281 changeProcessed();
282 return;
283 }
284
285 AgentBasePrivate::itemsMoved(validItems, source, destination);
286 }
287
288 void itemRemoved(const Akonadi::Item &item) override
289 {
290 if (item.remoteId().isEmpty()) {
291 changeProcessed();
292 return;
293 }
294 AgentBasePrivate::itemRemoved(item);
295 }
296
297 void itemsRemoved(const Akonadi::Item::List &items) override
298 {
299 const Item::List validItems = filterValidItems(items);
300 if (validItems.isEmpty()) {
301 changeProcessed();
302 return;
303 }
304
305 AgentBasePrivate::itemsRemoved(validItems);
306 }
307
308 void collectionAdded(const Akonadi::Collection &collection, const Akonadi::Collection &parent) override
309 {
310 if (parent.remoteId().isEmpty()) {
311 changeProcessed();
312 return;
313 }
314 AgentBasePrivate::collectionAdded(collection, parent);
315 }
316
317 void collectionChanged(const Akonadi::Collection &collection) override
318 {
319 if (collection.remoteId().isEmpty()) {
320 changeProcessed();
321 return;
322 }
323 AgentBasePrivate::collectionChanged(collection);
324 }
325
326 void collectionChanged(const Akonadi::Collection &collection, const QSet<QByteArray> &partIdentifiers) override
327 {
328 if (collection.remoteId().isEmpty()) {
329 changeProcessed();
330 return;
331 }
332 AgentBasePrivate::collectionChanged(collection, partIdentifiers);
333 }
334
335 void collectionMoved(const Akonadi::Collection &collection, const Akonadi::Collection &source, const Akonadi::Collection &destination) override
336 {
337 // unknown destination or source == destination means we can't do/don't have to do anything
338 if (destination.remoteId().isEmpty() || source == destination) {
339 changeProcessed();
340 return;
341 }
342
343 // inter-resource moves, requires we know which resources the source and destination are in though
344 if (!source.resource().isEmpty() && !destination.resource().isEmpty() && source.resource() != destination.resource()) {
345 if (source.resource() == q_ptr->identifier()) { // moved away from us
346 AgentBasePrivate::collectionRemoved(collection);
347 } else if (destination.resource() == q_ptr->identifier()) { // moved to us
348 scheduler->taskDone(); // stop change replay for now
349 auto mover = new RecursiveMover(this);
350 mover->setCollection(collection, destination);
351 scheduler->scheduleMoveReplay(collection, mover);
352 }
353 return;
354 }
355
356 // intra-resource move, requires the moved collection to have a valid id though
357 if (collection.remoteId().isEmpty()) {
358 changeProcessed();
359 return;
360 }
361
362 // intra-resource move, ie. something we can handle internally
363 AgentBasePrivate::collectionMoved(collection, source, destination);
364 }
365
366 void collectionRemoved(const Akonadi::Collection &collection) override
367 {
368 if (collection.remoteId().isEmpty()) {
369 changeProcessed();
370 return;
371 }
372 AgentBasePrivate::collectionRemoved(collection);
373 }
374
375 void tagAdded(const Akonadi::Tag &tag) override
376 {
377 if (!tag.isValid()) {
378 changeProcessed();
379 return;
380 }
381
382 AgentBasePrivate::tagAdded(tag);
383 }
384
385 void tagChanged(const Akonadi::Tag &tag) override
386 {
387 if (tag.remoteId().isEmpty()) {
388 changeProcessed();
389 return;
390 }
391
392 AgentBasePrivate::tagChanged(tag);
393 }
394
395 void tagRemoved(const Akonadi::Tag &tag) override
396 {
397 if (tag.remoteId().isEmpty()) {
398 changeProcessed();
399 return;
400 }
401
402 AgentBasePrivate::tagRemoved(tag);
403 }
404
405private:
406 static Item::List filterValidItems(Item::List items)
407 {
408 items.erase(std::remove_if(items.begin(),
409 items.end(),
410 [](const auto &item) {
411 return item.remoteId().isEmpty();
412 }),
413 items.end());
414 return items;
415 }
416
417public:
418 // synchronize states
419 Collection currentCollection;
420
421 ResourceScheduler *scheduler = nullptr;
422 ItemSync *mItemSyncer = nullptr;
423 ItemSync::TransactionMode mItemTransactionMode;
424 ItemSync::MergeMode mItemMergeMode;
425 CollectionSync *mCollectionSyncer = nullptr;
426 TagSync *mTagSyncer = nullptr;
427 bool mHierarchicalRid;
428 QTimer mProgressEmissionCompressor;
429 int mUnemittedProgress;
430 QMap<Akonadi::Collection::Id, QVariantMap> mUnemittedAdvancedStatus;
431 bool mAutomaticProgressReporting;
432 bool mDisableAutomaticItemDeliveryDone;
433 QPointer<RecursiveMover> m_recursiveMover;
434 int mItemSyncBatchSize;
435 QSet<QByteArray> mKeepLocalCollectionChanges;
436 KJob *mCurrentCollectionFetchJob = nullptr;
437 bool mScheduleAttributeSyncBeforeCollectionSync;
438 QDateTime mCollectionSyncTimestamp;
439};
440
442 : AgentBase(new ResourceBasePrivate(this), id)
443{
445
446 qDBusRegisterMetaType<QByteArrayList>();
447
448 new Akonadi__ResourceAdaptor(this);
449
450 d->scheduler = new ResourceScheduler(this);
451
452 d->mChangeRecorder->setChangeRecordingEnabled(true);
453 d->mChangeRecorder->setCollectionMoveTranslationEnabled(false); // we deal with this ourselves
454 connect(d->mChangeRecorder, &ChangeRecorder::changesAdded, d->scheduler, &ResourceScheduler::scheduleChangeReplay);
455
456 d->mChangeRecorder->setResourceMonitored(d->mId.toLatin1());
457 d->mChangeRecorder->fetchCollection(true);
458
459 connect(d->scheduler, &ResourceScheduler::executeFullSync, this, &ResourceBase::retrieveCollections);
460 connect(d->scheduler, &ResourceScheduler::executeCollectionTreeSync, this, &ResourceBase::retrieveCollections);
461 connect(d->scheduler, &ResourceScheduler::executeCollectionSync, d, &ResourceBasePrivate::slotSynchronizeCollection);
462 connect(d->scheduler, &ResourceScheduler::executeCollectionAttributesSync, d, &ResourceBasePrivate::slotSynchronizeCollectionAttributes);
463 connect(d->scheduler, &ResourceScheduler::executeTagSync, d, &ResourceBasePrivate::slotSynchronizeTags);
464 connect(d->scheduler, &ResourceScheduler::executeItemFetch, d, &ResourceBasePrivate::slotPrepareItemRetrieval);
465 connect(d->scheduler, &ResourceScheduler::executeItemsFetch, d, &ResourceBasePrivate::slotPrepareItemsRetrieval);
466 connect(d->scheduler, &ResourceScheduler::executeResourceCollectionDeletion, d, &ResourceBasePrivate::slotDeleteResourceCollection);
467 connect(d->scheduler, &ResourceScheduler::executeCacheInvalidation, d, &ResourceBasePrivate::slotInvalidateCache);
468 connect(d->scheduler, &ResourceScheduler::status, this, qOverload<int, const QString &>(&ResourceBase::status));
469 connect(d->scheduler, &ResourceScheduler::executeChangeReplay, d->mChangeRecorder, &ChangeRecorder::replayNext);
470 connect(d->scheduler, &ResourceScheduler::executeRecursiveMoveReplay, d, &ResourceBasePrivate::slotRecursiveMoveReplay);
471 connect(d->scheduler, &ResourceScheduler::fullSyncComplete, this, &ResourceBase::synchronized);
472 connect(d->scheduler, &ResourceScheduler::collectionTreeSyncComplete, this, &ResourceBase::collectionTreeSynchronized);
473 connect(d->mChangeRecorder, &ChangeRecorder::nothingToReplay, d->scheduler, &ResourceScheduler::taskDone);
474 connect(d->mChangeRecorder, &Monitor::collectionRemoved, d->scheduler, &ResourceScheduler::collectionRemoved);
475 connect(this, &ResourceBase::abortRequested, d, &ResourceBasePrivate::slotAbortRequested);
476 connect(this, &ResourceBase::synchronized, d->scheduler, &ResourceScheduler::taskDone);
477 connect(this, &ResourceBase::collectionTreeSynchronized, d->scheduler, &ResourceScheduler::taskDone);
479 connect(&d->mProgressEmissionCompressor, &QTimer::timeout, d, &ResourceBasePrivate::slotDelayedEmitProgress);
480
481 d->scheduler->setOnline(d->mOnline);
482 if (!d->mChangeRecorder->isEmpty()) {
483 d->scheduler->scheduleChangeReplay();
484 }
485
486 new ResourceSelectJob(identifier());
487
488 connect(d->mChangeRecorder->session(), &Session::reconnected, d, &ResourceBasePrivate::slotSessionReconnected);
489}
490
492
494{
495 d_func()->scheduler->scheduleFullSync();
496}
497
502
504{
505 return AgentBase::agentName();
506}
507
508QString ResourceBase::parseArguments(int argc, char **argv)
509{
510 Q_UNUSED(argc)
511
512 QCommandLineOption identifierOption(QStringLiteral("identifier"), i18nc("@label command line option", "Resource identifier"), QStringLiteral("argument"));
513 QCommandLineParser parser;
514 parser.addOption(identifierOption);
515 parser.addHelpOption();
516 parser.addVersionOption();
517 parser.process(*qApp);
518 parser.setApplicationDescription(i18n("Akonadi Resource"));
519
520 if (!parser.isSet(identifierOption)) {
521 qCDebug(AKONADIAGENTBASE_LOG) << "Identifier argument missing";
522 exit(1);
523 }
524
525 const QString identifier = parser.value(identifierOption);
526
527 if (identifier.isEmpty()) {
528 qCDebug(AKONADIAGENTBASE_LOG) << "Identifier is empty";
529 exit(1);
530 }
531
533 QCoreApplication::setApplicationVersion(QStringLiteral(AKONADI_FULL_VERSION));
534
535 const QFileInfo fi(QString::fromLocal8Bit(argv[0]));
536 // strip off full path and possible .exe suffix
537 const QString catalog = fi.baseName();
538
539 auto translator = new QTranslator(qApp);
540 translator->load(catalog);
542
543 return identifier;
544}
545
547{
548 KLocalizedString::setApplicationDomain(QByteArrayLiteral("libakonadi6"));
549 KAboutData::setApplicationData(r.aboutData());
550 return qApp->exec();
551}
552
553void ResourceBasePrivate::slotAbortRequested()
554{
555 Q_Q(ResourceBase);
556
557 scheduler->cancelQueues();
558 q->abortActivity();
559}
560
562{
564 Q_ASSERT(d->scheduler->currentTask().type == ResourceScheduler::FetchItem);
565 if (!item.isValid()) {
566 d->scheduler->itemFetchDone(i18nc("@info", "Invalid item retrieved"));
567 return;
568 }
569
570 const QSet<QByteArray> requestedParts = d->scheduler->currentTask().itemParts;
571 for (const QByteArray &part : requestedParts) {
572 if (!item.loadedPayloadParts().contains(part)) {
573 qCWarning(AKONADIAGENTBASE_LOG) << "Item does not provide part" << part;
574 }
575 }
576
577 auto job = new ItemModifyJob(item);
578 job->d_func()->setSilent(true);
579 // FIXME: remove once the item with which we call retrieveItem() has a revision number
580 job->disableRevisionCheck();
581 connect(job, &KJob::result, d, &ResourceBasePrivate::slotDeliveryDone);
582}
583
584void ResourceBasePrivate::slotDeliveryDone(KJob *job)
585{
586 Q_Q(ResourceBase);
587 Q_ASSERT(scheduler->currentTask().type == ResourceScheduler::FetchItem);
588 if (job->error()) {
589 Q_EMIT q->error(i18nc("@info", "Error while creating item: %1", job->errorString()));
590 }
591 scheduler->itemFetchDone(QString());
592}
593
595{
597 Q_ASSERT(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes);
598 if (!collection.isValid()) {
599 Q_EMIT attributesSynchronized(d->scheduler->currentTask().collection.id());
600 d->scheduler->taskDone();
601 return;
602 }
603
604 auto job = new CollectionModifyJob(collection);
605 connect(job, &KJob::result, d, &ResourceBasePrivate::slotCollectionAttributesSyncDone);
606}
607
608void ResourceBasePrivate::slotCollectionAttributesSyncDone(KJob *job)
609{
610 Q_Q(ResourceBase);
611 Q_ASSERT(scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes);
612 if (job->error()) {
613 Q_EMIT q->error(i18nc("@info", "Error while updating collection: %1", job->errorString()));
614 }
615 Q_EMIT q->attributesSynchronized(scheduler->currentTask().collection.id());
616 scheduler->taskDone();
617}
618
619void ResourceBasePrivate::slotDeleteResourceCollection()
620{
621 Q_Q(ResourceBase);
622
624 job->fetchScope().setResource(q->identifier());
625 connect(job, &KJob::result, this, &ResourceBasePrivate::slotDeleteResourceCollectionDone);
626}
627
628void ResourceBasePrivate::slotDeleteResourceCollectionDone(KJob *job)
629{
630 Q_Q(ResourceBase);
631 if (job->error()) {
632 Q_EMIT q->error(job->errorString());
633 scheduler->taskDone();
634 } else {
635 const auto fetchJob = static_cast<const CollectionFetchJob *>(job);
636
637 if (!fetchJob->collections().isEmpty()) {
638 auto job = new CollectionDeleteJob(fetchJob->collections().at(0));
639 connect(job, &KJob::result, this, &ResourceBasePrivate::slotCollectionDeletionDone);
640 } else {
641 // there is no resource collection, so just ignore the request
642 scheduler->taskDone();
643 }
644 }
645}
646
647void ResourceBasePrivate::slotCollectionDeletionDone(KJob *job)
648{
649 Q_Q(ResourceBase);
650 if (job->error()) {
651 Q_EMIT q->error(job->errorString());
652 }
653
654 scheduler->taskDone();
655}
656
657void ResourceBasePrivate::slotInvalidateCache(const Akonadi::Collection &collection)
658{
659 Q_Q(ResourceBase);
660 auto job = new InvalidateCacheJob(collection, q);
661 connect(job, &KJob::result, scheduler, &ResourceScheduler::taskDone);
662}
663
665{
666 changesCommitted(Item::List() << item);
667}
668
670{
672 auto transaction = new TransactionSequence(this);
673 connect(transaction, &KJob::finished, d, &ResourceBasePrivate::changeCommittedResult);
674
675 // Modify the items one-by-one, because STORE does not support mass RID change
676 for (const Item &item : items) {
677 auto job = new ItemModifyJob(item, transaction);
678 job->d_func()->setClean();
679 job->disableRevisionCheck(); // TODO: remove, but where/how do we handle the error?
680 job->setIgnorePayload(true); // we only want to reset the dirty flag and update the remote id
681 }
682}
683
685{
687 auto job = new CollectionModifyJob(collection);
688 connect(job, &KJob::result, d, &ResourceBasePrivate::changeCommittedResult);
689}
690
691void ResourceBasePrivate::changeCommittedResult(KJob *job)
692{
693 if (job->error()) {
694 qCWarning(AKONADIAGENTBASE_LOG) << job->errorText();
695 }
696
697 Q_Q(ResourceBase);
698 if (qobject_cast<CollectionModifyJob *>(job)) {
699 if (job->error()) {
700 Q_EMIT q->error(i18nc("@info", "Updating local collection failed: %1.", job->errorText()));
701 }
702 mChangeRecorder->d_ptr->invalidateCache(static_cast<CollectionModifyJob *>(job)->collection());
703 } else {
704 if (job->error()) {
705 Q_EMIT q->error(i18nc("@info", "Updating local items failed: %1.", job->errorText()));
706 }
707 // Item and tag cache is invalidated by modify job
708 }
709
710 changeProcessed();
711}
712
714{
716 auto job = new TagModifyJob(tag);
717 connect(job, &KJob::result, d, &ResourceBasePrivate::changeCommittedResult);
718}
719
720void ResourceBase::requestItemDelivery(const QList<qint64> &uids, const QByteArrayList &parts)
721{
723 if (!isOnline()) {
724 const QString errorMsg = i18nc("@info", "Cannot fetch item in offline mode.");
726 Q_EMIT error(errorMsg);
727 return;
728 }
729
730 setDelayedReply(true);
731
732 const auto items = uids | Views::transform([](const auto uid) {
733 return Item{uid};
734 })
735 | Actions::toQVector;
736
737 const QSet<QByteArray> partSet = QSet<QByteArray>(parts.begin(), parts.end());
738 d->scheduler->scheduleItemsFetch(items, partSet, message());
739}
740
742{
744 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
745 "ResourceBase::collectionsRetrieved()",
746 "Calling collectionsRetrieved() although no collection retrieval is in progress");
747 if (!d->mCollectionSyncer) {
748 d->mCollectionSyncer = new CollectionSync(identifier());
749 d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);
750 d->mCollectionSyncer->setKeepLocalChanges(d->mKeepLocalCollectionChanges);
751 connect(d->mCollectionSyncer, &KJob::percentChanged, d,
752 &ResourceBasePrivate::slotPercent); // NOLINT(google-runtime-int): ulong comes from KJob
753 connect(d->mCollectionSyncer, &KJob::result, d, &ResourceBasePrivate::slotCollectionSyncDone);
754 }
755 d->mCollectionSyncer->setRemoteCollections(collections);
756}
757
758void ResourceBase::collectionsRetrievedIncremental(const Collection::List &changedCollections, const Collection::List &removedCollections)
759{
761 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
762 "ResourceBase::collectionsRetrievedIncremental()",
763 "Calling collectionsRetrievedIncremental() although no collection retrieval is in progress");
764 if (!d->mCollectionSyncer) {
765 d->mCollectionSyncer = new CollectionSync(identifier());
766 d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);
767 d->mCollectionSyncer->setKeepLocalChanges(d->mKeepLocalCollectionChanges);
768 connect(d->mCollectionSyncer, &KJob::percentChanged, d,
769 &ResourceBasePrivate::slotPercent); // NOLINT(google-runtime-int): ulong comes from KJob
770 connect(d->mCollectionSyncer, &KJob::result, d, &ResourceBasePrivate::slotCollectionSyncDone);
771 }
772 d->mCollectionSyncer->setRemoteCollections(changedCollections, removedCollections);
773}
774
776{
778 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
779 "ResourceBase::setCollectionStreamingEnabled()",
780 "Calling setCollectionStreamingEnabled() although no collection retrieval is in progress");
781 if (!d->mCollectionSyncer) {
782 d->mCollectionSyncer = new CollectionSync(identifier());
783 d->mCollectionSyncer->setHierarchicalRemoteIds(d->mHierarchicalRid);
784 connect(d->mCollectionSyncer, &KJob::percentChanged, d,
785 &ResourceBasePrivate::slotPercent); // NOLINT(google-runtime-int): ulong comes from KJob
786 connect(d->mCollectionSyncer, &KJob::result, d, &ResourceBasePrivate::slotCollectionSyncDone);
787 }
788 d->mCollectionSyncer->setStreamingEnabled(enable);
789}
790
792{
794 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
795 "ResourceBase::collectionsRetrievalDone()",
796 "Calling collectionsRetrievalDone() although no collection retrieval is in progress");
797 // streaming enabled, so finalize the sync
798 if (d->mCollectionSyncer) {
799 d->mCollectionSyncer->retrievalDone();
800 } else {
801 // user did the sync himself, we are done now
802 // FIXME: we need the same special case for SyncAll as in slotCollectionSyncDone here!
803 d->scheduler->taskDone();
804 }
805}
806
808{
810 d->mKeepLocalCollectionChanges = parts;
811}
812
813void ResourceBasePrivate::slotCollectionSyncDone(KJob *job)
814{
815 Q_Q(ResourceBase);
816 mCollectionSyncer = nullptr;
817 if (job->error()) {
818 if (job->error() != Job::UserCanceled) {
819 Q_EMIT q->error(job->errorString());
820 }
821 } else {
822 if (scheduler->currentTask().type == ResourceScheduler::SyncAll) {
824 list->setFetchScope(q->changeRecorder()->collectionFetchScope());
825 list->fetchScope().fetchAttribute<SpecialCollectionAttribute>();
826 list->fetchScope().fetchAttribute<FavoriteCollectionAttribute>();
827 list->fetchScope().setResource(mId);
828 list->fetchScope().setListFilter(CollectionFetchScope::Sync);
829 connect(list, &KJob::result, this, &ResourceBasePrivate::slotLocalListDone);
830 return;
831 } else if (scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree) {
832 scheduler->scheduleCollectionTreeSyncCompletion();
833 }
834 }
835 scheduler->taskDone();
836}
837
838namespace
839{
840bool sortCollectionsForSync(const Collection &l, const Collection &r)
841{
843 const bool lInbox = (lType == "inbox") || (QStringView(l.remoteId()).mid(1).compare(QLatin1StringView("inbox"), Qt::CaseInsensitive) == 0);
844 const bool lFav = l.hasAttribute<FavoriteCollectionAttribute>();
845
847 const bool rInbox = (rType == "inbox") || (QStringView(r.remoteId()).mid(1).compare(QLatin1StringView("inbox"), Qt::CaseInsensitive) == 0);
848 const bool rFav = r.hasAttribute<FavoriteCollectionAttribute>();
849
850 // inbox is always first
851 if (lInbox) {
852 return true;
853 } else if (rInbox) {
854 return false;
855 }
856
857 // favorites right after inbox
858 if (lFav) {
859 return !rInbox;
860 } else if (rFav) {
861 return lInbox;
862 }
863
864 // trash is always last (unless it's favorite)
865 if (lType == "trash") {
866 return false;
867 } else if (rType == "trash") {
868 return true;
869 }
870
871 // Fallback to sorting by id
872 return l.id() < r.id();
873}
874
875} // namespace
876
877void ResourceBasePrivate::slotLocalListDone(KJob *job)
878{
879 Q_Q(ResourceBase);
880 if (job->error()) {
881 Q_EMIT q->error(job->errorString());
882 } else {
883 Collection::List cols = static_cast<CollectionFetchJob *>(job)->collections();
884 std::sort(cols.begin(), cols.end(), sortCollectionsForSync);
885 for (const Collection &col : std::as_const(cols)) {
886 scheduler->scheduleSync(col);
887 }
888 scheduler->scheduleFullSyncCompletion();
889 }
890 scheduler->taskDone();
891}
892
893void ResourceBasePrivate::slotSynchronizeCollection(const Collection &col)
894{
895 Q_Q(ResourceBase);
896 currentCollection = col;
897 // This can happen due to FetchHelper::triggerOnDemandFetch() in the akonadi server (not an error).
898 if (!col.remoteId().isEmpty()) {
899 // check if this collection actually can contain anything
900 QStringList contentTypes = currentCollection.contentMimeTypes();
901 contentTypes.removeAll(Collection::mimeType());
903 if (!contentTypes.isEmpty() || col.isVirtual()) {
904 if (mAutomaticProgressReporting) {
905 Q_EMIT q->status(AgentBase::Running, i18nc("@info:status", "Syncing folder '%1'", currentCollection.displayName()));
906 }
907
908 qCDebug(AKONADIAGENTBASE_LOG) << "Preparing collection sync of collection" << currentCollection.id() << currentCollection.displayName();
909 auto fetchJob = new Akonadi::CollectionFetchJob(col, CollectionFetchJob::Base, this);
910 fetchJob->setFetchScope(q->changeRecorder()->collectionFetchScope());
911 connect(fetchJob, &KJob::result, this, &ResourceBasePrivate::slotItemRetrievalCollectionFetchDone);
912 mCurrentCollectionFetchJob = fetchJob;
913 return;
914 }
915 }
916 scheduler->taskDone();
917}
918
919void ResourceBasePrivate::slotItemRetrievalCollectionFetchDone(KJob *job)
920{
921 Q_Q(ResourceBase);
922 mCurrentCollectionFetchJob = nullptr;
923 if (job->error()) {
924 qCWarning(AKONADIAGENTBASE_LOG) << "Failed to retrieve collection for sync: " << job->errorString();
925 q->cancelTask(i18n("Failed to retrieve collection for sync."));
926 return;
927 }
928 auto fetchJob = static_cast<Akonadi::CollectionFetchJob *>(job);
929 const Collection::List collections = fetchJob->collections();
930 if (collections.isEmpty()) {
931 qCWarning(AKONADIAGENTBASE_LOG) << "The fetch job returned empty collection set. This is unexpected.";
932 q->cancelTask(i18n("Failed to retrieve collection for sync."));
933 return;
934 }
935 mCollectionSyncTimestamp = QDateTime::currentDateTimeUtc();
936 q->retrieveItems(collections.at(0));
937}
938
940{
941 Q_D(const ResourceBase);
942 return d->mItemSyncBatchSize;
943}
944
946{
948 d->mItemSyncBatchSize = batchSize;
949}
950
952{
954 d->mScheduleAttributeSyncBeforeCollectionSync = enable;
955}
956
957void ResourceBasePrivate::slotSynchronizeCollectionAttributes(const Collection &col)
958{
959 Q_Q(ResourceBase);
960 auto fetchJob = new Akonadi::CollectionFetchJob(col, CollectionFetchJob::Base, this);
961 fetchJob->setFetchScope(q->changeRecorder()->collectionFetchScope());
962 connect(fetchJob, &KJob::result, this, &ResourceBasePrivate::slotAttributeRetrievalCollectionFetchDone);
963 Q_ASSERT(!mCurrentCollectionFetchJob);
964 mCurrentCollectionFetchJob = fetchJob;
965}
966
967void ResourceBasePrivate::slotAttributeRetrievalCollectionFetchDone(KJob *job)
968{
969 mCurrentCollectionFetchJob = nullptr;
970 Q_Q(ResourceBase);
971 if (job->error()) {
972 qCWarning(AKONADIAGENTBASE_LOG) << "Failed to retrieve collection for attribute sync: " << job->errorString();
973 q->cancelTask(i18n("Failed to retrieve collection for attribute sync."));
974 return;
975 }
976 auto fetchJob = static_cast<Akonadi::CollectionFetchJob *>(job);
977 // FIXME: Why not call q-> directly?
978 QMetaObject::invokeMethod(q, "retrieveCollectionAttributes", Q_ARG(Akonadi::Collection, fetchJob->collections().at(0)));
979}
980
981void ResourceBasePrivate::slotSynchronizeTags()
982{
983 Q_Q(ResourceBase);
984 QMetaObject::invokeMethod(this, [q] {
985 q->retrieveTags();
986 });
987}
988
989void ResourceBasePrivate::slotPrepareItemRetrieval(const Item &item)
990{
991 Q_Q(ResourceBase);
992 auto fetch = new ItemFetchJob(item, this);
993 // we always need at least parent so we can use ItemCreateJob to merge
994 fetch->fetchScope().setAncestorRetrieval(qMax(ItemFetchScope::Parent, q->changeRecorder()->itemFetchScope().ancestorRetrieval()));
995 fetch->fetchScope().setCacheOnly(true);
996 fetch->fetchScope().setFetchRemoteIdentification(true);
997
998 // copy list of attributes to fetch
999 const QSet<QByteArray> attributes = q->changeRecorder()->itemFetchScope().attributes();
1000 for (const auto &attribute : attributes) {
1001 fetch->fetchScope().fetchAttribute(attribute);
1002 }
1003
1004 connect(fetch, &KJob::result, this, &ResourceBasePrivate::slotPrepareItemRetrievalResult);
1005}
1006
1007void ResourceBasePrivate::slotPrepareItemRetrievalResult(KJob *job)
1008{
1009 Q_Q(ResourceBase);
1010 Q_ASSERT_X(scheduler->currentTask().type == ResourceScheduler::FetchItem,
1011 "ResourceBasePrivate::slotPrepareItemRetrievalResult()",
1012 "Preparing item retrieval although no item retrieval is in progress");
1013 if (job->error()) {
1014 q->cancelTask(job->errorText());
1015 return;
1016 }
1017 auto fetch = qobject_cast<ItemFetchJob *>(job);
1018 if (fetch->items().count() != 1) {
1019 q->cancelTask(i18n("The requested item no longer exists"));
1020 return;
1021 }
1022 const QSet<QByteArray> parts = scheduler->currentTask().itemParts;
1023 if (!q->retrieveItem(fetch->items().at(0), parts)) {
1024 q->cancelTask();
1025 }
1026}
1027
1028void ResourceBasePrivate::slotPrepareItemsRetrieval(const QList<Item> &items)
1029{
1030 Q_Q(ResourceBase);
1031 auto fetch = new ItemFetchJob(items, this);
1032 // we always need at least parent so we can use ItemCreateJob to merge
1033 fetch->fetchScope().setAncestorRetrieval(qMax(ItemFetchScope::Parent, q->changeRecorder()->itemFetchScope().ancestorRetrieval()));
1034 fetch->fetchScope().setCacheOnly(true);
1035 fetch->fetchScope().setFetchRemoteIdentification(true);
1036 // It's possible that one or more items were removed before this task was
1037 // executed, so ignore it and just handle the rest.
1038 fetch->fetchScope().setIgnoreRetrievalErrors(true);
1039
1040 // copy list of attributes to fetch
1041 const QSet<QByteArray> attributes = q->changeRecorder()->itemFetchScope().attributes();
1042 for (const auto &attribute : attributes) {
1043 fetch->fetchScope().fetchAttribute(attribute);
1044 }
1045
1046 connect(fetch, &KJob::result, this, &ResourceBasePrivate::slotPrepareItemsRetrievalResult);
1047}
1048
1049void ResourceBasePrivate::slotPrepareItemsRetrievalResult(KJob *job)
1050{
1051 Q_Q(ResourceBase);
1052 Q_ASSERT_X(scheduler->currentTask().type == ResourceScheduler::FetchItems,
1053 "ResourceBasePrivate::slotPrepareItemsRetrievalResult()",
1054 "Preparing items retrieval although no items retrieval is in progress");
1055 if (job->error()) {
1056 q->cancelTask(job->errorText());
1057 return;
1058 }
1059 auto fetch = qobject_cast<ItemFetchJob *>(job);
1060 const auto items = fetch->items();
1061 if (items.isEmpty()) {
1062 q->cancelTask();
1063 return;
1064 }
1065
1066 const QSet<QByteArray> parts = scheduler->currentTask().itemParts;
1067 Q_ASSERT(items.first().parentCollection().isValid());
1068 if (!q->retrieveItems(items, parts)) {
1069 q->cancelTask();
1070 }
1071}
1072
1073void ResourceBasePrivate::slotRecursiveMoveReplay(RecursiveMover *mover)
1074{
1075 Q_ASSERT(mover);
1076 Q_ASSERT(!m_recursiveMover);
1077 m_recursiveMover = mover;
1078 connect(mover, &KJob::result, this, &ResourceBasePrivate::slotRecursiveMoveReplayResult);
1079 mover->start();
1080}
1081
1082void ResourceBasePrivate::slotRecursiveMoveReplayResult(KJob *job)
1083{
1084 Q_Q(ResourceBase);
1085 m_recursiveMover = nullptr;
1086
1087 if (job->error()) {
1088 q->deferTask();
1089 return;
1090 }
1091
1092 changeProcessed();
1093}
1094
1096{
1098 // streaming enabled, so finalize the sync
1099 if (d->mItemSyncer) {
1100 d->mItemSyncer->deliveryDone();
1101 } else {
1102 if (d->scheduler->currentTask().type == ResourceScheduler::FetchItems) {
1103 d->scheduler->currentTask().sendDBusReplies(QString());
1104 }
1105 // user did the sync himself, we are done now
1106 d->scheduler->taskDone();
1107 }
1108}
1109
1111{
1113 d->scheduler->scheduleResourceCollectionDeletion();
1114}
1115
1117{
1119 d->scheduler->scheduleCacheInvalidation(collection);
1120}
1121
1123{
1124 Q_D(const ResourceBase);
1125 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncCollection,
1126 "ResourceBase::currentCollection()",
1127 "Trying to access current collection although no item retrieval is in progress");
1128 return d->currentCollection;
1129}
1130
1132{
1133 Q_D(const ResourceBase);
1134 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::FetchItem,
1135 "ResourceBase::currentItem()",
1136 "Trying to access current item although no item retrieval is in progress");
1137 return d->scheduler->currentTask().items[0];
1138}
1139
1141{
1142 Q_D(const ResourceBase);
1143 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::FetchItems,
1144 "ResourceBase::currentItems()",
1145 "Trying to access current items although no items retrieval is in progress");
1146 return d->scheduler->currentTask().items;
1147}
1148
1150{
1151 d_func()->scheduler->scheduleCollectionTreeSync();
1152}
1153
1155{
1156 d_func()->scheduler->scheduleTagSync();
1157}
1158
1160{
1162 if (d->mCurrentCollectionFetchJob) {
1163 d->mCurrentCollectionFetchJob->kill();
1164 d->mCurrentCollectionFetchJob = nullptr;
1165 }
1166 switch (d->scheduler->currentTask().type) {
1167 case ResourceScheduler::FetchItem:
1168 itemRetrieved(Item()); // sends the error reply and
1169 break;
1170 case ResourceScheduler::FetchItems:
1172 break;
1173 case ResourceScheduler::ChangeReplay:
1174 d->changeProcessed();
1175 break;
1176 case ResourceScheduler::SyncCollectionTree:
1177 case ResourceScheduler::SyncAll:
1178 if (d->mCollectionSyncer) {
1179 d->mCollectionSyncer->rollback();
1180 } else {
1181 d->scheduler->taskDone();
1182 }
1183 break;
1184 case ResourceScheduler::SyncCollection:
1185 if (d->mItemSyncer) {
1186 d->mItemSyncer->rollback();
1187 } else {
1188 d->scheduler->taskDone();
1189 }
1190 break;
1191 default:
1192 d->scheduler->taskDone();
1193 }
1194}
1195
1197{
1198 cancelTask();
1199
1200 Q_EMIT error(msg);
1201}
1202
1204{
1206 qCDebug(AKONADIAGENTBASE_LOG) << "Deferring task" << d->scheduler->currentTask();
1207 // Deferring a CollectionSync is just not implemented.
1208 // We'd need to d->mItemSyncer->rollback() but also to NOT call taskDone in slotItemSyncDone() here...
1209 Q_ASSERT(!d->mItemSyncer);
1210 d->scheduler->deferTask();
1211}
1212
1214{
1215 d_func()->scheduler->setOnline(state);
1216}
1217
1219{
1220 synchronizeCollection(collectionId, false);
1221}
1222
1223void ResourceBase::synchronizeCollection(qint64 collectionId, bool recursive)
1224{
1226 auto job = new CollectionFetchJob(Collection(collectionId), recursive ? CollectionFetchJob::Recursive : CollectionFetchJob::Base);
1227 job->setFetchScope(changeRecorder()->collectionFetchScope());
1228 job->fetchScope().setResource(identifier());
1229 job->fetchScope().setListFilter(CollectionFetchScope::Sync);
1230 connect(job, &KJob::result, d, &ResourceBasePrivate::slotCollectionListDone);
1231}
1232
1233void ResourceBasePrivate::slotCollectionListDone(KJob *job)
1234{
1235 if (!job->error()) {
1236 const Collection::List list = static_cast<CollectionFetchJob *>(job)->collections();
1237 for (const Collection &collection : list) {
1238 // We also get collections that should not be synced but are part of the tree.
1239 if (collection.shouldList(Collection::ListSync)) {
1240 if (mScheduleAttributeSyncBeforeCollectionSync) {
1241 scheduler->scheduleAttributesSync(collection);
1242 }
1243 scheduler->scheduleSync(collection);
1244 }
1245 }
1246 } else {
1247 qCWarning(AKONADIAGENTBASE_LOG) << "Failed to fetch collection for collection sync: " << job->errorString();
1248 }
1249}
1250
1252{
1254 d->scheduler->scheduleAttributesSync(col);
1255}
1256
1258{
1260 auto job = new CollectionFetchJob(Collection(collectionId), CollectionFetchJob::Base);
1261 job->setFetchScope(changeRecorder()->collectionFetchScope());
1262 job->fetchScope().setResource(identifier());
1263 connect(job, &KJob::result, d, &ResourceBasePrivate::slotCollectionListForAttributesDone);
1264}
1265
1266void ResourceBasePrivate::slotCollectionListForAttributesDone(KJob *job)
1267{
1268 if (!job->error()) {
1269 const Collection::List list = static_cast<CollectionFetchJob *>(job)->collections();
1270 if (!list.isEmpty()) {
1271 const Collection &col = list.first();
1272 scheduler->scheduleAttributesSync(col);
1273 }
1274 }
1275 // TODO: error handling
1276}
1277
1279{
1280 qCDebug(AKONADIAGENTBASE_LOG) << amount;
1283 if (d->mItemSyncer) {
1284 d->mItemSyncer->setTotalItems(amount);
1285 }
1286}
1287
1289{
1291 if (d->mItemSyncer) {
1292 d->mItemSyncer->setDisableAutomaticDeliveryDone(disable);
1293 }
1294 d->mDisableAutomaticItemDeliveryDone = disable;
1295}
1296
1298{
1300 d->createItemSyncInstanceIfMissing();
1301 if (d->mItemSyncer) {
1302 d->mItemSyncer->setStreamingEnabled(enable);
1303 }
1304}
1305
1307{
1309 if (d->scheduler->currentTask().type == ResourceScheduler::FetchItems) {
1310 auto trx = new TransactionSequence(this);
1311 connect(trx, &KJob::result, d, &ResourceBasePrivate::slotItemSyncDone);
1312 for (const Item &item : items) {
1313 Q_ASSERT(item.parentCollection().isValid());
1314 if (item.isValid()) { // NOLINT(bugprone-branch-clone)
1315 new ItemModifyJob(item, trx);
1316 } else if (!item.remoteId().isEmpty()) {
1317 auto job = new ItemCreateJob(item, item.parentCollection(), trx);
1318 job->setMerge(ItemCreateJob::RID);
1319 } else {
1320 // This should not happen, but just to be sure...
1321 new ItemModifyJob(item, trx);
1322 }
1323 }
1324 trx->commit();
1325 } else {
1326 d->createItemSyncInstanceIfMissing();
1327 if (d->mItemSyncer) {
1328 d->mItemSyncer->setFullSyncItems(items);
1329 }
1330 }
1331}
1332
1333void ResourceBase::itemsRetrievedIncremental(const Item::List &changedItems, const Item::List &removedItems)
1334{
1336 d->createItemSyncInstanceIfMissing();
1337 if (d->mItemSyncer) {
1338 d->mItemSyncer->setIncrementalSyncItems(changedItems, removedItems);
1339 }
1340}
1341
1342void ResourceBasePrivate::slotItemSyncDone(KJob *job)
1343{
1344 mItemSyncer = nullptr;
1345 Q_Q(ResourceBase);
1346 if (job->error() && job->error() != Job::UserCanceled) {
1347 Q_EMIT q->error(job->errorString());
1348 }
1349 if (scheduler->currentTask().type == ResourceScheduler::FetchItems) {
1350 scheduler->currentTask().sendDBusReplies((job->error() && job->error() != Job::UserCanceled) ? job->errorString() : QString());
1351 }
1352 scheduler->taskDone();
1353}
1354
1355void ResourceBasePrivate::slotDelayedEmitProgress()
1356{
1357 Q_Q(ResourceBase);
1358 if (mAutomaticProgressReporting) {
1359 Q_EMIT q->percent(mUnemittedProgress);
1360
1361 for (const QVariantMap &statusMap : std::as_const(mUnemittedAdvancedStatus)) {
1362 Q_EMIT q->advancedStatus(statusMap);
1363 }
1364 }
1365 mUnemittedProgress = 0;
1366 mUnemittedAdvancedStatus.clear();
1367}
1368
1369void ResourceBasePrivate::slotPercent(KJob *job, quint64 percent)
1370{
1371 mUnemittedProgress = static_cast<int>(percent);
1372
1373 const auto collection = job->property("collection").value<Collection>();
1374 if (collection.isValid()) {
1375 QVariantMap statusMap;
1376 statusMap.insert(QStringLiteral("key"), QStringLiteral("collectionSyncProgress"));
1377 statusMap.insert(QStringLiteral("collectionId"), collection.id());
1378 statusMap.insert(QStringLiteral("percent"), static_cast<unsigned int>(percent));
1379
1380 mUnemittedAdvancedStatus[collection.id()] = statusMap;
1381 }
1382 // deliver completion right away, intermediate progress at 1s intervals
1383 if (percent == 100U) {
1384 mProgressEmissionCompressor.stop();
1385 slotDelayedEmitProgress();
1386 } else if (!mProgressEmissionCompressor.isActive()) {
1387 mProgressEmissionCompressor.start();
1388 }
1389}
1390
1392{
1394 d->mHierarchicalRid = enable;
1395}
1396
1397void ResourceBase::scheduleCustomTask(QObject *receiver, const char *method, const QVariant &argument, SchedulePriority priority)
1398{
1400 d->scheduler->scheduleCustomTask(receiver, method, argument, priority);
1401}
1402
1404{
1406 d->scheduler->taskDone();
1407}
1408
1413
1415{
1417 d->scheduler->taskDone();
1418}
1419
1421{
1422 Q_UNUSED(item)
1423 Q_UNUSED(parts)
1424 // retrieveItem() can no longer be pure virtual, because then we could not mark
1425 // it as deprecated (i.e. implementations would still be forced to implement it),
1426 // so instead we assert here.
1427 // NOTE: Don't change to Q_ASSERT_X here: while the macro can be disabled at
1428 // compile time, we want to hit this assert *ALWAYS*.
1429 qt_assert_x("Akonadi::ResourceBase::retrieveItem()",
1430 "The base implementation of retrieveItem() must never be reached. "
1431 "You must implement either retrieveItem() or retrieveItems(Akonadi::Item::List, QSet<QByteArray>) overload "
1432 "to handle item retrieval requests.",
1433 __FILE__,
1434 __LINE__);
1435 return false;
1436}
1437
1439{
1441
1442 // If we reach this implementation of retrieveItems() then it means that the
1443 // resource is still using the deprecated retrieveItem() method, so we explode
1444 // this to a myriad of tasks in scheduler and let them be processed one by one
1445
1446 const qint64 id = d->scheduler->currentTask().serial;
1447 for (const auto &item : items) {
1448 d->scheduler->scheduleItemFetch(item, parts, d->scheduler->currentTask().dbusMsgs, id);
1449 }
1450 taskDone();
1451 return true;
1452}
1453
1457
1459{
1461 d->mItemTransactionMode = mode;
1462}
1463
1464void ResourceBase::setItemMergingMode(ItemSync::MergeMode mode)
1465{
1467 d->mItemMergeMode = mode;
1468}
1469
1471{
1473 d->mAutomaticProgressReporting = enabled;
1474}
1475
1477{
1478 Q_D(const ResourceBase);
1479 return d->dumpNotificationListToString();
1480}
1481
1483{
1484 Q_D(const ResourceBase);
1485 return d->dumpToString();
1486}
1487
1489{
1490 Q_D(const ResourceBase);
1491 d->dumpMemoryInfo();
1492}
1493
1495{
1496 Q_D(const ResourceBase);
1497 return d->dumpMemoryInfoToString();
1498}
1499
1500void ResourceBase::tagsRetrieved(const Tag::List &tags, const QHash<QString, Item::List> &tagMembers)
1501{
1503 Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::SyncTags || d->scheduler->currentTask().type == ResourceScheduler::SyncAll
1504 || d->scheduler->currentTask().type == ResourceScheduler::Custom,
1505 "ResourceBase::tagsRetrieved()",
1506 "Calling tagsRetrieved() although no tag retrieval is in progress");
1507 if (!d->mTagSyncer) {
1508 d->mTagSyncer = new TagSync(this);
1509 connect(d->mTagSyncer, &KJob::percentChanged, d,
1510 &ResourceBasePrivate::slotPercent); // NOLINT(google-runtime-int): ulong comes from KJob
1511 connect(d->mTagSyncer, &KJob::result, d, &ResourceBasePrivate::slotTagSyncDone);
1512 }
1513 d->mTagSyncer->setFullTagList(tags);
1514 d->mTagSyncer->setTagMembers(tagMembers);
1515}
1516
1517void ResourceBasePrivate::slotTagSyncDone(KJob *job)
1518{
1519 Q_Q(ResourceBase);
1520 mTagSyncer = nullptr;
1521 if (job->error()) {
1522 if (job->error() != Job::UserCanceled) {
1523 qCWarning(AKONADIAGENTBASE_LOG) << "TagSync failed: " << job->errorString();
1524 Q_EMIT q->error(job->errorString());
1525 }
1526 }
1527
1528 scheduler->taskDone();
1529}
1530
1531#include "moc_resourcebase.cpp"
1532#include "resourcebase.moc"
The base class for all Akonadi agents and resources.
Definition agentbase.h:73
virtual int status() const
This method returns the current status code of the agent.
void setAgentName(const QString &name)
This method is used to set the name of the agent.
ChangeRecorder * changeRecorder() const
Returns the Akonadi::ChangeRecorder object used for monitoring.
QString agentName() const
Returns the name of the agent.
@ Running
The agent is working on something.
Definition agentbase.h:398
void abortRequested()
Emitted when another application has remotely asked the agent to abort its current operation.
bool isOnline() const
Returns whether the agent is currently online.
QString identifier() const
Returns the instance identifier of this agent.
void agentNameChanged(const QString &name)
This signal is emitted whenever the name of the agent has changed.
void error(const QString &message)
This signal shall be used to report errors.
void changesAdded()
Emitted when new changes are recorded.
void replayNext()
Replay the next change notification and erase the previous one from the record.
void nothingToReplay()
Emitted when replayNext() was called, but there was no valid change to replay.
Job that deletes a collection in the Akonadi storage.
Job that fetches collections from the Akonadi storage.
@ Recursive
List all sub-collections.
@ FirstLevel
Only list direct sub-collections of the base collection.
@ Base
Only fetch the base collection.
@ Sync
Only retrieve collections for synchronization, taking the local preference and enabled into account.
Job that modifies a collection in the Akonadi storage.
Represents a collection of PIM items.
Definition collection.h:62
static QString mimeType()
Returns the mimetype used for collections.
@ ListSync
Listing for synchronization.
Definition collection.h:480
bool hasAttribute(const QByteArray &name) const
Returns true if the collection has an attribute of the given type name, false otherwise.
static Collection root()
Returns the root collection.
static QString virtualMimeType()
Returns the mimetype used for virtual collections.
bool shouldList(ListPurpose purpose) const
Returns whether the collection should be listed or not for the specified purpose. Takes enabled state ...
Attribute * attribute(const QByteArray &name)
Returns the attribute of the given type name if available, 0 otherwise.
QString remoteId() const
Returns the remote id of the collection.
Job that creates a new item in the Akonadi storage.
@ RID
Merge by remote id.
Job that fetches items from the Akonadi storage.
@ Parent
Only retrieve the immediate parent collection.
Job that modifies an existing item in the Akonadi storage.
Syncs between items known to a client (usually a resource) and the Akonadi storage.
Definition itemsync.h:41
void readyForNextBatch(int remainingBatchSize)
Signals the resource that new items can be delivered.
TransactionMode
Transaction mode used by ItemSync.
Definition itemsync.h:129
void setDisableAutomaticDeliveryDone(bool disable)
Disables the automatic completion of the item sync, based on the number of delivered items.
Definition itemsync.cpp:238
void setBatchSize(int)
Set the batch size.
Definition itemsync.cpp:539
void setMergeMode(MergeMode mergeMode)
Set what merge method should be used for next ItemSync run.
Definition itemsync.cpp:551
void setTransactionMode(TransactionMode mode)
Set the transaction mode to use for this sync.
Definition itemsync.cpp:527
Represents a PIM item stored in Akonadi storage.
Definition item.h:100
QString remoteId() const
Returns the remote id of the item.
Definition item.cpp:73
bool isValid() const
Returns whether the item is valid.
Definition item.cpp:88
QSet< QByteArray > loadedPayloadParts() const
Returns the list of loaded payload parts.
Definition item.cpp:283
@ UserCanceled
The user canceled this job.
Definition job.h:101
CollectionFetchScope & collectionFetchScope()
Returns the collection fetch scope.
Definition monitor.cpp:235
void collectionRemoved(const Akonadi::Collection &collection)
This signal is emitted if a monitored collection has been removed from the Akonadi storage.
The base class for all Akonadi resources.
void setScheduleAttributeSyncBeforeItemSync(bool)
Set to true to schedule an attribute sync before every item sync.
void setItemTransactionMode(ItemSync::TransactionMode mode)
Set transaction mode for item sync'ing.
QString dumpNotificationListToString() const
Dump the contents of the current ChangeReplay.
void dumpMemoryInfo() const
Dumps memory usage information to stdout.
void taskDone()
Indicate that the current task is finished.
void invalidateCache(const Collection &collection)
Call this method to invalidate all cached content in collection.
void collectionAttributesRetrieved(const Collection &collection)
Call this method from retrieveCollectionAttributes() once the result is available.
int itemSyncBatchSize() const
Returns the batch size used during the item sync.
void setItemSyncBatchSize(int batchSize)
Set the batch size used during the item sync.
void setDisableAutomaticItemDeliveryDone(bool disable)
Disables the automatic completion of the item sync, based on the number of delivered items.
static int init(int argc, char **argv)
Use this method in the main function of your resource application to initialize your resource subclas...
void synchronizeCollection(qint64 id)
This method is called whenever the collection with the given id shall be synchronized.
virtual void retrieveTags()
Retrieve all tags from the backend.
void synchronized()
Emitted when a full synchronization has been completed.
virtual void retrieveCollections()=0
Retrieve the collection tree from the remote server and supply it via collectionsRetrieved() or colle...
QString dumpSchedulerToString() const
Dump the state of the scheduler.
QString dumpMemoryInfoToString() const
Returns a string with memory usage information.
SchedulePriority
Describes the scheduling priority of a task that has been queued for execution.
void retrieveNextItemSyncBatch(int remainingBatchSize)
Emitted when the item synchronization processed the current batch and is ready for a new one.
void setKeepLocalCollectionChanges(const QSet< QByteArray > &parts)
Allows to keep locally changed collection parts during the collection sync.
void attributesSynchronized(qlonglong collectionId)
Emitted when a collection attributes synchronization has been completed.
void itemsRetrievalDone()
Call this method to indicate you finished synchronizing the current collection.
Item::List currentItems() const
Returns the items that are currently retrieved.
void setTotalItems(int amount)
Call this method when you want to use the itemsRetrieved() method in streaming mode and indicate the ...
void setAutomaticProgressReporting(bool enabled)
Enable or disable automatic progress reporting.
void collectionsRetrieved(const Collection::List &collections)
Call this to supply the full folder tree retrieved from the remote server.
void deferTask()
Suspends the execution of the current task and tries again to execute it.
void setItemStreamingEnabled(bool enable)
Enable item streaming, which is disabled by default.
QString name() const
Returns the name of the resource.
void collectionsRetrievalDone()
Call this method to indicate you finished synchronizing the collection tree.
void clearCache()
Call this method to remove all items and collections of the resource from the server cache.
void scheduleCustomTask(QObject *receiver, const char *method, const QVariant &argument, SchedulePriority priority=Append)
Schedules a custom task in the internal scheduler.
void collectionsRetrievedIncremental(const Collection::List &changedCollections, const Collection::List &removedCollections)
Call this to supply incrementally retrieved collections from the remote server.
void itemRetrieved(const Item &item)
Call this method from retrieveItem() once the result is available.
virtual AKONADIAGENTBASE_DEPRECATED bool retrieveItem(const Akonadi::Item &item, const QSet< QByteArray > &parts)
Retrieve a single item from the backend.
void synchronize()
This method is called whenever the resource should start synchronize all data.
void changeCommitted(const Item &item)
Resets the dirty flag of the given item and updates the remote id.
void itemsRetrieved(const Item::List &items)
Call this method to supply the full collection listing from the remote server.
void doSetOnline(bool online) override
Inherited from AgentBase.
Collection currentCollection() const
Returns the collection that is currently synchronized.
void nameChanged(const QString &name)
This signal is emitted whenever the name of the resource has changed.
virtual void retrieveItems(const Akonadi::Collection &collection)=0
Retrieve all (new/changed) items in collection collection.
void cancelTask()
Stops the execution of the current task and continues with the next one.
void itemsRetrievedIncremental(const Item::List &changedItems, const Item::List &removedItems)
Call this method to supply incrementally retrieved items from the remote server.
void setItemMergingMode(ItemSync::MergeMode mode)
Set merge mode for item sync'ing.
void setHierarchicalRemoteIdentifiersEnabled(bool enable)
Indicate the use of hierarchical remote identifiers.
ResourceBase(const QString &id)
Creates a base resource.
virtual void retrieveCollectionAttributes(const Akonadi::Collection &collection)
Retrieve the attributes of a single collection from the backend.
void collectionTreeSynchronized()
Emitted when a collection tree synchronization has been completed.
~ResourceBase() override
Destroys the base resource.
void setName(const QString &name)
This method is used to set the name of the resource.
void synchronizeCollectionTree()
Refetches the Collections.
void synchronizeTags()
Refetches Tags.
void setCollectionStreamingEnabled(bool enable)
Enable collection streaming, that is collections don't have to be delivered at once as result of a re...
void changesCommitted(const Item::List &items)
Resets the dirty flag of all given items and updates remote ids.
virtual void abortActivity()
Abort any activity in progress in the backend.
void synchronizeCollectionAttributes(qint64 id)
This method is called whenever the collection with the given id shall have its attributes synchronize...
AKONADIAGENTBASE_DEPRECATED Item currentItem() const
Returns the item that is currently retrieved.
static QString agentServiceName(ServiceAgentType agentType, const QString &identifier)
Returns the namespaced D-Bus service name for an agent of type agentType with agent identifier identi...
static QString addNamespace(const QString &string)
Adds the multi-instance namespace to string if required (with '_' as separator).
void reconnected()
This signal is emitted whenever the session has been reconnected to the server (e....
An Attribute that stores the special collection type of a collection.
QByteArray collectionType() const
Returns the special collections type of the collection.
Job that modifies a tag in the Akonadi storage.
An Akonadi Tag.
Definition tag.h:26
Base class for jobs that need to run a sequence of sub-jobs in a transaction.
static void setApplicationData(const KAboutData &aboutData)
virtual QString errorString() const
int error() const
void result(KJob *job)
void finished(KJob *job)
QString errorText() const
void percentChanged(KJob *job, unsigned long percent)
static void setApplicationDomain(const QByteArray &domain)
QString i18nc(const char *context, const char *text, const TYPE &arg...)
QString i18n(const char *text, const TYPE &arg...)
Helper integration between Akonadi and Qt.
KIOCORE_EXPORT QStringList list(const QString &fileClass)
bool isEmpty() const
QCommandLineOption addHelpOption()
bool addOption(const QCommandLineOption &option)
QCommandLineOption addVersionOption()
bool isSet(const QCommandLineOption &option) const
void process(const QCoreApplication &app)
void setApplicationDescription(const QString &description)
QString value(const QCommandLineOption &option) const
void setApplicationName(const QString &application)
void setApplicationVersion(const QString &version)
void exit(int returnCode)
bool installTranslator(QTranslator *translationFile)
QCoreApplication * instance()
QDateTime currentDateTimeUtc()
QDBusError lastError() const
QDBusConnection sessionBus()
const QDBusMessage & message() const
void sendErrorReply(QDBusError::ErrorType type, const QString &msg) const
void setDelayedReply(bool enable) const
QString message() const
const_reference at(qsizetype i) const
iterator begin()
iterator end()
iterator erase(const_iterator begin, const_iterator end)
T & first()
bool isEmpty() const
qsizetype removeAll(const AT &t)
iterator insert(const Key &key, const T &value)
bool invokeMethod(QObject *context, Functor &&function, FunctorReturnType *ret)
Q_EMIT
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
QVariant property(const char *name) const
bool setProperty(const char *name, QVariant &&value)
T * data() const
bool contains(const QSet< T > &other) const
bool isEmpty() const
QString fromLocal8Bit(QByteArrayView str)
bool isEmpty() const
QStringView mid(qsizetype start, qsizetype length) const
int compare(QChar ch) const
CaseInsensitive
QFuture< ArgsType< Signal > > connect(Sender *sender, Signal signal)
QThread * currentThread()
void setInterval(int msec)
bool isActive() const
void setSingleShot(bool singleShot)
void start()
void stop()
void timeout()
QVariant fromValue(T &&value)
T value() const
Q_D(Todo)
This file is part of the KDE documentation.
Documentation copyright © 1996-2024 The KDE developers.
Generated on Fri Jul 26 2024 11:52:52 by doxygen 1.11.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.