11#include "collection.h" 
   13#include "itemcreatejob.h" 
   14#include "itemdeletejob.h" 
   15#include "itemfetchjob.h" 
   16#include "itemfetchscope.h" 
   18#include "protocol_p.h" 
   19#include "transactionsequence.h" 
   21#include "akonadicore_debug.h" 
   // Private data/implementation class for ItemSync (paired with it through
   // Q_DECLARE_PUBLIC below); derives from JobPrivate.
   // NOTE(review): this listing omits several members and method declarations
   // that the member functions further down reference (e.g. mBatchSize,
   // mRemoteItemQueue, checkDone()); confirm against the full source.
   28class Akonadi::ItemSyncPrivate : 
public JobPrivate
 
   // Constructor. The visible initializers start with single-transaction mode,
   // no open transaction, a zero processed-item count, all progress flags
   // cleared, and remote-id (RID) based merging.
   31    explicit ItemSyncPrivate(ItemSync *parent)
 
   33        , mTransactionMode(ItemSync::SingleTransaction)
 
   34        , mCurrentTransaction(nullptr)
 
   39        , mTotalItemsProcessed(0)
 
   42        , mDeliveryDone(false)
 
   44        , mFullListingDone(false)
 
   45        , mProcessingBatch(false)
 
   46        , mDisableAutomaticDeliveryDone(false)
 
   48        , mMergeMode(Akonadi::ItemSync::RIDMerge)
 
   // Creates a create job for one remote item (see definition below).
   52    void createOrMerge(
const Item &item);
 
   // Handlers for the local listing, local deletion and local change jobs.
   54    void slotItemsReceived(
const Item::List &items);
 
   55    void slotLocalListDone(KJob *job);
 
   56    void slotLocalDeleteDone(KJob *job);
 
   57    void slotLocalChangeDone(KJob *job);
 
   // Transaction bookkeeping.
   62    void slotTransactionResult(KJob *job);
 
   63    void requestTransaction();
 
   // Returns the object new subjobs should be parented to.
   64    Job *subjobParent() 
const;
 
   65    void fetchLocalItemsToDelete();
 
   66    QString jobDebuggingString() 
const override;
 
   // True once delivery is done and every queue is empty.
   67    bool allProcessed() 
const;
 
   69    Q_DECLARE_PUBLIC(ItemSync)
 
   // Collection being synchronized; assigned from the ItemSync constructor argument.
   70    Collection mSyncCollection;
 
   // Remote ids of the remote items seen so far; filled by processItems() and
   // consulted by slotItemsReceived() to find local items to delete.
   71    QSet<QString> mListedItems;
 
   // Transaction currently in use, or nullptr when none is open.
   74    TransactionSequence *mCurrentTransaction = 
