Akonadi

itemsync.cpp
1 /*
2  SPDX-FileCopyrightText: 2007 Tobias Koenig <[email protected]>
3  SPDX-FileCopyrightText: 2007 Volker Krause <[email protected]>
4  SPDX-FileCopyrightText: 2014 Christian Mollekopf <[email protected]>
5 
6  SPDX-License-Identifier: LGPL-2.0-or-later
7 */
8 
9 #include "itemsync.h"
10 
11 #include "collection.h"
12 #include "item_p.h"
13 #include "itemcreatejob.h"
14 #include "itemdeletejob.h"
15 #include "itemfetchjob.h"
16 #include "itemfetchscope.h"
17 #include "itemmodifyjob.h"
18 #include "job_p.h"
19 #include "transactionsequence.h"
20 
21 #include "akonadicore_debug.h"
22 
23 using namespace Akonadi;
24 
25 /**
26  * @internal
27  */
28 class Akonadi::ItemSyncPrivate : public JobPrivate
29 {
30 public:
31  explicit ItemSyncPrivate(ItemSync *parent)
32  : JobPrivate(parent)
33  , mTransactionMode(ItemSync::SingleTransaction)
34  , mCurrentTransaction(nullptr)
35  , mTransactionJobs(0)
36  , mPendingJobs(0)
37  , mProgress(0)
38  , mTotalItems(-1)
39  , mTotalItemsProcessed(0)
40  , mStreaming(false)
41  , mIncremental(false)
42  , mDeliveryDone(false)
43  , mFinished(false)
44  , mFullListingDone(false)
45  , mProcessingBatch(false)
46  , mDisableAutomaticDeliveryDone(false)
47  , mBatchSize(10)
48  , mMergeMode(Akonadi::ItemSync::RIDMerge)
49  {
50  }
51 
52  void createOrMerge(const Item &item);
53  void checkDone();
54  void slotItemsReceived(const Item::List &items);
55  void slotLocalListDone(KJob *job);
56  void slotLocalDeleteDone(KJob *job);
57  void slotLocalChangeDone(KJob *job);
58  void execute();
59  void processItems();
60  void processBatch();
61  void deleteItems(const Item::List &items);
62  void slotTransactionResult(KJob *job);
63  void requestTransaction();
64  Job *subjobParent() const;
65  void fetchLocalItemsToDelete();
66  QString jobDebuggingString() const override;
67  bool allProcessed() const;
68 
69  Q_DECLARE_PUBLIC(ItemSync)
70  Collection mSyncCollection;
71  QSet<QString> mListedItems;
72 
73  ItemSync::TransactionMode mTransactionMode;
74  TransactionSequence *mCurrentTransaction = nullptr;
75  int mTransactionJobs;
76 
77  Akonadi::Item::List mRemoteItemQueue;
78  Akonadi::Item::List mRemovedRemoteItemQueue;
79  Akonadi::Item::List mCurrentBatchRemoteItems;
80  Akonadi::Item::List mCurrentBatchRemovedRemoteItems;
81  Akonadi::Item::List mItemsToDelete;
82 
83  // create counter
84  int mPendingJobs;
85  int mProgress;
86  int mTotalItems;
87  int mTotalItemsProcessed;
88 
89  bool mStreaming;
90  bool mIncremental;
91  bool mDeliveryDone;
92  bool mFinished;
93  bool mFullListingDone;
94  bool mProcessingBatch;
95  bool mDisableAutomaticDeliveryDone;
96 
97  int mBatchSize;
98  Akonadi::ItemSync::MergeMode mMergeMode;
99 };
100 
101 void ItemSyncPrivate::createOrMerge(const Item &item)
102 {
103  Q_Q(ItemSync);
104  // don't try to do anything in error state
105  if (q->error()) {
106  return;
107  }
108  mPendingJobs++;
109  auto create = new ItemCreateJob(item, mSyncCollection, subjobParent());
111  if (mMergeMode == ItemSync::GIDMerge && !item.gid().isEmpty()) {
113  } else {
115  }
116  create->setMerge(merge);
117  q->connect(create, &ItemCreateJob::result, q, [this](KJob *job) {
118  slotLocalChangeDone(job);
119  });
120 }
121 
122 bool ItemSyncPrivate::allProcessed() const
123 {
124  return mDeliveryDone && mCurrentBatchRemoteItems.isEmpty() && mRemoteItemQueue.isEmpty() && mRemovedRemoteItemQueue.isEmpty()
125  && mCurrentBatchRemovedRemoteItems.isEmpty();
126 }
127 
128 void ItemSyncPrivate::checkDone()
129 {
130  Q_Q(ItemSync);
131  q->setProcessedAmount(KJob::Bytes, mProgress);
132  if (mPendingJobs > 0) {
133  return;
134  }
135 
136  if (mTransactionJobs > 0) {
137  // Commit the current transaction if we're in batch processing mode or done
138  // and wait until the transaction is committed to process the next batch
139  if (mTransactionMode == ItemSync::MultipleTransactions || (mDeliveryDone && mRemoteItemQueue.isEmpty())) {
140  if (mCurrentTransaction) {
141  // Note that mCurrentTransaction->commit() is a no-op if we're already rolling back
142  // so this signal is a bit misleading (but it's only used by unittests it seems)
143  Q_EMIT q->transactionCommitted();
144  mCurrentTransaction->commit();
145  mCurrentTransaction = nullptr;
146  }
147  return;
148  }
149  }
150  mProcessingBatch = false;
151 
152  if (q->error() == Job::UserCanceled && mTransactionJobs == 0 && !mFinished) {
153  qCDebug(AKONADICORE_LOG) << "ItemSync of collection" << mSyncCollection.id() << "finished due to user cancelling";
154  mFinished = true;
155  q->emitResult();
156  return;
157  }
158 
159  if (!mRemoteItemQueue.isEmpty()) {
160  execute();
161  // We don't have enough items, request more
162  if (!mProcessingBatch) {
163  Q_EMIT q->readyForNextBatch(mBatchSize - mRemoteItemQueue.size());
164  }
165  return;
166  }
167  Q_EMIT q->readyForNextBatch(mBatchSize);
168 
169  if (allProcessed() && !mFinished) {
170  // prevent double result emission, can happen since checkDone() is called from all over the place
171  qCDebug(AKONADICORE_LOG) << "ItemSync of collection" << mSyncCollection.id() << "finished";
172  mFinished = true;
173  q->emitResult();
174  }
175 }
176 
177 ItemSync::ItemSync(const Collection &collection, QObject *parent)
178  : Job(new ItemSyncPrivate(this), parent)
179 {
180  Q_D(ItemSync);
181  d->mSyncCollection = collection;
182 }
183 
185 {
186 }
187 
189 {
190  /*
191  * We received a list of items from the server:
192  * * fetch all local id's + rid's only
193  * * check each full sync item whether it's locally available
194  * * if it is modify the item
195  * * if it's not create it
196  * * delete all superfluous items
197  */
198  Q_D(ItemSync);
199  Q_ASSERT(!d->mIncremental);
200  if (!d->mStreaming) {
201  d->mDeliveryDone = true;
202  }
203  d->mRemoteItemQueue += items;
204  d->mTotalItemsProcessed += items.count();
205  qCDebug(AKONADICORE_LOG) << "Received batch: " << items.count() << "Already processed: " << d->mTotalItemsProcessed
206  << "Expected total amount: " << d->mTotalItems;
207  if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItemsProcessed == d->mTotalItems)) {
208  d->mDeliveryDone = true;
209  }
210  d->execute();
211 }
212 
213 void ItemSync::setTotalItems(int amount)
214 {
215  Q_D(ItemSync);
216  Q_ASSERT(!d->mIncremental);
217  Q_ASSERT(amount >= 0);
218  setStreamingEnabled(true);
219  qCDebug(AKONADICORE_LOG) << "Expected total amount:" << amount;
220  d->mTotalItems = amount;
221  setTotalAmount(KJob::Bytes, amount);
222  if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItems == 0)) {
223  d->mDeliveryDone = true;
224  d->execute();
225  }
226 }
227 
229 {
230  Q_D(ItemSync);
231  d->mDisableAutomaticDeliveryDone = disable;
232 }
233 
234 void ItemSync::setIncrementalSyncItems(const Item::List &changedItems, const Item::List &removedItems)
235 {
236  /*
237  * We received an incremental listing of items:
238  * * for each changed item:
239  * ** If locally available => modify
240  * ** else => create
241  * * removed items can be removed right away
242  */
243  Q_D(ItemSync);
244  d->mIncremental = true;
245  if (!d->mStreaming) {
246  d->mDeliveryDone = true;
247  }
248  d->mRemoteItemQueue += changedItems;
249  d->mRemovedRemoteItemQueue += removedItems;
250  d->mTotalItemsProcessed += changedItems.count() + removedItems.count();
251  qCDebug(AKONADICORE_LOG) << "Received: " << changedItems.count() << "Removed: " << removedItems.count() << "In total: " << d->mTotalItemsProcessed
252  << " Wanted: " << d->mTotalItems;
253  if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItemsProcessed == d->mTotalItems)) {
254  d->mDeliveryDone = true;
255  }
256  d->execute();
257 }
258 
260 {
261 }
262 
263 void ItemSyncPrivate::fetchLocalItemsToDelete()
264 {
265  Q_Q(ItemSync);
266  if (mIncremental) {
267  qFatal("This must not be called while in incremental mode");
268  return;
269  }
270  auto job = new ItemFetchJob(mSyncCollection, subjobParent());
271  job->fetchScope().setFetchRemoteIdentification(true);
272  job->fetchScope().setFetchModificationTime(false);
273  job->setDeliveryOption(ItemFetchJob::EmitItemsIndividually);
274  // we only can fetch parts already in the cache, otherwise this will deadlock
275  job->fetchScope().setCacheOnly(true);
276 
278  slotItemsReceived(lst);
279  });
280  QObject::connect(job, &ItemFetchJob::result, q, [this](KJob *job) {
281  slotLocalListDone(job);
282  });
283  mPendingJobs++;
284 }
285 
286 void ItemSyncPrivate::slotItemsReceived(const Item::List &items)
287 {
288  for (const Akonadi::Item &item : items) {
289  // Don't delete items that have not yet been synchronized
290  if (item.remoteId().isEmpty()) {
291  continue;
292  }
293  if (!mListedItems.contains(item.remoteId())) {
294  mItemsToDelete << Item(item.id());
295  }
296  }
297 }
298 
299 void ItemSyncPrivate::slotLocalListDone(KJob *job)
300 {
301  mPendingJobs--;
302  if (job->error()) {
303  qCWarning(AKONADICORE_LOG) << job->errorString();
304  }
305  deleteItems(mItemsToDelete);
306  checkDone();
307 }
308 
309 QString ItemSyncPrivate::jobDebuggingString() const
310 {
311  // TODO: also print out mIncremental and mTotalItemsProcessed, but they are set after the job
312  // started, so this requires passing jobDebuggingString to jobEnded().
313  return QStringLiteral("Collection %1 (%2)").arg(mSyncCollection.id()).arg(mSyncCollection.name());
314 }
315 
316 void ItemSyncPrivate::execute()
317 {
318  // shouldn't happen
319  if (mFinished) {
320  qCWarning(AKONADICORE_LOG) << "Call to execute() on finished job.";
321  Q_ASSERT(false);
322  return;
323  }
324  // not doing anything, start processing
325  if (!mProcessingBatch) {
326  if (mRemoteItemQueue.size() >= mBatchSize || mDeliveryDone) {
327  // we have a new batch to process
328  const int num = qMin(mBatchSize, mRemoteItemQueue.size());
329  mCurrentBatchRemoteItems.reserve(mBatchSize);
330  std::move(mRemoteItemQueue.begin(), mRemoteItemQueue.begin() + num, std::back_inserter(mCurrentBatchRemoteItems));
331  mRemoteItemQueue.erase(mRemoteItemQueue.begin(), mRemoteItemQueue.begin() + num);
332 
333  mCurrentBatchRemovedRemoteItems += mRemovedRemoteItemQueue;
334  mRemovedRemoteItemQueue.clear();
335  } else {
336  // nothing to do, let's wait for more data
337  return;
338  }
339  mProcessingBatch = true;
340  processBatch();
341  return;
342  }
343  checkDone();
344 }
345 
346 // process the current batch of items
347 void ItemSyncPrivate::processBatch()
348 {
349  Q_Q(ItemSync);
350  if (mCurrentBatchRemoteItems.isEmpty() && !mDeliveryDone) {
351  return;
352  }
353  if (q->error() == Job::UserCanceled) {
354  checkDone();
355  return;
356  }
357 
358  // request a transaction, there are items that require processing
359  requestTransaction();
360 
361  processItems();
362 
363  // removed
364  if (!mIncremental && allProcessed()) {
365  // the full listing is done and we know which items to remove
366  fetchLocalItemsToDelete();
367  } else {
368  deleteItems(mCurrentBatchRemovedRemoteItems);
369  mCurrentBatchRemovedRemoteItems.clear();
370  }
371 
372  checkDone();
373 }
374 
375 void ItemSyncPrivate::processItems()
376 {
377  // added / updated
378  for (const Item &remoteItem : std::as_const(mCurrentBatchRemoteItems)) {
379  if (remoteItem.remoteId().isEmpty()) {
380  qCWarning(AKONADICORE_LOG) << "Item " << remoteItem.id() << " does not have a remote identifier";
381  continue;
382  }
383  if (!mIncremental) {
384  mListedItems << remoteItem.remoteId();
385  }
386  createOrMerge(remoteItem);
387  }
388  mCurrentBatchRemoteItems.clear();
389 }
390 
391 void ItemSyncPrivate::deleteItems(const Item::List &itemsToDelete)
392 {
393  Q_Q(ItemSync);
394  // if in error state, better not change anything anymore
395  if (q->error()) {
396  return;
397  }
398 
399  if (itemsToDelete.isEmpty()) {
400  return;
401  }
402 
403  mPendingJobs++;
404  auto job = new ItemDeleteJob(itemsToDelete, subjobParent());
405  q->connect(job, &ItemDeleteJob::result, q, [this](KJob *job) {
406  slotLocalDeleteDone(job);
407  });
408 
409  // It can happen that the groupware servers report us deleted items
410  // twice, in this case this item delete job will fail on the second try.
411  // To avoid a rollback of the complete transaction we gracefully allow the job
412  // to fail :)
413  auto transaction = qobject_cast<TransactionSequence *>(subjobParent());
414  if (transaction) {
415  transaction->setIgnoreJobFailure(job);
416  }
417 }
418 
419 void ItemSyncPrivate::slotLocalDeleteDone(KJob *job)
420 {
421  if (job->error()) {
422  qCWarning(AKONADICORE_LOG) << "Deleting items from the akonadi database failed:" << job->errorString();
423  }
424  mPendingJobs--;
425  mProgress++;
426 
427  checkDone();
428 }
429 
430 void ItemSyncPrivate::slotLocalChangeDone(KJob *job)
431 {
432  if (job->error() && job->error() != Job::KilledJobError) {
433  qCWarning(AKONADICORE_LOG) << "Creating/updating items from the akonadi database failed:" << job->errorString();
434  mRemoteItemQueue.clear(); // don't try to process any more items after a rollback
435  }
436  mPendingJobs--;
437  mProgress++;
438 
439  checkDone();
440 }
441 
442 void ItemSyncPrivate::slotTransactionResult(KJob *job)
443 {
444  --mTransactionJobs;
445  if (mCurrentTransaction == job) {
446  mCurrentTransaction = nullptr;
447  }
448 
449  checkDone();
450 }
451 
452 void ItemSyncPrivate::requestTransaction()
453 {
454  Q_Q(ItemSync);
455  // we never want parallel transactions, single transaction just makes one big transaction, and multi transaction uses multiple transaction sequentially
456  if (!mCurrentTransaction) {
457  ++mTransactionJobs;
458  mCurrentTransaction = new TransactionSequence(q);
459  mCurrentTransaction->setAutomaticCommittingEnabled(false);
460  QObject::connect(mCurrentTransaction, &TransactionSequence::result, q, [this](KJob *job) {
461  slotTransactionResult(job);
462  });
463  }
464 }
465 
466 Job *ItemSyncPrivate::subjobParent() const
467 {
468  Q_Q(const ItemSync);
469  if (mCurrentTransaction && mTransactionMode != ItemSync::NoTransaction) {
470  return mCurrentTransaction;
471  }
472  return const_cast<ItemSync *>(q);
473 }
474 
476 {
477  Q_D(ItemSync);
478  d->mStreaming = enable;
479 }
480 
482 {
483  Q_D(ItemSync);
484  Q_ASSERT(d->mStreaming);
485  d->mDeliveryDone = true;
486  d->execute();
487 }
488 
489 void ItemSync::slotResult(KJob *job)
490 {
491  if (job->error()) {
492  qCWarning(AKONADICORE_LOG) << "Error during ItemSync: " << job->errorString();
493  // pretend there were no errors
495  // propagate the first error we got but continue, we might still be fed with stuff from a resource
496  if (!error()) {
497  setError(job->error());
498  setErrorText(job->errorText());
499  }
500  } else {
501  Akonadi::Job::slotResult(job);
502  }
503 }
504 
506 {
507  Q_D(ItemSync);
508  qCDebug(AKONADICORE_LOG) << "The item sync is being rolled-back.";
510  if (d->mCurrentTransaction) {
511  d->mCurrentTransaction->rollback();
512  }
513  d->mDeliveryDone = true; // user won't deliver more data
514  d->execute(); // end this in an ordered way, since we have an error set no real change will be done
515 }
516 
518 {
519  Q_D(ItemSync);
520  d->mTransactionMode = mode;
521 }
522 
524 {
525  Q_D(const ItemSync);
526  return d->mBatchSize;
527 }
528 
530 {
531  Q_D(ItemSync);
532  d->mBatchSize = size;
533 }
534 
535 ItemSync::MergeMode ItemSync::mergeMode() const
536 {
537  Q_D(const ItemSync);
538  return d->mMergeMode;
539 }
540 
541 void ItemSync::setMergeMode(MergeMode mergeMode)
542 {
543  Q_D(ItemSync);
544  d->mMergeMode = mergeMode;
545 }
546 
547 #include "moc_itemsync.cpp"
void deliveryDone()
Notify ItemSync that all remote items have been delivered.
Definition: itemsync.cpp:481
void setIncrementalSyncItems(const Item::List &changedItems, const Item::List &removedItems)
Sets the item lists for incrementally syncing the collection.
Definition: itemsync.cpp:234
void itemsReceived(const Akonadi::Item::List &items)
This signal is emitted whenever new items have been fetched completely.
void rollback()
Aborts the sync process and rolls back all not yet committed transactions.
Definition: itemsync.cpp:505
Job that deletes items from the Akonadi storage.
Definition: itemdeletejob.h:47
void setErrorText(const QString &errorText)
void result(KJob *job)
Job that creates a new item in the Akonadi storage.
Definition: itemcreatejob.h:60
@ RID
Merge by remote id.
Definition: itemcreatejob.h:87
void clear()
int batchSize() const
Minimum number of items required to start processing in streaming mode.
Definition: itemsync.cpp:523
@ MultipleTransactions
Use one transaction per chunk of delivered items, good compromise between the other two when using streaming.
Definition: itemsync.h:128
Represents a collection of PIM items.
Definition: collection.h:61
~ItemSync() override
Destroys the item synchronizer.
Definition: itemsync.cpp:184
@ EmitItemsIndividually
emitted via signal upon reception
Definition: itemfetchjob.h:203
void doStart() override
This method must be reimplemented in the concrete jobs.
Definition: itemsync.cpp:259
QMetaObject::Connection connect(const QObject *sender, const char *signal, const QObject *receiver, const char *method, Qt::ConnectionType type)
QVector< Item > List
Describes a list of items.
Definition: item.h:115
@ GID
Merge by GID.
Definition: itemcreatejob.h:88
bool removeSubjob(KJob *job) override
Removes the given subjob of this job.
Definition: job.cpp:369
TransactionMode
Transaction mode used by ItemSync.
Definition: itemsync.h:125
void setTotalAmount(Unit unit, qulonglong amount)
QString gid() const
Returns the gid of the entity.
Definition: item.cpp:357
bool isEmpty() const const
QString errorText() const
QAction * create(StandardGameAction id, const QObject *recvr, const char *slot, QObject *parent)
Syncs between items known to a client (usually a resource) and the Akonadi storage.
Definition: itemsync.h:38
Base class for all actions in the Akonadi storage.
Definition: job.h:80
QVariant merge(const QVariant &lhs, const QVariant &rhs)
Base class for jobs that need to run a sequence of sub-jobs in a transaction.
@ Silent
Only return the id of the merged/created item.
Definition: itemcreatejob.h:89
Id id() const
Returns the unique identifier of the item.
Definition: item.cpp:63
QString arg(qlonglong a, int fieldWidth, int base, QChar fillChar) const const
ItemSync(const Collection &collection, QObject *parent=nullptr)
Creates a new item synchronizer.
Definition: itemsync.cpp:177
void setBatchSize(int)
Set the batch size.
Definition: itemsync.cpp:529
@ NoTransaction
Use no transaction at all, provides highest responsiveness (might therefore feel faster even when actually taking slightly longer), no consistency guaranteed (can fail anywhere in the sync process).
Definition: itemsync.h:129
void setMergeMode(MergeMode mergeMode)
Set what merge method should be used for next ItemSync run.
Definition: itemsync.cpp:541
void setStreamingEnabled(bool enable)
Enable item streaming.
Definition: itemsync.cpp:475
int count(const T &value) const const
Job that fetches items from the Akonadi storage.
Definition: itemfetchjob.h:69
MergeMode mergeMode() const
Returns current merge mode.
Definition: itemsync.cpp:535
virtual QString errorString() const
int error() const
@ UserCanceled
The user canceled this job.
Definition: job.h:101
void setError(int errorCode)
QString remoteId() const
Returns the remote id of the item.
Definition: item.cpp:73
void setTotalItems(int amount)
Set the amount of items which you are going to return in total by using the setFullSyncItems()/setIncrementalSyncItems() methods.
Definition: itemsync.cpp:213
Represents a PIM item stored in Akonadi storage.
Definition: item.h:104
void setTransactionMode(TransactionMode mode)
Set the transaction mode to use for this sync.
Definition: itemsync.cpp:517
Q_D(Todo)
void setFullSyncItems(const Item::List &items)
Sets the full item list for the collection.
Definition: itemsync.cpp:188
void setDisableAutomaticDeliveryDone(bool disable)
Disables the automatic completion of the item sync, based on the number of delivered items.
Definition: itemsync.cpp:228
Helper integration between Akonadi and Qt.
This file is part of the KDE documentation.
Documentation copyright © 1996-2022 The KDE developers.
Generated on Mon Jun 27 2022 04:01:06 by doxygen 1.8.17 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.