7#include "resourcescheduler_p.h" 
    9#include "recursivemover_p.h" 
   10#include <QDBusConnection> 
   12#include "akonadiagentbase_debug.h" 
   13#include "private/instance_p.h" 
   14#include <KLocalizedString> 
   16#include <QDBusInterface> 
   20using namespace std::chrono_literals;
 
   21qint64 ResourceScheduler::Task::latestSerial = 0;
 
   26ResourceScheduler::ResourceScheduler(
QObject *parent)
 
   31void ResourceScheduler::scheduleFullSync()
 
   35    TaskList &queue = queueForTaskType(t.type);
 
   36    if (queue.contains(t) || mCurrentTask == t) {
 
   40    signalTaskToTracker(t, 
"SyncAll");
 
   44void ResourceScheduler::scheduleCollectionTreeSync()
 
   47    t.type = SyncCollectionTree;
 
   48    TaskList &queue = queueForTaskType(t.type);
 
   49    if (queue.contains(t) || mCurrentTask == t) {
 
   53    signalTaskToTracker(t, 
"SyncCollectionTree");
 
   57void ResourceScheduler::scheduleTagSync()
 
   61    TaskList &queue = queueForTaskType(t.type);
 
   62    if (queue.contains(t) || mCurrentTask == t) {
 
   66    signalTaskToTracker(t, 
"SyncTags");
 
   70void ResourceScheduler::scheduleSync(
const Collection &col)
 
   73    t.type = SyncCollection;
 
   75    TaskList &queue = queueForTaskType(t.type);
 
   76    if (queue.contains(t) || mCurrentTask == t) {
 
   84void ResourceScheduler::scheduleAttributesSync(
const Collection &collection)
 
   87    t.type = SyncCollectionAttributes;
 
   88    t.collection = collection;
 
   90    TaskList &queue = queueForTaskType(t.type);
 
   91    if (queue.contains(t) || mCurrentTask == t) {
 
   95    signalTaskToTracker(t, 
"SyncCollectionAttributes", 
QString::number(collection.id()));
 
  107    t.argument = parentId;
 
  109    TaskList &queue = queueForTaskType(t.type);
 
  125    if (mCurrentTask == t) {
 
  126        mCurrentTask.dbusMsgs << msg;
 
  131    TaskList &queue = queueForTaskType(t.type);
 
  132    const int idx = queue.indexOf(t);
 
  134        queue[idx].dbusMsgs << msg;
 
  143    for (
const auto &item : items) {
 
  150void ResourceScheduler::scheduleResourceCollectionDeletion()
 
  153    t.type = DeleteResourceCollection;
 
  154    TaskList &queue = queueForTaskType(t.type);
 
  155    if (queue.contains(t) || mCurrentTask == t) {
 
  159    signalTaskToTracker(t, 
"DeleteResourceCollection");
 
  163void ResourceScheduler::scheduleCacheInvalidation(
const Collection &collection)
 
  166    t.type = InvalideCacheForCollection;
 
  167    t.collection = collection;
 
  168    TaskList &queue = queueForTaskType(t.type);
 
  169    if (queue.contains(t) || mCurrentTask == t) {
 
  173    signalTaskToTracker(t, 
"InvalideCacheForCollection", 
QString::number(collection.id()));
 
  177void ResourceScheduler::scheduleChangeReplay()
 
  180    t.type = ChangeReplay;
 
  181    TaskList &queue = queueForTaskType(t.type);
 
  183    if (queue.contains(t)) {
 
  187    signalTaskToTracker(t, 
"ChangeReplay");
 
  191void ResourceScheduler::scheduleMoveReplay(
const Collection &movedCollection, RecursiveMover *mover)
 
  194    t.type = RecursiveMoveReplay;
 
  195    t.collection = movedCollection;
 
  197    TaskList &queue = queueForTaskType(t.type);
 
  199    if (queue.contains(t) || mCurrentTask == t) {
 
  204    signalTaskToTracker(t, 
"RecursiveMoveReplay", 
QString::number(t.collection.id()));
 
  208void Akonadi::ResourceScheduler::scheduleFullSyncCompletion()
 
  211    t.type = SyncAllDone;
 
  212    TaskList &queue = queueForTaskType(t.type);
 
  215    signalTaskToTracker(t, 
"SyncAllDone");
 
  219void Akonadi::ResourceScheduler::scheduleCollectionTreeSyncCompletion()
 
  222    t.type = SyncCollectionTreeDone;
 
  223    TaskList &queue = queueForTaskType(t.type);
 
  226    signalTaskToTracker(t, 
"SyncCollectionTreeDone");
 
  230void Akonadi::ResourceScheduler::scheduleCustomTask(
QObject *receiver,
 
  231                                                    const char *methodName,
 
  237    t.receiver = receiver;
 
  238    t.methodName = methodName;
 
  239    t.argument = argument;
 
  240    QueueType queueType = GenericTaskQueue;
 
  242        queueType = AfterChangeReplayQueue;
 
  244        queueType = PrependTaskQueue;
 
  246    TaskList &queue = mTaskList[queueType];
 
  248    if (queue.contains(t)) {
 
  261    signalTaskToTracker(t, 
"Custom-" + t.methodName);
 
  265void ResourceScheduler::taskDone()
 
  271    if (s_resourcetracker) {
 
  273        s_resourcetracker->asyncCallWithArgumentList(QStringLiteral(
"jobEnded"), argumentList);
 
  276    mCurrentTask = Task();
 
  277    mCurrentTasksQueue = -1;
 
  281void ResourceScheduler::itemFetchDone(
const QString &msg)
 
  283    Q_ASSERT(mCurrentTask.type == FetchItem);
 
  285    TaskList &queue = queueForTaskType(mCurrentTask.type);
 
  287    const qint64 parentId = mCurrentTask.argument.toLongLong();
 
  289    if (msg.
isEmpty() && !queue.isEmpty()) {
 
  290        Task &nextTask = queue[0];
 
  292        if (nextTask.type != mCurrentTask.type || nextTask.argument.toLongLong() != parentId) {
 
  296            mCurrentTask.sendDBusReplies(msg);
 
  301        auto iter = queue.begin();
 
  302        while (iter != queue.end()) {
 
  303            if (iter->type != mCurrentTask.type || iter->argument.toLongLong() == parentId) {
 
  304                iter = queue.erase(iter);
 
  312        mCurrentTask.sendDBusReplies(msg);
 
  318void ResourceScheduler::deferTask()
 
  320    if (mCurrentTask.type == Invalid) {
 
  324    if (s_resourcetracker) {
 
  326        s_resourcetracker->asyncCallWithArgumentList(QStringLiteral(
"jobEnded"), argumentList);
 
  329    Task t = mCurrentTask;
 
  330    mCurrentTask = Task();
 
  332    Q_ASSERT(mCurrentTasksQueue >= 0 && mCurrentTasksQueue < NQueueCount);
 
  333    mTaskList[mCurrentTasksQueue].prepend(t);
 
  334    mCurrentTasksQueue = -1;
 
  336    signalTaskToTracker(t, 
"DeferedTask");
 
  341bool ResourceScheduler::isEmpty()
 
  343    for (
int i = 0; i < NQueueCount; ++i) {
 
  344        if (!mTaskList[i].isEmpty()) {
 
  351void ResourceScheduler::scheduleNext()
 
  353    if (mCurrentTask.type != Invalid || isEmpty() || !mOnline) {
 
  359void ResourceScheduler::executeNext()
 
  361    if (mCurrentTask.type != Invalid || isEmpty()) {
 
  365    for (
int i = 0; i < NQueueCount; ++i) {
 
  366        if (!mTaskList[i].isEmpty()) {
 
  367            mCurrentTask = mTaskList[i].takeFirst();
 
  368            mCurrentTasksQueue = i;
 
  373    if (s_resourcetracker) {
 
  375        s_resourcetracker->asyncCallWithArgumentList(QStringLiteral(
"jobStarted"), argumentList);
 
  378    switch (mCurrentTask.type) {
 
  380        Q_EMIT executeFullSync();
 
  382    case SyncCollectionTree:
 
  383        Q_EMIT executeCollectionTreeSync();
 
  386        Q_EMIT executeCollectionSync(mCurrentTask.collection);
 
  388    case SyncCollectionAttributes:
 
  389        Q_EMIT executeCollectionAttributesSync(mCurrentTask.collection);
 
  392        Q_EMIT executeTagSync();
 
  395        Q_EMIT executeItemFetch(mCurrentTask.items.at(0), mCurrentTask.itemParts);
 
  398        Q_EMIT executeItemsFetch(mCurrentTask.items, mCurrentTask.itemParts);
 
  400    case DeleteResourceCollection:
 
  401        Q_EMIT executeResourceCollectionDeletion();
 
  403    case InvalideCacheForCollection:
 
  404        Q_EMIT executeCacheInvalidation(mCurrentTask.collection);
 
  407        Q_EMIT executeChangeReplay();
 
  409    case RecursiveMoveReplay:
 
  410        Q_EMIT executeRecursiveMoveReplay(mCurrentTask.argument.value<RecursiveMover *>());
 
  413        Q_EMIT fullSyncComplete();
 
  415    case SyncCollectionTreeDone:
 
  416        Q_EMIT collectionTreeSyncComplete();
 
  420        const bool hasSlotWithVariant = mCurrentTask.receiver->metaObject()->indexOfMethod(methodSig.
constData()) != -1;
 
  421        bool success = 
false;
 
  422        if (hasSlotWithVariant) {
 
  424            Q_ASSERT_X(success || !mCurrentTask.argument.isValid(),
 
  425                       "ResourceScheduler::executeNext",
 
  426                       "Valid argument was provided but the method wasn't found");
 
  433            qCCritical(AKONADIAGENTBASE_LOG) << 
"Could not invoke slot" << mCurrentTask.methodName << 
"on" << mCurrentTask.receiver << 
"with argument" 
  434                                             << mCurrentTask.argument;
 
  439        qCCritical(AKONADIAGENTBASE_LOG) << 
"Unhandled task type" << mCurrentTask.type;
 
  446ResourceScheduler::Task ResourceScheduler::currentTask()
 const 
  451ResourceScheduler::Task &ResourceScheduler::currentTask()
 
  456void ResourceScheduler::setOnline(
bool state)
 
  458    if (mOnline == state) {
 
  465        if (mCurrentTask.type != Invalid) {
 
  467            queueForTaskType(mCurrentTask.type).prepend(mCurrentTask);
 
  468            mCurrentTask = Task();
 
  469            mCurrentTasksQueue = -1;
 
  472        TaskList &itemFetchQueue = queueForTaskType(FetchItem);
 
  473        qint64 parentId = -1;
 
  476            if ((*it).type == FetchItem) {
 
  477                qint64 idx = it->argument.toLongLong();
 
  478                if (parentId == -1) {
 
  481                if (idx != parentId) {
 
  484                    lastTask.sendDBusReplies(
i18nc(
"@info", 
"Job canceled."));
 
  488                it = itemFetchQueue.erase(it);
 
  489                if (s_resourcetracker) {
 
  491                    s_resourcetracker->asyncCallWithArgumentList(QStringLiteral(
"jobEnded"), argumentList);
 
  500void ResourceScheduler::signalTaskToTracker(
const Task &task, 
const QByteArray &taskType, 
const QString &debugString)
 
  503    if (!s_resourcetracker) {
 
  507                                                   QStringLiteral(
"/resourcesJobtracker"),
 
  508                                                   QStringLiteral(
"org.freedesktop.Akonadi.JobTracker"),
 
  514    if (s_resourcetracker) {
 
  521        s_resourcetracker->asyncCallWithArgumentList(QStringLiteral(
"jobCreated"), argumentList);
 
  527    if (!collection.isValid()) { 
 
  530    TaskList &queue = queueForTaskType(SyncCollection);
 
  532        if ((*it).type == SyncCollection && (*it).collection == collection) {
 
  533            it = queue.erase(it);
 
  534            qCDebug(AKONADIAGENTBASE_LOG) << 
" erasing";
 
  541void ResourceScheduler::Task::sendDBusReplies(
const QString &errorMsg)
 
  543    for (
const QDBusMessage &msg : std::as_const(dbusMsgs)) {
 
  544        qCDebug(AKONADIAGENTBASE_LOG) << 
"Sending dbus reply for method" << methodName << 
"with error" << errorMsg;
 
  546        if (!errorMsg.isEmpty()) {
 
  549            reply = msg.createReply();
 
  550        } 
else if (msg.member().
isEmpty()) {
 
  553            qCCritical(AKONADIAGENTBASE_LOG) << 
"ResourceScheduler: got unexpected method name :" << msg.member();
 
  559ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType(TaskType type)
 
  563    case RecursiveMoveReplay:
 
  564        return ChangeReplayQueue;
 
  567    case SyncCollectionAttributes:
 
  568        return UserActionQueue;
 
  570        return GenericTaskQueue;
 
  574ResourceScheduler::TaskList &ResourceScheduler::queueForTaskType(TaskType type)
 
  576    const QueueType qt = queueTypeForTaskType(type);
 
  577    return mTaskList[qt];
 
  580void ResourceScheduler::dump()
 const 
  582    qCDebug(AKONADIAGENTBASE_LOG) << dumpToString();
 
  585QString ResourceScheduler::dumpToString()
 const 
  589    str << 
"ResourceScheduler: " << (mOnline ? 
"Online" : 
"Offline") << 
'\n';
 
  590    str << 
" current task: " << mCurrentTask << 
'\n';
 
  591    for (
int i = 0; i < NQueueCount; ++i) {
 
  592        const TaskList &queue = mTaskList[i];
 
  593        if (queue.isEmpty()) {
 
  594            str << 
" queue " << i << 
" is empty" << 
'\n';
 
  596            str << 
" queue " << i << 
" " << queue.size() << 
" tasks:\n";
 
  599                str << 
"  " << (*it) << 
'\n';
 
  607void ResourceScheduler::clear()
 
  609    qCDebug(AKONADIAGENTBASE_LOG) << 
"Clearing ResourceScheduler queues:";
 
  610    for (
int i = 0; i < NQueueCount; ++i) {
 
  611        TaskList &queue = mTaskList[i];
 
  614    mCurrentTask = Task();
 
  615    mCurrentTasksQueue = -1;
 
  618void Akonadi::ResourceScheduler::cancelQueues()
 
  620    for (
int i = 0; i < NQueueCount; ++i) {
 
  621        TaskList &queue = mTaskList[i];
 
  622        if (s_resourcetracker) {
 
  623            for (
const Task &t : queue) {
 
  625                s_resourcetracker->asyncCallWithArgumentList(QStringLiteral(
"jobEnded"), argumentList);
 
  632static const char s_taskTypes[][27] = {
"Invalid (no task)",
 
  634                                       "SyncCollectionTree",
 
  636                                       "SyncCollectionAttributes",
 
  641                                       "RecursiveMoveReplay",
 
  642                                       "DeleteResourceCollection",
 
  643                                       "InvalideCacheForCollection",
 
  645                                       "SyncCollectionTreeDone",
 
  650    d << 
task.serial << 
" " << s_taskTypes[
task.type] << 
" ";
 
  651    if (
task.type != ResourceScheduler::Invalid) {
 
  652        if (
task.collection.isValid()) {
 
  653            d << 
"collection " << 
task.collection.id() << 
" ";
 
  655        if (!
task.items.isEmpty()) {
 
  658            for (
const auto &item : std::as_const(
task.items)) {
 
  663        if (!
task.methodName.isEmpty()) {
 
  664            d << 
task.methodName << 
" " << 
task.argument.toString();
 
  670QDebug Akonadi::operator<<(
QDebug d, 
const ResourceScheduler::Task &task)
 
  681#include "moc_resourcescheduler_p.cpp" 
The base class for all Akonadi agents and resources.
 
@ Idle
The agent is currently doing nothing.
 
Represents a collection of PIM items.
 
Represents a PIM item stored in Akonadi storage.
 
Id id() const
Returns the unique identifier of the item.
 
QList< Item > List
Describes a list of items.
 
SchedulePriority
Describes the scheduling priority of a task that has been queued for execution.
 
@ Prepend
The task will be executed as soon as the current task has finished.
 
@ AfterChangeReplay
The task is scheduled after the last ChangeReplay task in the queue.
 
Q_SCRIPTABLE CaptureState status()
 
QString i18nc(const char *context, const char *text, const TYPE &arg...)
 
Helper integration between Akonadi and Qt.
 
const char * constData() const
 
bool send(const QDBusMessage &message) const
 
QDBusConnection sessionBus()
 
void push_back(parameter_type value)
 
void reserve(qsizetype size)
 
qsizetype size() const
 
QString fromLatin1(QByteArrayView str)
 
bool isEmpty() const
 
QString number(double n, char format, int precision)
 
QString join(QChar separator) const
 
QTaskBuilder< Task > task(Task &&task)
 
QVariant fromValue(T &&value)