nullptr;
 
   // When valid, createOrMerge() applies this as the modification time of items.
   83    QDateTime mItemSyncStart;
 
   // Running count of delivered items (incremented by the size of each delivered list).
   89    int mTotalItemsProcessed;
 
   95    bool mFullListingDone;
 
   // Set while a batch is being worked on; cleared again in checkDone().
   96    bool mProcessingBatch;
 
   // When set, reaching the expected total does not automatically mark delivery as done.
   97    bool mDisableAutomaticDeliveryDone;
 
   // Merge strategy; createOrMerge() tests it against ItemSync::GIDMerge.
  100    Akonadi::ItemSync::MergeMode mMergeMode;
 
  // Creates an ItemCreateJob for @p item in mSyncCollection, parented to
  // subjobParent(). Completion of the job is reported to slotLocalChangeDone().
  // NOTE(review): the lines that configure merging and connect the job's
  // result are not visible in this listing.
  103void ItemSyncPrivate::createOrMerge(
const Item &item)
 
  // Work on a copy so the caller's item is left untouched.
  111    Item modifiedItem = item;
 
  // If a sync start timestamp was supplied, use it as the modification time.
  112    if (mItemSyncStart.isValid()) {
 
  113        modifiedItem.setModificationTime(mItemSyncStart);
 
  115    auto create = 
new ItemCreateJob(modifiedItem, mSyncCollection, subjobParent());
 
  // GID-based merging is only considered when the item actually has a GID.
  117    if (mMergeMode == ItemSync::GIDMerge && !item.gid().isEmpty()) {
 
  124        slotLocalChangeDone(job);
 
  // Returns true when delivery has been marked done and all four item queues
  // (pending and current-batch, changed and removed) are empty.
  128bool ItemSyncPrivate::allProcessed()
 const 
  130    return mDeliveryDone && mCurrentBatchRemoteItems.isEmpty() && mRemoteItemQueue.isEmpty() && mRemovedRemoteItemQueue.isEmpty()
 
  131        && mCurrentBatchRemovedRemoteItems.isEmpty();
 
  // Decides what happens next in the sync. The visible steps: check for
  // outstanding pending and transaction jobs, commit the open transaction,
  // clear the batch-in-progress flag, request more items through
  // readyForNextBatch(), and log completion.
  // NOTE(review): most branch bodies and closing braces are missing from this
  // listing, so the exact nesting and early returns cannot be confirmed here.
  134void ItemSyncPrivate::checkDone()
 
  138    if (mPendingJobs > 0) {
 
  142    if (mTransactionJobs > 0) {
 
  146            if (mCurrentTransaction) {
 
  // transactionCommitted() is emitted before commit() is called on the transaction.
  149                Q_EMIT q->transactionCommitted();
 
  150                mCurrentTransaction->commit();
 
  // Drop the pointer so requestTransaction() creates a fresh transaction next time.
  151                mCurrentTransaction = 
nullptr;
 
  156    mProcessingBatch = 
false;
 
  159        qCDebug(AKONADICORE_LOG) << 
"ItemSync of collection" << mSyncCollection.id() << 
"finished due to user cancelling";
 
  165    if (!mRemoteItemQueue.isEmpty()) {
 
  168        if (!mProcessingBatch) {
 
  // Ask only for the number of items still missing to fill a batch.
  169            Q_EMIT q->readyForNextBatch(mBatchSize - mRemoteItemQueue.size());
 
  173    Q_EMIT q->readyForNextBatch(mBatchSize);
 
  // Finish only once, even if checkDone() runs again after completion.
  175    if (allProcessed() && !mFinished) {
 
  177        qCDebug(AKONADICORE_LOG) << 
"ItemSync of collection" << mSyncCollection.id() << 
"finished";
 
  186    qCDebug(AKONADICORE_LOG) << 
"Created ItemSync(colId=" << collection.id() << 
", timestamp=" << timestamp << 
")";
 
  188    d->mSyncCollection = collection;
 
  190        d->mItemSyncStart = timestamp;
 
 
  209    Q_ASSERT(!d->mIncremental);
 
  210    if (!d->mStreaming) {
 
  211        d->mDeliveryDone = 
true;
 
  213    d->mRemoteItemQueue += items;
 
  214    d->mTotalItemsProcessed += items.
count();
 
  215    qCDebug(AKONADICORE_LOG) << 
"Received batch: " << items.
count() << 
"Already processed: " << d->mTotalItemsProcessed
 
  216                             << 
"Expected total amount: " << d->mTotalItems;
 
  217    if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItemsProcessed == d->mTotalItems)) {
 
  218        d->mDeliveryDone = 
true;
 
 
  226    Q_ASSERT(!d->mIncremental);
 
  227    Q_ASSERT(amount >= 0);
 
  229    qCDebug(AKONADICORE_LOG) << 
"Expected total amount:" << amount;
 
  230    d->mTotalItems = amount;
 
  232    if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItems == 0)) {
 
  233        d->mDeliveryDone = 
true;
 
 
  241    d->mDisableAutomaticDeliveryDone = disable;
 
 
  254    d->mIncremental = 
true;
 
  255    if (!d->mStreaming) {
 
  256        d->mDeliveryDone = 
true;
 
  258    d->mRemoteItemQueue += changedItems;
 
  259    d->mRemovedRemoteItemQueue += removedItems;
 
  260    d->mTotalItemsProcessed += changedItems.
count() + removedItems.
count();
 
  261    qCDebug(AKONADICORE_LOG) << 
"Received: " << changedItems.
count() << 
"Removed: " << removedItems.
count() << 
"In total: " << d->mTotalItemsProcessed
 
  262                             << 
" Wanted: " << d->mTotalItems;
 
  263    if (!d->mDisableAutomaticDeliveryDone && (d->mTotalItemsProcessed == d->mTotalItems)) {
 
  264        d->mDeliveryDone = 
true;
 
 
  // Starts an ItemFetchJob over mSyncCollection to list the local items.
  // Fetched items are passed to slotItemsReceived() and completion to
  // slotLocalListDone().
  // NOTE(review): the condition guarding qFatal() and the signal connections
  // are not visible in this listing; the message indicates the guard is the
  // incremental-mode check.
  273void ItemSyncPrivate::fetchLocalItemsToDelete()
 
  277        qFatal(
"This must not be called while in incremental mode");
 
  280    auto job = 
new ItemFetchJob(mSyncCollection, subjobParent());
 
  // Remote ids are needed because slotItemsReceived() compares them with mListedItems.
  281    job->fetchScope().setFetchRemoteIdentification(
true);
 
  282    job->fetchScope().setFetchModificationTime(
false);
 
  285    job->fetchScope().setCacheOnly(
true);
 
  288        slotItemsReceived(lst);
 
  291        slotLocalListDone(job);
 
  // Goes through locally fetched @p items and collects those to delete: any
  // item whose remote id is not in mListedItems is appended to mItemsToDelete
  // as an Item built from its id only.
  // NOTE(review): the body of the empty-remote-id branch is not visible here
  // (presumably such items are skipped; confirm against the full source).
  296void ItemSyncPrivate::slotItemsReceived(
const Item::List &items)
 
  298    for (
const Akonadi::Item &item : items) {
 
  300        if (item.remoteId().isEmpty()) {
 
  303        if (!mListedItems.contains(item.remoteId())) {
 
  304            mItemsToDelete << Item(item.id());
 
  // Called when the local listing job has finished; hands the collected
  // mItemsToDelete to deleteItems().
  // NOTE(review): lines between the signature and this call are not visible here.
  309void ItemSyncPrivate::slotLocalListDone(KJob *job)
 
  315    deleteItems(mItemsToDelete);
 
  // Returns a debug description of the form "Collection <id> (<name>)" for the
  // collection being synchronized.
  319QString ItemSyncPrivate::jobDebuggingString()
 const 
  323    return QStringLiteral(
"Collection %1 (%2)").arg(mSyncCollection.id()).arg(mSyncCollection.name());
 
  // Prepares the next batch. When no batch is in progress and either a full
  // batch is queued or delivery is done, up to mBatchSize items are moved from
  // mRemoteItemQueue into mCurrentBatchRemoteItems, and all queued removals are
  // moved into mCurrentBatchRemovedRemoteItems.
  // NOTE(review): the finished-job guard and the lines after the flag is set
  // are not visible in this listing.
  326void ItemSyncPrivate::execute()
 
  330        qCWarning(AKONADICORE_LOG) << 
"Call to execute() on finished job.";
 
  335    if (!mProcessingBatch) {
 
  // Once delivery is done, a final batch smaller than mBatchSize is accepted.
  336        if (mRemoteItemQueue.size() >= mBatchSize || mDeliveryDone) {
 
  338            const int num = qMin(mBatchSize, mRemoteItemQueue.size());
 
  339            mCurrentBatchRemoteItems.reserve(mBatchSize);
 
  // Move the first `num` items into the batch, then erase the moved-from slots.
  340            std::move(mRemoteItemQueue.begin(), mRemoteItemQueue.begin() + num, std::back_inserter(mCurrentBatchRemoteItems));
 
  341            mRemoteItemQueue.erase(mRemoteItemQueue.begin(), mRemoteItemQueue.begin() + num);
 
  // Removals are not batched: the whole removal queue goes into the current batch.
  343            mCurrentBatchRemovedRemoteItems += mRemovedRemoteItemQueue;
 
  344            mRemovedRemoteItemQueue.clear();
 
  349        mProcessingBatch = 
true;
 
  // Processes the current batch: requests a transaction, and for a
  // non-incremental sync where everything has been processed, lists the local
  // items so leftovers can be deleted. The current batch's removed items are
  // passed to deleteItems() and then cleared.
  // NOTE(review): branch bodies and the else/brace structure are missing from
  // this listing; presumably the deleteItems() call belongs to the alternative
  // (incremental) branch — confirm against the full source.
  357void ItemSyncPrivate::processBatch()
 
  360    if (mCurrentBatchRemoteItems.isEmpty() && !mDeliveryDone) {
 
  369    requestTransaction();
 
  374    if (!mIncremental && allProcessed()) {
 
  376        fetchLocalItemsToDelete();
 
  378        deleteItems(mCurrentBatchRemovedRemoteItems);
 
  379        mCurrentBatchRemovedRemoteItems.clear();
 
  // Handles every item in mCurrentBatchRemoteItems: warns about items without
  // a remote id, records remote ids in mListedItems, and passes items to
  // createOrMerge(). The batch list is cleared afterwards.
  // NOTE(review): the lines between the warning and the mListedItems insert are
  // not visible, so whether items without a remote id are skipped cannot be
  // confirmed here.
  385void ItemSyncPrivate::processItems()
 
  // std::as_const gives read-only iteration over the member list.
  388    for (
const Item &remoteItem : std::as_const(mCurrentBatchRemoteItems)) {
 
  389        if (remoteItem.remoteId().isEmpty()) {
 
  390            qCWarning(AKONADICORE_LOG) << 
"Item " << remoteItem.id() << 
" does not have a remote identifier";
 
  394            mListedItems << remoteItem.remoteId();
 
  396        createOrMerge(remoteItem);
 
  398    mCurrentBatchRemoteItems.clear();
 
  // Creates an ItemDeleteJob for @p itemsToDelete, parented to subjobParent();
  // its completion is reported to slotLocalDeleteDone(). If the parent is a
  // TransactionSequence, the job is registered so its failure is ignored.
  // NOTE(review): lines before the job creation (lines 402-413 of the original)
  // and the null check around `transaction` are not visible in this listing.
  401void ItemSyncPrivate::deleteItems(
const Item::List &itemsToDelete)
 
  414    auto job = 
new ItemDeleteJob(itemsToDelete, subjobParent());
 
  416        slotLocalDeleteDone(job);
 
  // qobject_cast yields nullptr when the parent is not a TransactionSequence.
  423    auto transaction = qobject_cast<TransactionSequence *>(subjobParent());
 
  425        transaction->setIgnoreJobFailure(job);
 
  // Result handler for the local delete job; logs a warning with the job's
  // error string.
  // NOTE(review): the condition guarding the warning and any follow-up work are
  // not visible in this listing.
  429void ItemSyncPrivate::slotLocalDeleteDone(KJob *job)
 
  432        qCWarning(AKONADICORE_LOG) << 
"Deleting items from the akonadi database failed:" << job->
errorString();
 
  // Result handler for item create/update jobs. On an error other than
  // Job::KilledJobError it logs a warning and clears mRemoteItemQueue.
  // NOTE(review): the remainder of the function is not visible in this listing.
  440void ItemSyncPrivate::slotLocalChangeDone(KJob *job)
 
  // A killed job is not reported as a failure.
  442    if (job->
error() && job->
error() != Job::KilledJobError) {
 
  443        qCWarning(AKONADICORE_LOG) << 
"Creating/updating items from the akonadi database failed:" << job->
errorString();
 
  // Drop the items that are still queued.
  444        mRemoteItemQueue.
clear(); 
 
  // Result handler for transaction jobs. If the finished job is the current
  // transaction, the stored pointer is reset to nullptr.
  // NOTE(review): the other statements of this function are not visible here.
  452void ItemSyncPrivate::slotTransactionResult(KJob *job)
 
  455    if (mCurrentTransaction == job) {
 
  456        mCurrentTransaction = 
nullptr;
 
  // Makes sure a transaction exists: when none is open, creates a
  // TransactionSequence parented to the ItemSync with automatic committing
  // disabled. Its result is reported to slotTransactionResult().
  // NOTE(review): any transaction-mode check and the result connection are not
  // fully visible in this listing.
  462void ItemSyncPrivate::requestTransaction()
 
  466    if (!mCurrentTransaction) {
 
  468        mCurrentTransaction = 
new TransactionSequence(q);
 
  // Committing is done explicitly; checkDone() calls commit() on the transaction.
  469        mCurrentTransaction->setAutomaticCommittingEnabled(
false);
 
  471            slotTransactionResult(job);
 
  // Returns the job that new subjobs should be parented to: the current
  // transaction on one path, otherwise the ItemSync itself.
  // NOTE(review): the condition selecting the transaction is not visible here.
  476Job *ItemSyncPrivate::subjobParent()
 const 
  480        return mCurrentTransaction;
 
  // const_cast is needed because this method is const but returns a non-const Job *.
  482    return const_cast<ItemSync *
>(q);
 
  488    d->mStreaming = enable;
 
 
  494    Q_ASSERT(d->mStreaming);
 
  495    d->mDeliveryDone = 
true;
 
 
  // Result handler for subjobs of the ItemSync. It logs a warning with the
  // job's error string and, on a path that is only partly visible, forwards
  // to the base implementation Akonadi::Job::slotResult().
  // NOTE(review): the conditions around both statements are missing from this
  // listing; confirm the error-handling flow against the full source.
  499void ItemSync::slotResult(
KJob *job)
 
  502        qCWarning(AKONADICORE_LOG) << 
"Error during ItemSync: " << job->
errorString();
 
  511        Akonadi::Job::slotResult(job);
 
  518    qCDebug(AKONADICORE_LOG) << 
"The item sync is being rolled-back.";
 
  520    if (d->mCurrentTransaction) {
 
  521        d->mCurrentTransaction->rollback();
 
  523    d->mDeliveryDone = 
true; 
 
 
  530    d->mTransactionMode = mode;
 
 
  536    return d->mBatchSize;
 
 
  542    d->mBatchSize = size;
 
 
  548    return d->mMergeMode;
 
 
  557#include "moc_itemsync.cpp" 
Represents a collection of PIM items.
 
@ Silent
Only return the id of the merged/created item.
 
Job that fetches items from the Akonadi storage.
 
void itemsReceived(const Akonadi::Item::List &items)
This signal is emitted whenever new items have been fetched completely.
 
@ EmitItemsIndividually
emitted via signal upon reception
 
Syncs between items known to a client (usually a resource) and the Akonadi storage.
 
ItemSync(const Collection &collection, const QDateTime &timestamp={}, QObject *parent=nullptr)
Creates a new item synchronizer.
 
void setTotalItems(int amount)
Set the amount of items which you are going to return in total by using the setFullSyncItems()/setIncrementalSyncItems() methods.
 
TransactionMode
Transaction mode used by ItemSync.
 
@ NoTransaction
Use no transaction at all, provides highest responsiveness (might therefore feel faster even when act...
 
@ MultipleTransactions
Use one transaction per chunk of delivered items, good compromise between the other two when using st...
 
void setDisableAutomaticDeliveryDone(bool disable)
Disables the automatic completion of the item sync, based on the number of delivered items.
 
void setIncrementalSyncItems(const Item::List &changedItems, const Item::List &removedItems)
Sets the item lists for incrementally syncing the collection.
 
~ItemSync() override
Destroys the item synchronizer.
 
MergeMode mergeMode() const
Returns current merge mode.
 
void setBatchSize(int)
Set the batch size.
 
void setFullSyncItems(const Item::List &items)
Sets the full item list for the collection.
 
void doStart() override
This method must be reimplemented in the concrete jobs.
 
void rollback()
Aborts the sync process and rolls back all not yet committed transactions.
 
void deliveryDone()
Notify ItemSync that all remote items have been delivered.
 
int batchSize() const
Minimum number of items required to start processing in streaming mode.
 
void setStreamingEnabled(bool enable)
Enable item streaming.
 
void setMergeMode(MergeMode mergeMode)
Set what merge method should be used for next ItemSync run.
 
void setTransactionMode(TransactionMode mode)
Set the transaction mode to use for this sync.
 
QList< Item > List
Describes a list of items.
 
Base class for all actions in the Akonadi storage.
 
Job(QObject *parent=nullptr)
Creates a new job.
 
@ UserCanceled
The user canceled this job.
 
bool removeSubjob(KJob *job) override
Removes the given subjob of this job.
 
void setErrorText(const QString &errorText)
 
virtual QString errorString() const
 
void setTotalAmount(Unit unit, qulonglong amount)
 
void setError(int errorCode)
 
QString errorText() const
 
Helper integration between Akonadi and Qt.
 
QStringView merge(QStringView lhs, QStringView rhs)
 
QAction * create(StandardAction id, const Receiver *recvr, Func slot, QObject *parent, std::optional< Qt::ConnectionType > connectionType=std::nullopt)
 
bool isValid() const
 
qsizetype count() const
 
bool isEmpty() const
 
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
 
QObject * parent() const