• Skip to content
  • Skip to link menu
KDE API Reference
  • KDE API Reference
  • kdepimlibs API Reference
  • KDE Home
  • Contact Us
 

akonadi

  • sources
  • kde-4.12
  • kdepimlibs
  • akonadi
itemsync.cpp
1 /*
2  Copyright (c) 2007 Tobias Koenig <tokoe@kde.org>
3  Copyright (c) 2007 Volker Krause <vkrause@kde.org>
4 
5  This library is free software; you can redistribute it and/or modify it
6  under the terms of the GNU Library General Public License as published by
7  the Free Software Foundation; either version 2 of the License, or (at your
8  option) any later version.
9 
10  This library is distributed in the hope that it will be useful, but WITHOUT
11  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
13  License for more details.
14 
15  You should have received a copy of the GNU Library General Public License
16  along with this library; see the file COPYING.LIB. If not, write to the
17  Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18  02110-1301, USA.
19 */
20 
21 #include "itemsync.h"
22 
23 #include "collection.h"
24 #include "item.h"
25 #include "item_p.h"
26 #include "itemcreatejob.h"
27 #include "itemdeletejob.h"
28 #include "itemfetchjob.h"
29 #include "itemmodifyjob.h"
30 #include "transactionsequence.h"
31 #include "itemfetchscope.h"
32 
33 #include <kdebug.h>
34 
35 #include <QtCore/QStringList>
36 
37 using namespace Akonadi;
38 
42 class ItemSync::Private
43 {
44  public:
45  Private( ItemSync *parent ) :
46  q( parent ),
47  mTransactionMode( SingleTransaction ),
48  mCurrentTransaction( 0 ),
49  mTransactionJobs( 0 ),
50  mPendingJobs( 0 ),
51  mProgress( 0 ),
52  mTotalItems( -1 ),
53  mTotalItemsProcessed( 0 ),
54  mStreaming( false ),
55  mIncremental( false ),
56  mLocalListDone( false ),
57  mDeliveryDone( false ),
58  mFinished( false )
59  {
60  // we want to fetch all data by default
61  mFetchScope.fetchFullPayload();
62  mFetchScope.fetchAllAttributes();
63  }
64 
65  void createLocalItem( const Item &item );
66  void checkDone();
67  void slotLocalListDone( KJob* );
68  void slotLocalDeleteDone( KJob* );
69  void slotLocalChangeDone( KJob* );
70  void execute();
71  void processItems();
72  void deleteItems( const Item::List &items );
73  void slotTransactionResult( KJob *job );
74  Job* subjobParent() const;
75 
76  ItemSync *q;
77  Collection mSyncCollection;
78  QHash<Item::Id, Akonadi::Item> mLocalItemsById;
79  QHash<QString, Akonadi::Item> mLocalItemsByRemoteId;
80  QSet<Akonadi::Item> mUnprocessedLocalItems;
81 
82  ItemSync::TransactionMode mTransactionMode;
83  TransactionSequence *mCurrentTransaction;
84  int mTransactionJobs;
85 
86  // fetch scope for initial item listing
87  ItemFetchScope mFetchScope;
88 
89  // remote items
90  Akonadi::Item::List mRemoteItems;
91 
92  // removed remote items
93  Item::List mRemovedRemoteItems;
94 
95  // create counter
96  int mPendingJobs;
97  int mProgress;
98  int mTotalItems;
99  int mTotalItemsProcessed;
100 
101  bool mStreaming;
102  bool mIncremental;
103  bool mLocalListDone;
104  bool mDeliveryDone;
105  bool mFinished;
106 };
107 
108 void ItemSync::Private::createLocalItem( const Item & item )
109 {
110  // don't try to do anything in error state
111  if ( q->error() )
112  return;
113  mPendingJobs++;
114  ItemCreateJob *create = new ItemCreateJob( item, mSyncCollection, subjobParent() );
115  q->connect( create, SIGNAL(result(KJob*)), q, SLOT(slotLocalChangeDone(KJob*)) );
116 }
117 
118 void ItemSync::Private::checkDone()
119 {
120  q->setProcessedAmount( KJob::Bytes, mProgress );
121  if ( mPendingJobs > 0 || !mDeliveryDone || mTransactionJobs > 0 )
122  return;
123 
124  if ( !mFinished ) { // prevent double result emission, can happen since checkDone() is called from all over the place
125  mFinished = true;
126  q->emitResult();
127  }
128 }
129 
130 ItemSync::ItemSync( const Collection &collection, QObject *parent ) :
131  Job( parent ),
132  d( new Private( this ) )
133 {
134  d->mSyncCollection = collection;
135 }
136 
137 ItemSync::~ItemSync()
138 {
139  delete d;
140 }
141 
142 void ItemSync::setFullSyncItems( const Item::List &items )
143 {
144  Q_ASSERT( !d->mIncremental );
145  if ( !d->mStreaming )
146  d->mDeliveryDone = true;
147  d->mRemoteItems += items;
148  d->mTotalItemsProcessed += items.count();
149  kDebug() << "Received: " << items.count() << "In total: " << d->mTotalItemsProcessed << " Wanted: " << d->mTotalItems;
150  setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
151  if ( d->mTotalItemsProcessed == d->mTotalItems )
152  d->mDeliveryDone = true;
153  d->execute();
154 }
155 
156 void ItemSync::setTotalItems( int amount )
157 {
158  Q_ASSERT( !d->mIncremental );
159  Q_ASSERT( amount >= 0 );
160  setStreamingEnabled( true );
161  kDebug() << amount;
162  d->mTotalItems = amount;
163  setTotalAmount( KJob::Bytes, amount );
164  if ( d->mTotalItems == 0 ) {
165  d->mDeliveryDone = true;
166  d->execute();
167  }
168 }
169 
170 void ItemSync::setIncrementalSyncItems( const Item::List &changedItems, const Item::List &removedItems )
171 {
172  d->mIncremental = true;
173  if ( !d->mStreaming )
174  d->mDeliveryDone = true;
175  d->mRemoteItems += changedItems;
176  d->mRemovedRemoteItems += removedItems;
177  d->mTotalItemsProcessed += changedItems.count() + removedItems.count();
178  setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
179  if ( d->mTotalItemsProcessed == d->mTotalItems )
180  d->mDeliveryDone = true;
181  d->execute();
182 }
183 
184 void ItemSync::setFetchScope( ItemFetchScope &fetchScope )
185 {
186  d->mFetchScope = fetchScope;
187 }
188 
189 ItemFetchScope &ItemSync::fetchScope()
190 {
191  return d->mFetchScope;
192 }
193 
194 void ItemSync::doStart()
195 {
196  ItemFetchJob* job = new ItemFetchJob( d->mSyncCollection, this );
197  job->setFetchScope( d->mFetchScope );
198 
199  // we only can fetch parts already in the cache, otherwise this will deadlock
200  job->fetchScope().setCacheOnly( true );
201 
202  connect( job, SIGNAL(result(KJob*)), SLOT(slotLocalListDone(KJob*)) );
203 }
204 
205 bool ItemSync::updateItem( const Item &storedItem, Item &newItem )
206 {
207  // we are in error state, better not change anything at all anymore
208  if ( error() )
209  return false;
210 
211  /*
212  * We know that this item has changed (as it is part of the
213  * incremental changed list), so we just put it into the
214  * storage.
215  */
216  if ( d->mIncremental )
217  return true;
218 
219  if ( newItem.d_func()->mClearPayload )
220  return true;
221 
222  // Check whether the remote revisions differ
223  if ( storedItem.remoteRevision() != newItem.remoteRevision() )
224  return true;
225 
226  // Check whether the flags differ
227  if ( storedItem.flags() != newItem.flags() ) {
228  kDebug() << "Stored flags " << storedItem.flags()
229  << "new flags " << newItem.flags();
230  return true;
231  }
232 
233  // Check whether the new item contains unknown parts
234  QSet<QByteArray> missingParts = newItem.loadedPayloadParts();
235  missingParts.subtract( storedItem.loadedPayloadParts() );
236  if ( !missingParts.isEmpty() )
237  return true;
238 
239  // ### FIXME SLOW!!!
240  // If the available part identifiers don't differ, check
241  // whether the content of the payload differs
242  if ( newItem.hasPayload()
243  && storedItem.payloadData() != newItem.payloadData() )
244  return true;
245 
246  // check if remote attributes have been changed
247  foreach ( Attribute* attr, newItem.attributes() ) {
248  if ( !storedItem.hasAttribute( attr->type() ) )
249  return true;
250  if ( attr->serialized() != storedItem.attribute( attr->type() )->serialized() )
251  return true;
252  }
253 
254  return false;
255 }
256 
257 void ItemSync::Private::slotLocalListDone( KJob * job )
258 {
259  if ( !job->error() ) {
260  const Item::List list = static_cast<ItemFetchJob*>( job )->items();
261  foreach ( const Item &item, list ) {
262  if ( item.remoteId().isEmpty() )
263  continue;
264  mLocalItemsById.insert( item.id(), item );
265  mLocalItemsByRemoteId.insert( item.remoteId(), item );
266  mUnprocessedLocalItems.insert( item );
267  }
268  }
269 
270  mLocalListDone = true;
271  execute();
272 }
273 
274 void ItemSync::Private::execute()
275 {
276  if ( !mLocalListDone )
277  return;
278 
279  // early exit to avoid unnecessary TransactionSequence creation in MultipleTransactions mode
280  // TODO: do the transaction handling in a nicer way instead, only creating TransactionSequences when really needed
281  if ( !mDeliveryDone && mRemoteItems.isEmpty() )
282  return;
283 
284  if ( (mTransactionMode == SingleTransaction && !mCurrentTransaction) || mTransactionMode == MultipleTransactions) {
285  ++mTransactionJobs;
286  mCurrentTransaction = new TransactionSequence( q );
287  mCurrentTransaction->setAutomaticCommittingEnabled( false );
288  connect( mCurrentTransaction, SIGNAL(result(KJob*)), q, SLOT(slotTransactionResult(KJob*)) );
289  }
290 
291  processItems();
292  if ( !mDeliveryDone ) {
293  if ( mTransactionMode == MultipleTransactions && mCurrentTransaction ) {
294  mCurrentTransaction->commit();
295  mCurrentTransaction = 0;
296  }
297  return;
298  }
299 
300  // removed
301  if ( !mIncremental ) {
302  mRemovedRemoteItems = mUnprocessedLocalItems.toList();
303  mUnprocessedLocalItems.clear();
304  }
305 
306  deleteItems( mRemovedRemoteItems );
307  mLocalItemsById.clear();
308  mLocalItemsByRemoteId.clear();
309  mRemovedRemoteItems.clear();
310 
311  if ( mCurrentTransaction ) {
312  mCurrentTransaction->commit();
313  mCurrentTransaction = 0;
314  }
315 
316  checkDone();
317 }
318 
319 void ItemSync::Private::processItems()
320 {
321  // added / updated
322  foreach ( Item remoteItem, mRemoteItems ) { //krazy:exclude=foreach non-const is needed here
323 #ifndef NDEBUG
324  if ( remoteItem.remoteId().isEmpty() ) {
325  kWarning() << "Item " << remoteItem.id() << " does not have a remote identifier";
326  }
327 #endif
328 
329  Item localItem = mLocalItemsById.value( remoteItem.id() );
330  if ( !localItem.isValid() )
331  localItem = mLocalItemsByRemoteId.value( remoteItem.remoteId() );
332  mUnprocessedLocalItems.remove( localItem );
333  // missing locally
334  if ( !localItem.isValid() ) {
335  createLocalItem( remoteItem );
336  continue;
337  }
338 
339  if ( q->updateItem( localItem, remoteItem ) ) {
340  mPendingJobs++;
341 
342  remoteItem.setId( localItem.id() );
343  remoteItem.setRevision( localItem.revision() );
344  remoteItem.setSize( localItem.size() );
345  remoteItem.setRemoteId( localItem.remoteId() ); // in case someone clears remoteId by accident
346  ItemModifyJob *mod = new ItemModifyJob( remoteItem, subjobParent() );
347  mod->disableRevisionCheck();
348  q->connect( mod, SIGNAL(result(KJob*)), q, SLOT(slotLocalChangeDone(KJob*)) );
349  } else {
350  mProgress++;
351  }
352  }
353  mRemoteItems.clear();
354 }
355 
356 void ItemSync::Private::deleteItems( const Item::List &items )
357 {
358  // if in error state, better not change anything anymore
359  if ( q->error() )
360  return;
361 
362  Item::List itemsToDelete;
363  foreach ( const Item &item, items ) {
364  Item delItem( item );
365  if ( !item.isValid() ) {
366  delItem = mLocalItemsByRemoteId.value( item.remoteId() );
367  }
368 
369  if ( !delItem.isValid() ) {
370 #ifndef NDEBUG
371  kWarning() << "Delete item (remoteeId=" << item.remoteId()
372  << "mimeType=" << item.mimeType()
373  << ") does not have a valid UID and no item with that remote ID exists either";
374 #endif
375  continue;
376  }
377 
378  if ( delItem.remoteId().isEmpty() ) {
379  // don't attempt to remove items that never were written to the backend
380  continue;
381  }
382 
383  itemsToDelete.append ( delItem );
384  }
385 
386  if ( !itemsToDelete.isEmpty() ) {
387  mPendingJobs++;
388  ItemDeleteJob *job = new ItemDeleteJob( itemsToDelete, subjobParent() );
389  q->connect( job, SIGNAL(result(KJob*)), q, SLOT(slotLocalDeleteDone(KJob*)) );
390 
391  // It can happen that the groupware servers report us deleted items
392  // twice, in this case this item delete job will fail on the second try.
393  // To avoid a rollback of the complete transaction we gracefully allow the job
394  // to fail :)
395  TransactionSequence *transaction = qobject_cast<TransactionSequence*>( subjobParent() );
396  if ( transaction )
397  transaction->setIgnoreJobFailure( job );
398  }
399 }
400 
401 void ItemSync::Private::slotLocalDeleteDone( KJob* )
402 {
403  mPendingJobs--;
404  mProgress++;
405 
406  checkDone();
407 }
408 
409 void ItemSync::Private::slotLocalChangeDone( KJob * job )
410 {
411  Q_UNUSED( job );
412  mPendingJobs--;
413  mProgress++;
414 
415  checkDone();
416 }
417 
418 void ItemSync::Private::slotTransactionResult( KJob *job )
419 {
420  --mTransactionJobs;
421  if ( mCurrentTransaction == job )
422  mCurrentTransaction = 0;
423 
424  checkDone();
425 }
426 
427 Job * ItemSync::Private::subjobParent() const
428 {
429  if ( mCurrentTransaction && mTransactionMode != NoTransaction )
430  return mCurrentTransaction;
431  return q;
432 }
433 
434 void ItemSync::setStreamingEnabled(bool enable)
435 {
436  d->mStreaming = enable;
437 }
438 
439 void ItemSync::deliveryDone()
440 {
441  Q_ASSERT( d->mStreaming );
442  d->mDeliveryDone = true;
443  d->execute();
444 }
445 
446 void ItemSync::slotResult(KJob* job)
447 {
448  if ( job->error() ) {
449  // pretent there were no errors
450  Akonadi::Job::removeSubjob( job );
451  // propagate the first error we got but continue, we might still be fed with stuff from a resource
452  if ( !error() ) {
453  setError( job->error() );
454  setErrorText( job->errorText() );
455  }
456  } else {
457  Akonadi::Job::slotResult( job );
458  }
459 }
460 
461 void ItemSync::rollback()
462 {
463  setError( UserCanceled );
464  if ( d->mCurrentTransaction )
465  d->mCurrentTransaction->rollback();
466  d->mDeliveryDone = true; // user wont deliver more data
467  d->execute(); // end this in an ordered way, since we have an error set no real change will be done
468 }
469 
470 void ItemSync::setTransactionMode(ItemSync::TransactionMode mode)
471 {
472  d->mTransactionMode = mode;
473 }
474 
475 #include "moc_itemsync.cpp"
Akonadi::ItemSync::doStart
void doStart()
This method must be reimplemented in the concrete jobs.
Definition: itemsync.cpp:194
Akonadi::ItemModifyJob::disableRevisionCheck
void disableRevisionCheck()
Disables the check of the revision number.
Definition: itemmodifyjob.cpp:374
Akonadi::ItemSync::updateItem
virtual bool updateItem(const Item &storedItem, Item &newItem)
Reimplement this method to customize the synchronization algorithm.
Definition: itemsync.cpp:205
Akonadi::ItemSync::setFullSyncItems
void setFullSyncItems(const Item::List &items)
Sets the full item list for the collection.
Definition: itemsync.cpp:142
Akonadi::ItemSync::setIncrementalSyncItems
void setIncrementalSyncItems(const Item::List &changedItems, const Item::List &removedItems)
Sets the item lists for incrementally syncing the collection.
Definition: itemsync.cpp:170
Akonadi::Collection
Represents a collection of PIM items.
Definition: collection.h:75
Akonadi::ItemSync::SingleTransaction
Use a single transaction for the entire sync process (default), provides maximum consistency ("all or...
Definition: itemsync.h:159
Akonadi::Job
Base class for all actions in the Akonadi storage.
Definition: job.h:86
Akonadi::ItemSync::fetchScope
ItemFetchScope & fetchScope()
Returns the item fetch scope.
Definition: itemsync.cpp:189
Akonadi::Attribute
Provides interface for custom attributes for Entity.
Definition: attribute.h:138
Akonadi::ItemSync::~ItemSync
~ItemSync()
Destroys the item synchronizer.
Definition: itemsync.cpp:137
Akonadi::ItemSync
Syncs between items known to a client (usually a resource) and the Akonadi storage.
Definition: itemsync.h:53
Akonadi::ItemDeleteJob
Job that deletes items from the Akonadi storage.
Definition: itemdeletejob.h:62
Akonadi::ItemFetchJob::fetchScope
ItemFetchScope & fetchScope()
Returns the item fetch scope.
Definition: itemfetchjob.cpp:256
Akonadi::ItemSync::setFetchScope
void setFetchScope(ItemFetchScope &fetchScope)
Sets the item fetch scope.
Definition: itemsync.cpp:184
Akonadi::ItemFetchJob::setFetchScope
void setFetchScope(ItemFetchScope &fetchScope)
Sets the item fetch scope.
Definition: itemfetchjob.cpp:242
Akonadi::TransactionSequence::setIgnoreJobFailure
void setIgnoreJobFailure(KJob *job)
Sets which job of the sequence might fail without rolling back the complete transaction.
Definition: transactionsequence.cpp:175
Akonadi::ItemSync::deliveryDone
void deliveryDone()
Notify ItemSync that all remote items have been delivered.
Definition: itemsync.cpp:439
Akonadi::ItemSync::ItemSync
ItemSync(const Collection &collection, QObject *parent=0)
Creates a new item synchronizer.
Definition: itemsync.cpp:130
Akonadi::ItemCreateJob
Job that creates a new item in the Akonadi storage.
Definition: itemcreatejob.h:73
Akonadi::ItemFetchScope
Specifies which parts of an item should be fetched from the Akonadi storage.
Definition: itemfetchscope.h:68
Akonadi::ItemSync::setTotalItems
void setTotalItems(int amount)
Set the amount of items which you are going to return in total by using the setFullSyncItems() method...
Definition: itemsync.cpp:156
Akonadi::ItemSync::setStreamingEnabled
void setStreamingEnabled(bool enable)
Enable item streaming.
Definition: itemsync.cpp:434
Akonadi::ItemSync::TransactionMode
TransactionMode
Transaction mode used by ItemSync.
Definition: itemsync.h:158
Akonadi::TransactionSequence
Base class for jobs that need to run a sequence of sub-jobs in a transaction.
Definition: transactionsequence.h:69
Akonadi::ItemSync::rollback
void rollback()
Aborts the sync process and rolls back all not yet committed transactions.
Definition: itemsync.cpp:461
Akonadi::ItemModifyJob
Job that modifies an existing item in the Akonadi storage.
Definition: itemmodifyjob.h:97
Akonadi::ItemFetchJob
Job that fetches items from the Akonadi storage.
Definition: itemfetchjob.h:82
Akonadi::Job::removeSubjob
virtual bool removeSubjob(KJob *job)
Removes the given subjob of this job.
Definition: job.cpp:328
Akonadi::Attribute::serialized
virtual QByteArray serialized() const =0
Returns a QByteArray representation of the attribute which will be storaged.
Akonadi::Job::UserCanceled
The user canceld this job.
Definition: job.h:108
Akonadi::ItemSync::setTransactionMode
void setTransactionMode(TransactionMode mode)
Set the transaction mode to use for this sync.
Definition: itemsync.cpp:470
Akonadi::Attribute::type
virtual QByteArray type() const =0
Returns the type of the attribute.
Akonadi::ItemFetchScope::setCacheOnly
void setCacheOnly(bool cacheOnly)
Sets whether payload data should be requested from remote sources or just from the local cache...
Definition: itemfetchscope.cpp:106
This file is part of the KDE documentation.
Documentation copyright © 1996-2014 The KDE developers.
Generated on Tue Oct 14 2014 23:00:27 by doxygen 1.8.7 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.

akonadi

Skip menu "akonadi"
  • Main Page
  • Namespace List
  • Namespace Members
  • Alphabetical List
  • Class List
  • Class Hierarchy
  • Class Members
  • File List
  • Modules
  • Related Pages

kdepimlibs API Reference

Skip menu "kdepimlibs API Reference"
  • akonadi
  •   contact
  •   kmime
  •   socialutils
  • kabc
  • kalarmcal
  • kblog
  • kcal
  • kcalcore
  • kcalutils
  • kholidays
  • kimap
  • kldap
  • kmbox
  • kmime
  • kpimidentities
  • kpimtextedit
  • kresources
  • ktnef
  • kxmlrpcclient
  • microblog

Search



Report problems with this website to our bug tracking system.
Contact the specific authors with questions and comments about the page contents.

KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal