23 #include "collection.h"
26 #include "itemcreatejob.h"
27 #include "itemdeletejob.h"
28 #include "itemfetchjob.h"
29 #include "itemmodifyjob.h"
30 #include "transactionsequence.h"
31 #include "itemfetchscope.h"
35 #include <QtCore/QStringList>
37 using namespace Akonadi;
42 class ItemSync::Private
48 mCurrentTransaction( 0 ),
49 mTransactionJobs( 0 ),
53 mTotalItemsProcessed( 0 ),
55 mIncremental( false ),
56 mLocalListDone( false ),
57 mDeliveryDone( false ),
61 mFetchScope.fetchFullPayload();
62 mFetchScope.fetchAllAttributes();
65 void createLocalItem(
const Item &item );
67 void slotLocalListDone( KJob* );
68 void slotLocalDeleteDone( KJob* );
69 void slotLocalChangeDone( KJob* );
72 void deleteItems(
const Item::List &items );
73 void slotTransactionResult( KJob *job );
74 Job* subjobParent()
const;
78 QHash<Item::Id, Akonadi::Item> mLocalItemsById;
79 QHash<QString, Akonadi::Item> mLocalItemsByRemoteId;
80 QSet<Akonadi::Item> mUnprocessedLocalItems;
90 Akonadi::Item::List mRemoteItems;
93 Item::List mRemovedRemoteItems;
99 int mTotalItemsProcessed;
108 void ItemSync::Private::createLocalItem(
const Item & item )
115 q->connect( create, SIGNAL(result(KJob*)), q, SLOT(slotLocalChangeDone(KJob*)) );
118 void ItemSync::Private::checkDone()
120 q->setProcessedAmount( KJob::Bytes, mProgress );
121 if ( mPendingJobs > 0 || !mDeliveryDone || mTransactionJobs > 0 )
132 d( new Private( this ) )
134 d->mSyncCollection = collection;
144 Q_ASSERT( !d->mIncremental );
145 if ( !d->mStreaming )
146 d->mDeliveryDone =
true;
147 d->mRemoteItems += items;
148 d->mTotalItemsProcessed += items.count();
149 kDebug() <<
"Received: " << items.count() <<
"In total: " << d->mTotalItemsProcessed <<
" Wanted: " << d->mTotalItems;
150 setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
151 if ( d->mTotalItemsProcessed == d->mTotalItems )
152 d->mDeliveryDone =
true;
158 Q_ASSERT( !d->mIncremental );
159 Q_ASSERT( amount >= 0 );
162 d->mTotalItems = amount;
163 setTotalAmount( KJob::Bytes, amount );
164 if ( d->mTotalItems == 0 ) {
165 d->mDeliveryDone =
true;
172 d->mIncremental =
true;
173 if ( !d->mStreaming )
174 d->mDeliveryDone =
true;
175 d->mRemoteItems += changedItems;
176 d->mRemovedRemoteItems += removedItems;
177 d->mTotalItemsProcessed += changedItems.count() + removedItems.count();
178 setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
179 if ( d->mTotalItemsProcessed == d->mTotalItems )
180 d->mDeliveryDone =
true;
191 return d->mFetchScope;
202 connect( job, SIGNAL(result(KJob*)), SLOT(slotLocalListDone(KJob*)) );
216 if ( d->mIncremental )
219 if ( newItem.d_func()->mClearPayload )
223 if ( storedItem.remoteRevision() != newItem.remoteRevision() )
227 if ( storedItem.flags() != newItem.flags() ) {
228 kDebug() <<
"Stored flags " << storedItem.flags()
229 <<
"new flags " << newItem.flags();
234 QSet<QByteArray> missingParts = newItem.loadedPayloadParts();
235 missingParts.subtract( storedItem.loadedPayloadParts() );
236 if ( !missingParts.isEmpty() )
242 if ( newItem.hasPayload()
243 && storedItem.payloadData() != newItem.payloadData() )
247 foreach (
Attribute* attr, newItem.attributes() ) {
248 if ( !storedItem.hasAttribute( attr->
type() ) )
250 if ( attr->
serialized() != storedItem.attribute( attr->
type() )->serialized() )
257 void ItemSync::Private::slotLocalListDone( KJob * job )
259 if ( !job->error() ) {
260 const Item::List list =
static_cast<ItemFetchJob*
>( job )->items();
261 foreach (
const Item &item, list ) {
262 if ( item.remoteId().isEmpty() )
264 mLocalItemsById.insert( item.id(), item );
265 mLocalItemsByRemoteId.insert( item.remoteId(), item );
266 mUnprocessedLocalItems.insert( item );
270 mLocalListDone =
true;
274 void ItemSync::Private::execute()
276 if ( !mLocalListDone )
281 if ( !mDeliveryDone && mRemoteItems.isEmpty() )
284 if ( (mTransactionMode == SingleTransaction && !mCurrentTransaction) || mTransactionMode == MultipleTransactions) {
287 mCurrentTransaction->setAutomaticCommittingEnabled(
false );
288 connect( mCurrentTransaction, SIGNAL(result(KJob*)), q, SLOT(slotTransactionResult(KJob*)) );
292 if ( !mDeliveryDone ) {
293 if ( mTransactionMode == MultipleTransactions && mCurrentTransaction ) {
294 mCurrentTransaction->commit();
295 mCurrentTransaction = 0;
301 if ( !mIncremental ) {
302 mRemovedRemoteItems = mUnprocessedLocalItems.toList();
303 mUnprocessedLocalItems.clear();
306 deleteItems( mRemovedRemoteItems );
307 mLocalItemsById.clear();
308 mLocalItemsByRemoteId.clear();
309 mRemovedRemoteItems.clear();
311 if ( mCurrentTransaction ) {
312 mCurrentTransaction->commit();
313 mCurrentTransaction = 0;
319 void ItemSync::Private::processItems()
322 foreach ( Item remoteItem, mRemoteItems ) {
324 if ( remoteItem.remoteId().isEmpty() ) {
325 kWarning() <<
"Item " << remoteItem.id() <<
" does not have a remote identifier";
329 Item localItem = mLocalItemsById.value( remoteItem.id() );
330 if ( !localItem.isValid() )
331 localItem = mLocalItemsByRemoteId.value( remoteItem.remoteId() );
332 mUnprocessedLocalItems.remove( localItem );
334 if ( !localItem.isValid() ) {
335 createLocalItem( remoteItem );
339 if ( q->updateItem( localItem, remoteItem ) ) {
342 remoteItem.setId( localItem.id() );
343 remoteItem.setRevision( localItem.revision() );
344 remoteItem.setSize( localItem.size() );
345 remoteItem.setRemoteId( localItem.remoteId() );
348 q->connect( mod, SIGNAL(result(KJob*)), q, SLOT(slotLocalChangeDone(KJob*)) );
353 mRemoteItems.clear();
356 void ItemSync::Private::deleteItems(
const Item::List &items )
362 Item::List itemsToDelete;
363 foreach (
const Item &item, items ) {
364 Item delItem( item );
365 if ( !item.isValid() ) {
366 delItem = mLocalItemsByRemoteId.value( item.remoteId() );
369 if ( !delItem.isValid() ) {
371 kWarning() <<
"Delete item (remoteeId=" << item.remoteId()
372 <<
"mimeType=" << item.mimeType()
373 <<
") does not have a valid UID and no item with that remote ID exists either";
378 if ( delItem.remoteId().isEmpty() ) {
383 itemsToDelete.append ( delItem );
386 if ( !itemsToDelete.isEmpty() ) {
389 q->connect( job, SIGNAL(result(KJob*)), q, SLOT(slotLocalDeleteDone(KJob*)) );
401 void ItemSync::Private::slotLocalDeleteDone( KJob* )
409 void ItemSync::Private::slotLocalChangeDone( KJob * job )
418 void ItemSync::Private::slotTransactionResult( KJob *job )
421 if ( mCurrentTransaction == job )
422 mCurrentTransaction = 0;
427 Job * ItemSync::Private::subjobParent()
const
429 if ( mCurrentTransaction && mTransactionMode != NoTransaction )
430 return mCurrentTransaction;
436 d->mStreaming = enable;
441 Q_ASSERT( d->mStreaming );
442 d->mDeliveryDone =
true;
446 void ItemSync::slotResult(KJob* job)
448 if ( job->error() ) {
453 setError( job->error() );
454 setErrorText( job->errorText() );
457 Akonadi::Job::slotResult( job );
464 if ( d->mCurrentTransaction )
465 d->mCurrentTransaction->rollback();
466 d->mDeliveryDone =
true;
472 d->mTransactionMode = mode;
475 #include "moc_itemsync.cpp"
void doStart()
This method must be reimplemented in the concrete jobs.
void disableRevisionCheck()
Disables the check of the revision number.
virtual bool updateItem(const Item &storedItem, Item &newItem)
Reimplement this method to customize the synchronization algorithm.
void setFullSyncItems(const Item::List &items)
Sets the full item list for the collection.
void setIncrementalSyncItems(const Item::List &changedItems, const Item::List &removedItems)
Sets the item lists for incrementally syncing the collection.
Represents a collection of PIM items.
Use a single transaction for the entire sync process (default), provides maximum consistency ("all or...
Base class for all actions in the Akonadi storage.
ItemFetchScope & fetchScope()
Returns the item fetch scope.
Provides interface for custom attributes for Entity.
~ItemSync()
Destroys the item synchronizer.
Syncs between items known to a client (usually a resource) and the Akonadi storage.
Job that deletes items from the Akonadi storage.
ItemFetchScope & fetchScope()
Returns the item fetch scope.
void setFetchScope(ItemFetchScope &fetchScope)
Sets the item fetch scope.
void setFetchScope(ItemFetchScope &fetchScope)
Sets the item fetch scope.
void setIgnoreJobFailure(KJob *job)
Sets which job of the sequence might fail without rolling back the complete transaction.
void deliveryDone()
Notify ItemSync that all remote items have been delivered.
ItemSync(const Collection &collection, QObject *parent=0)
Creates a new item synchronizer.
Job that creates a new item in the Akonadi storage.
Specifies which parts of an item should be fetched from the Akonadi storage.
void setTotalItems(int amount)
Set the amount of items which you are going to return in total by using the setFullSyncItems() method...
void setStreamingEnabled(bool enable)
Enable item streaming.
TransactionMode
Transaction mode used by ItemSync.
Base class for jobs that need to run a sequence of sub-jobs in a transaction.
void rollback()
Aborts the sync process and rolls back all not yet committed transactions.
Job that modifies an existing item in the Akonadi storage.
Job that fetches items from the Akonadi storage.
virtual bool removeSubjob(KJob *job)
Removes the given subjob of this job.
virtual QByteArray serialized() const =0
Returns a QByteArray representation of the attribute which will be stored.
The user canceled this job.
void setTransactionMode(TransactionMode mode)
Set the transaction mode to use for this sync.
virtual QByteArray type() const =0
Returns the type of the attribute.
void setCacheOnly(bool cacheOnly)
Sets whether payload data should be requested from remote sources or just from the local cache...