Akonadi

itemsync.cpp
1 /*
2  Copyright (c) 2007 Tobias Koenig <[email protected]>
3  Copyright (c) 2007 Volker Krause <[email protected]>
4  Copyright (c) 2014 Christian Mollekopf <[email protected]>
5 
6  This library is free software; you can redistribute it and/or modify it
7  under the terms of the GNU Library General Public License as published by
8  the Free Software Foundation; either version 2 of the License, or (at your
9  option) any later version.
10 
11  This library is distributed in the hope that it will be useful, but WITHOUT
12  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
14  License for more details.
15 
16  You should have received a copy of the GNU Library General Public License
17  along with this library; see the file COPYING.LIB. If not, write to the
18  Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  02110-1301, USA.
20 */
21 
22 #include "itemsync.h"
23 
24 #include "job_p.h"
25 #include "collection.h"
26 #include "item.h"
27 #include "item_p.h"
28 #include "itemcreatejob.h"
29 #include "itemdeletejob.h"
30 #include "itemfetchjob.h"
31 #include "itemmodifyjob.h"
32 #include "transactionsequence.h"
33 #include "itemfetchscope.h"
34 
35 
36 #include "akonadicore_debug.h"
37 
38 
39 using namespace Akonadi;
40 
44 class Akonadi::ItemSyncPrivate : public JobPrivate
45 {
46 public:
47  ItemSyncPrivate(ItemSync *parent)
48  : JobPrivate(parent)
49  , mTransactionMode(ItemSync::SingleTransaction)
50  , mCurrentTransaction(nullptr)
51  , mTransactionJobs(0)
52  , mPendingJobs(0)
53  , mProgress(0)
54  , mTotalItems(-1)
55  , mTotalItemsProcessed(0)
56  , mStreaming(false)
57  , mIncremental(false)
58  , mDeliveryDone(false)
59  , mFinished(false)
60  , mFullListingDone(false)
61  , mProcessingBatch(false)
62  , mDisableAutomaticDeliveryDone(false)
63  , mBatchSize(10)
64  , mMergeMode(Akonadi::ItemSync::RIDMerge)
65  {
66  // we want to fetch all data by default
67  mFetchScope.fetchFullPayload();
68  mFetchScope.fetchAllAttributes();
69  }
70 
71  void createOrMerge(const Item &item);
72  void checkDone();
73  void slotItemsReceived(const Item::List &items);
74  void slotLocalListDone(KJob *job);
75  void slotLocalDeleteDone(KJob *job);
76  void slotLocalChangeDone(KJob *job);
77  void execute();
78  void processItems();
79  void processBatch();
80  void deleteItems(const Item::List &items);
81  void slotTransactionResult(KJob *job);
82  void requestTransaction();
83  Job *subjobParent() const;
84  void fetchLocalItemsToDelete();
85  QString jobDebuggingString() const override;
86  bool allProcessed() const;
87 
88  Q_DECLARE_PUBLIC(ItemSync)
89  Collection mSyncCollection;
90  QSet<QString> mListedItems;
91 
92  ItemSync::TransactionMode mTransactionMode;
93  TransactionSequence *mCurrentTransaction;
94  int mTransactionJobs;
95 
96  // fetch scope for initial item listing
97  ItemFetchScope mFetchScope;
98 
99  Akonadi::Item::List mRemoteItemQueue;
100  Akonadi::Item::List mRemovedRemoteItemQueue;
101  Akonadi::Item::List mCurrentBatchRemoteItems;
102  Akonadi::Item::List mCurrentBatchRemovedRemoteItems;
103  Akonadi::Item::List mItemsToDelete;
104 
105  // create counter
106  int mPendingJobs;
107  int mProgress;
108  int mTotalItems;
109  int mTotalItemsProcessed;
110 
111  bool mStreaming;
112  bool mIncremental;
113  bool mDeliveryDone;
114  bool mFinished;
115  bool mFullListingDone;
116  bool mProcessingBatch;
117  bool mDisableAutomaticDeliveryDone;
118 
119  int mBatchSize;
120  Akonadi::ItemSync::MergeMode mMergeMode;
121 };
122 
123 void ItemSyncPrivate::createOrMerge(const Item &item)
124 {
125  Q_Q(ItemSync);
126  // don't try to do anything in error state
127  if (q->error()) {
128  return;
129  }
130  mPendingJobs++;
131  ItemCreateJob *create = new ItemCreateJob(item, mSyncCollection, subjobParent());
133  if (mMergeMode == ItemSync::GIDMerge && !item.gid().isEmpty()) {
134  merge |= ItemCreateJob::GID;
135  } else {
136  merge |= ItemCreateJob::RID;
137  }
138  create->setMerge(merge);
139  q->connect(create, &ItemCreateJob::result, q, [this](KJob *job) {slotLocalChangeDone(job);});
140 }
141 
142 bool ItemSyncPrivate::allProcessed() const
143 {
144  return mDeliveryDone && mCurrentBatchRemoteItems.isEmpty() && mRemoteItemQueue.isEmpty() && mRemovedRemoteItemQueue.isEmpty() && mCurrentBatchRemovedRemoteItems.isEmpty();
145 }
146 
147 void ItemSyncPrivate::checkDone()
148 {
149  Q_Q(ItemSync);
150  q->setProcessedAmount(KJob::Bytes, mProgress);
151  if (mPendingJobs > 0) {
152  return;
153  }
154 
155  if (mTransactionJobs > 0) {
156  //Commit the current transaction if we're in batch processing mode or done
157  //and wait until the transaction is committed to process the next batch
158  if (mTransactionMode == ItemSync::MultipleTransactions || (mDeliveryDone && mRemoteItemQueue.isEmpty())) {
159  if (mCurrentTransaction) {
160  // Note that mCurrentTransaction->commit() is a no-op if we're already rolling back
161  // so this signal is a bit misleading (but it's only used by unittests it seems)
162  Q_EMIT q->transactionCommitted();
163  mCurrentTransaction->commit();
164  mCurrentTransaction = nullptr;
165  }
166  return;
167  }
168  }
169  mProcessingBatch = false;
170 
171  if (q->error() == Job::UserCanceled && mTransactionJobs == 0 && !mFinished) {
172  qCDebug(AKONADICORE_LOG) << "ItemSync of collection" << mSyncCollection.id() << "finished due to user cancelling";
173  mFinished = true;
174  q->emitResult();
175  return;
176  }
177 
178  if (!mRemoteItemQueue.isEmpty()) {
179  execute();
180  //We don't have enough items, request more
181  if (!mProcessingBatch) {
182  Q_EMIT q->readyForNextBatch(mBatchSize - mRemoteItemQueue.size());
183  }
184  return;
185  }
186  Q_EMIT q->readyForNextBatch(mBatchSize);
187 
188  if (allProcessed() && !mFinished) {
189  // prevent double result emission, can happen since checkDone() is called from all over the place
190  qCDebug(AKONADICORE_LOG) << "ItemSync of collection" << mSyncCollection.id() << "finished";
191  mFinished = true;
192  q->emitResult();
193  }
194 }
195 
196 ItemSync::ItemSync(const Collection &collection, QObject *parent)
197  : Job(new ItemSyncPrivate(this), parent)
198 {
199  Q_D(ItemSync);
200  d->mSyncCollection = collection;
201 }
202 
204 {
205 }
206 
207 void ItemSync::setFullSyncItems(const Item::List &items)
208 {
209  /*
210  * We received a list of items from the server:
211  * * fetch all local id's + rid's only
212  * * check each full sync item whether it's locally available
213  * * if it is modify the item
214  * * if it's not create it
215  * * delete all superfluous items
216  */
217  Q_D(ItemSync);
218  Q_ASSERT(!d->mIncremental);
219  if (!d->mStreaming) {
220  d->mDeliveryDone = true;
221  }
222  d->mRemoteItemQueue += items;
223  d->mTotalItemsProcessed += items.count();
224  qCDebug(AKONADICORE_LOG) << "Received batch: " << items.count()
225  << "Already processed: " << d->mTotalItemsProcessed
226  << "Expected total amount: " << d->mTotalItems;
227  if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItemsProcessed == d->mTotalItems)) {
228  d->mDeliveryDone = true;
229  }
230  d->execute();
231 }
232 
233 void ItemSync::setTotalItems(int amount)
234 {
235  Q_D(ItemSync);
236  Q_ASSERT(!d->mIncremental);
237  Q_ASSERT(amount >= 0);
238  setStreamingEnabled(true);
239  qCDebug(AKONADICORE_LOG) << "Expected total amount:" << amount;
240  d->mTotalItems = amount;
241  setTotalAmount(KJob::Bytes, amount);
242  if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItems == 0)) {
243  d->mDeliveryDone = true;
244  d->execute();
245  }
246 }
247 
249 {
250  Q_D(ItemSync);
251  d->mDisableAutomaticDeliveryDone = disable;
252 }
253 
254 void ItemSync::setIncrementalSyncItems(const Item::List &changedItems, const Item::List &removedItems)
255 {
256  /*
257  * We received an incremental listing of items:
258  * * for each changed item:
259  * ** If locally available => modify
260  * ** else => create
261  * * removed items can be removed right away
262  */
263  Q_D(ItemSync);
264  d->mIncremental = true;
265  if (!d->mStreaming) {
266  d->mDeliveryDone = true;
267  }
268  d->mRemoteItemQueue += changedItems;
269  d->mRemovedRemoteItemQueue += removedItems;
270  d->mTotalItemsProcessed += changedItems.count() + removedItems.count();
271  qCDebug(AKONADICORE_LOG) << "Received: " << changedItems.count() << "Removed: " << removedItems.count() << "In total: " << d->mTotalItemsProcessed << " Wanted: " << d->mTotalItems;
272  if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItemsProcessed == d->mTotalItems)) {
273  d->mDeliveryDone = true;
274  }
275  d->execute();
276 }
277 
279 {
280  Q_D(ItemSync);
281  d->mFetchScope = fetchScope;
282 }
283 
285 {
286  Q_D(ItemSync);
287  return d->mFetchScope;
288 }
289 
291 {
292 }
293 
294 void ItemSyncPrivate::fetchLocalItemsToDelete()
295 {
296  Q_Q(ItemSync);
297  if (mIncremental) {
298  qFatal("This must not be called while in incremental mode");
299  return;
300  }
301  ItemFetchJob *job = new ItemFetchJob(mSyncCollection, subjobParent());
303  job->fetchScope().setFetchModificationTime(false);
305  // we only can fetch parts already in the cache, otherwise this will deadlock
306  job->fetchScope().setCacheOnly(true);
307 
308  QObject::connect(job, &ItemFetchJob::itemsReceived, q, [this](const Akonadi::Item::List &lst) { slotItemsReceived(lst); });
309  QObject::connect(job, &ItemFetchJob::result, q, [this](KJob *job) { slotLocalListDone(job); });
310  mPendingJobs++;
311 }
312 
313 void ItemSyncPrivate::slotItemsReceived(const Item::List &items)
314 {
315  for (const Akonadi::Item &item : items) {
316  //Don't delete items that have not yet been synchronized
317  if (item.remoteId().isEmpty()) {
318  continue;
319  }
320  if (!mListedItems.contains(item.remoteId())) {
321  mItemsToDelete << Item(item.id());
322  }
323  }
324 }
325 
326 void ItemSyncPrivate::slotLocalListDone(KJob *job)
327 {
328  mPendingJobs--;
329  if (job->error()) {
330  qCWarning(AKONADICORE_LOG) << job->errorString();
331  }
332  deleteItems(mItemsToDelete);
333  checkDone();
334 }
335 
336 QString ItemSyncPrivate::jobDebuggingString() const
337 {
338  // TODO: also print out mIncremental and mTotalItemsProcessed, but they are set after the job
339  // started, so this requires passing jobDebuggingString to jobEnded().
340  return QStringLiteral("Collection %1 (%2)").arg(mSyncCollection.id()).arg(mSyncCollection.name());
341 }
342 
343 void ItemSyncPrivate::execute()
344 {
345  //shouldn't happen
346  if (mFinished) {
347  qCWarning(AKONADICORE_LOG) << "Call to execute() on finished job.";
348  Q_ASSERT(false);
349  return;
350  }
351  //not doing anything, start processing
352  if (!mProcessingBatch) {
353  if (mRemoteItemQueue.size() >= mBatchSize || mDeliveryDone) {
354  //we have a new batch to process
355  const int num = qMin(mBatchSize, mRemoteItemQueue.size());
356  mCurrentBatchRemoteItems.reserve(mBatchSize);
357  std::move(mRemoteItemQueue.begin(), mRemoteItemQueue.begin() + num, std::back_inserter(mCurrentBatchRemoteItems));
358  mRemoteItemQueue.erase(mRemoteItemQueue.begin(), mRemoteItemQueue.begin() + num);
359 
360  mCurrentBatchRemovedRemoteItems += mRemovedRemoteItemQueue;
361  mRemovedRemoteItemQueue.clear();
362  } else {
363  //nothing to do, let's wait for more data
364  return;
365  }
366  mProcessingBatch = true;
367  processBatch();
368  return;
369  }
370  checkDone();
371 }
372 
373 //process the current batch of items
374 void ItemSyncPrivate::processBatch()
375 {
376  Q_Q(ItemSync);
377  if (mCurrentBatchRemoteItems.isEmpty() && !mDeliveryDone) {
378  return;
379  }
380  if (q->error() == Job::UserCanceled) {
381  checkDone();
382  return;
383  }
384 
385  //request a transaction, there are items that require processing
386  requestTransaction();
387 
388  processItems();
389 
390  // removed
391  if (!mIncremental && allProcessed()) {
392  //the full listing is done and we know which items to remove
393  fetchLocalItemsToDelete();
394  } else {
395  deleteItems(mCurrentBatchRemovedRemoteItems);
396  mCurrentBatchRemovedRemoteItems.clear();
397  }
398 
399  checkDone();
400 }
401 
402 void ItemSyncPrivate::processItems()
403 {
404  // added / updated
405  for (const Item &remoteItem : qAsConst(mCurrentBatchRemoteItems)) {
406  if (remoteItem.remoteId().isEmpty()) {
407  qCWarning(AKONADICORE_LOG) << "Item " << remoteItem.id() << " does not have a remote identifier";
408  continue;
409  }
410  if (!mIncremental) {
411  mListedItems << remoteItem.remoteId();
412  }
413  createOrMerge(remoteItem);
414  }
415  mCurrentBatchRemoteItems.clear();
416 }
417 
418 void ItemSyncPrivate::deleteItems(const Item::List &itemsToDelete)
419 {
420  Q_Q(ItemSync);
421  // if in error state, better not change anything anymore
422  if (q->error()) {
423  return;
424  }
425 
426  if (itemsToDelete.isEmpty()) {
427  return;
428  }
429 
430  mPendingJobs++;
431  ItemDeleteJob *job = new ItemDeleteJob(itemsToDelete, subjobParent());
432  q->connect(job, &ItemDeleteJob::result, q, [this](KJob *job) { slotLocalDeleteDone(job); });
433 
434  // It can happen that the groupware servers report us deleted items
435  // twice, in this case this item delete job will fail on the second try.
436  // To avoid a rollback of the complete transaction we gracefully allow the job
437  // to fail :)
438  TransactionSequence *transaction = qobject_cast<TransactionSequence *>(subjobParent());
439  if (transaction) {
440  transaction->setIgnoreJobFailure(job);
441  }
442 }
443 
444 void ItemSyncPrivate::slotLocalDeleteDone(KJob *job)
445 {
446  if (job->error()) {
447  qCWarning(AKONADICORE_LOG) << "Deleting items from the akonadi database failed:" << job->errorString();
448  }
449  mPendingJobs--;
450  mProgress++;
451 
452  checkDone();
453 }
454 
455 void ItemSyncPrivate::slotLocalChangeDone(KJob *job)
456 {
457  if (job->error() && job->error() != Job::KilledJobError) {
458  qCWarning(AKONADICORE_LOG) << "Creating/updating items from the akonadi database failed:" << job->errorString();
459  mRemoteItemQueue.clear(); // don't try to process any more items after a rollback
460  }
461  mPendingJobs--;
462  mProgress++;
463 
464  checkDone();
465 }
466 
467 void ItemSyncPrivate::slotTransactionResult(KJob *job)
468 {
469  --mTransactionJobs;
470  if (mCurrentTransaction == job) {
471  mCurrentTransaction = nullptr;
472  }
473 
474  checkDone();
475 }
476 
477 void ItemSyncPrivate::requestTransaction()
478 {
479  Q_Q(ItemSync);
480  //we never want parallel transactions, single transaction just makes one big transaction, and multi transaction uses multiple transaction sequentially
481  if (!mCurrentTransaction) {
482  ++mTransactionJobs;
483  mCurrentTransaction = new TransactionSequence(q);
484  mCurrentTransaction->setAutomaticCommittingEnabled(false);
485  QObject::connect(mCurrentTransaction, &TransactionSequence::result, q, [this](KJob *job) { slotTransactionResult(job); });
486  }
487 }
488 
489 Job *ItemSyncPrivate::subjobParent() const
490 {
491  Q_Q(const ItemSync);
492  if (mCurrentTransaction && mTransactionMode != ItemSync::NoTransaction) {
493  return mCurrentTransaction;
494  }
495  return const_cast<ItemSync *>(q);
496 }
497 
499 {
500  Q_D(ItemSync);
501  d->mStreaming = enable;
502 }
503 
505 {
506  Q_D(ItemSync);
507  Q_ASSERT(d->mStreaming);
508  d->mDeliveryDone = true;
509  d->execute();
510 }
511 
512 void ItemSync::slotResult(KJob *job)
513 {
514  if (job->error()) {
515  qCWarning(AKONADICORE_LOG) << "Error during ItemSync: " << job->errorString();
516  // pretend there were no errors
518  // propagate the first error we got but continue, we might still be fed with stuff from a resource
519  if (!error()) {
520  setError(job->error());
521  setErrorText(job->errorText());
522  }
523  } else {
524  Akonadi::Job::slotResult(job);
525  }
526 }
527 
529 {
530  Q_D(ItemSync);
531  qCDebug(AKONADICORE_LOG) << "The item sync is being rolled-back.";
533  if (d->mCurrentTransaction) {
534  d->mCurrentTransaction->rollback();
535  }
536  d->mDeliveryDone = true; // user won't deliver more data
537  d->execute(); // end this in an ordered way, since we have an error set no real change will be done
538 }
539 
541 {
542  Q_D(ItemSync);
543  d->mTransactionMode = mode;
544 }
545 
547 {
548  Q_D(const ItemSync);
549  return d->mBatchSize;
550 }
551 
553 {
554  Q_D(ItemSync);
555  d->mBatchSize = size;
556 }
557 
558 ItemSync::MergeMode ItemSync::mergeMode() const
559 {
560  Q_D(const ItemSync);
561  return d->mMergeMode;
562 }
563 
565 {
566  Q_D(ItemSync);
567  d->mMergeMode = mergeMode;
568 }
569 
570 #include "moc_itemsync.cpp"
void itemsReceived(const Akonadi::Item::List &items)
This signal is emitted whenever new items have been fetched completely.
void setFullSyncItems(const Item::List &items)
Sets the full item list for the collection.
Definition: itemsync.cpp:207
void emitResult()
void setMergeMode(MergeMode mergeMode)
Set what merge method should be used for next ItemSync run.
Definition: itemsync.cpp:564
Use no transaction at all, provides highest responsiveness (might therefore feel faster even when act...
Definition: itemsync.h:172
void setIncrementalSyncItems(const Item::List &changedItems, const Item::List &removedItems)
Sets the item lists for incrementally syncing the collection.
Definition: itemsync.cpp:254
int batchSize() const
Minimum number of items required to start processing in streaming mode.
Definition: itemsync.cpp:546
virtual QString errorString() const
Represents a collection of PIM items.
Definition: collection.h:76
void setError(int errorCode)
ItemSync(const Collection &collection, QObject *parent=nullptr)
Creates a new item synchronizer.
Definition: itemsync.cpp:196
void setDeliveryOption(DeliveryOptions options)
Sets the mechanisms by which the items should be fetched.
Base class for all actions in the Akonadi storage.
Definition: job.h:93
ItemFetchScope & fetchScope()
Returns the item fetch scope.
Definition: itemsync.cpp:284
void setDisableAutomaticDeliveryDone(bool disable)
Disables the automatic completion of the item sync, based on the number of delivered items...
Definition: itemsync.cpp:248
void setAutomaticCommittingEnabled(bool enable)
Disable automatic committing.
emitted via signal upon reception
Definition: itemfetchjob.h:218
bool removeSubjob(KJob *job) override
Removes the given subjob of this job.
Definition: job.cpp:375
Syncs between items known to a client (usually a resource) and the Akonadi storage.
Definition: itemsync.h:54
void clear()
Job that deletes items from the Akonadi storage.
Definition: itemdeletejob.h:62
ItemFetchScope & fetchScope()
Returns the item fetch scope.
void setErrorText(const QString &errorText)
void setFetchScope(ItemFetchScope &fetchScope)
Sets the item fetch scope.
Definition: itemsync.cpp:278
void setFetchRemoteIdentification(bool retrieveRid)
Fetch remote identification for items.
void setIgnoreJobFailure(KJob *job)
Sets which job of the sequence might fail without rolling back the complete transaction.
void deliveryDone()
Notify ItemSync that all remote items have been delivered.
Definition: itemsync.cpp:504
void commit()
Commits the transaction as soon as all pending sub-jobs finished successfully.
Job that creates a new item in the Akonadi storage.
Definition: itemcreatejob.h:75
KDEGAMES_EXPORT QAction * create(StandardGameAction id, const QObject *recvr, const char *slot, QObject *parent)
Specifies which parts of an item should be fetched from the Akonadi storage.
Use one transaction per chunk of delivered items, good compromise between the other two when using st...
Definition: itemsync.h:171
void setTotalItems(int amount)
Set the amount of items which you are going to return in total by using the setFullSyncItems()/setInc...
Definition: itemsync.cpp:233
void setStreamingEnabled(bool enable)
Enable item streaming.
Definition: itemsync.cpp:498
void setBatchSize(int)
Set the batch size.
Definition: itemsync.cpp:552
TransactionMode
Transaction mode used by ItemSync.
Definition: itemsync.h:169
~ItemSync() override
Destroys the item synchronizer.
Definition: itemsync.cpp:203
Base class for jobs that need to run a sequence of sub-jobs in a transaction.
void rollback()
Aborts the sync process and rolls back all not yet committed transactions.
Definition: itemsync.cpp:528
void setFetchModificationTime(bool retrieveMtime)
Enables retrieval of the item modification time.
Helper integration between Akonadi and Qt.
void setTotalAmount(Unit unit, qulonglong amount)
QString arg(qlonglong a, int fieldWidth, int base, QChar fillChar) const const
Job that fetches items from the Akonadi storage.
Definition: itemfetchjob.h:84
MergeMode mergeMode() const
Returns current merge mode.
Definition: itemsync.cpp:558
Merge by remote id.
void result(KJob *job)
The user canceled this job.
Definition: job.h:114
void setTransactionMode(TransactionMode mode)
Set the transaction mode to use for this sync.
Definition: itemsync.cpp:540
QMetaObject::Connection connect(const QObject *sender, const char *signal, const QObject *receiver, const char *method, Qt::ConnectionType type)
T qobject_cast(QObject *object)
QVariant merge(const QVariant &lhs, const QVariant &rhs)
void doStart() override
This method must be reimplemented in the concrete jobs.
Definition: itemsync.cpp:290
Only return the id of the merged/created item.
QString errorText() const
int error() const
void setMerge(MergeOptions options)
Merge this item into an existing one if available.
void setCacheOnly(bool cacheOnly)
Sets whether payload data should be requested from remote sources or just from the local cache...
This file is part of the KDE documentation.
Documentation copyright © 1996-2020 The KDE developers.
Generated on Fri Jun 5 2020 23:08:55 by doxygen 1.8.11 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.