7#include "resourcescheduler_p.h"
9#include "recursivemover_p.h"
10#include <QDBusConnection>
12#include "akonadiagentbase_debug.h"
13#include "private/instance_p.h"
14#include <KLocalizedString>
16#include <QDBusInterface>
20using namespace std::chrono_literals;
21qint64 ResourceScheduler::Task::latestSerial = 0;
26ResourceScheduler::ResourceScheduler(
QObject *parent)
31void ResourceScheduler::scheduleFullSync()
35 TaskList &queue = queueForTaskType(t.type);
36 if (queue.contains(t) || mCurrentTask == t) {
40 signalTaskToTracker(t,
"SyncAll");
44void ResourceScheduler::scheduleCollectionTreeSync()
47 t.type = SyncCollectionTree;
48 TaskList &queue = queueForTaskType(t.type);
49 if (queue.contains(t) || mCurrentTask == t) {
53 signalTaskToTracker(t,
"SyncCollectionTree");
57void ResourceScheduler::scheduleTagSync()
61 TaskList &queue = queueForTaskType(t.type);
62 if (queue.contains(t) || mCurrentTask == t) {
66 signalTaskToTracker(t,
"SyncTags");
70void ResourceScheduler::scheduleRelationSync()
73 t.type = SyncRelations;
74 TaskList &queue = queueForTaskType(t.type);
75 if (queue.contains(t) || mCurrentTask == t) {
79 signalTaskToTracker(t,
"SyncRelations");
83void ResourceScheduler::scheduleSync(
const Collection &col)
86 t.type = SyncCollection;
88 TaskList &queue = queueForTaskType(t.type);
89 if (queue.contains(t) || mCurrentTask == t) {
97void ResourceScheduler::scheduleAttributesSync(
const Collection &collection)
100 t.type = SyncCollectionAttributes;
101 t.collection = collection;
103 TaskList &queue = queueForTaskType(t.type);
104 if (queue.contains(t) || mCurrentTask == t) {
108 signalTaskToTracker(t,
"SyncCollectionAttributes",
QString::number(collection.id()));
120 t.argument = parentId;
122 TaskList &queue = queueForTaskType(t.type);
138 if (mCurrentTask == t) {
139 mCurrentTask.dbusMsgs << msg;
144 TaskList &queue = queueForTaskType(t.type);
145 const int idx = queue.indexOf(t);
147 queue[idx].dbusMsgs << msg;
156 for (
const auto &item : items) {
163void ResourceScheduler::scheduleResourceCollectionDeletion()
166 t.type = DeleteResourceCollection;
167 TaskList &queue = queueForTaskType(t.type);
168 if (queue.contains(t) || mCurrentTask == t) {
172 signalTaskToTracker(t,
"DeleteResourceCollection");
176void ResourceScheduler::scheduleCacheInvalidation(
const Collection &collection)
179 t.type = InvalideCacheForCollection;
180 t.collection = collection;
181 TaskList &queue = queueForTaskType(t.type);
182 if (queue.contains(t) || mCurrentTask == t) {
186 signalTaskToTracker(t,
"InvalideCacheForCollection",
QString::number(collection.id()));
190void ResourceScheduler::scheduleChangeReplay()
193 t.type = ChangeReplay;
194 TaskList &queue = queueForTaskType(t.type);
196 if (queue.contains(t)) {
200 signalTaskToTracker(t,
"ChangeReplay");
204void ResourceScheduler::scheduleMoveReplay(
const Collection &movedCollection, RecursiveMover *mover)
207 t.type = RecursiveMoveReplay;
208 t.collection = movedCollection;
210 TaskList &queue = queueForTaskType(t.type);
212 if (queue.contains(t) || mCurrentTask == t) {
217 signalTaskToTracker(t,
"RecursiveMoveReplay",
QString::number(t.collection.id()));
221void Akonadi::ResourceScheduler::scheduleFullSyncCompletion()
224 t.type = SyncAllDone;
225 TaskList &queue = queueForTaskType(t.type);
228 signalTaskToTracker(t,
"SyncAllDone");
232void Akonadi::ResourceScheduler::scheduleCollectionTreeSyncCompletion()
235 t.type = SyncCollectionTreeDone;
236 TaskList &queue = queueForTaskType(t.type);
239 signalTaskToTracker(t,
"SyncCollectionTreeDone");
243void Akonadi::ResourceScheduler::scheduleCustomTask(
QObject *receiver,
244 const char *methodName,
250 t.receiver = receiver;
251 t.methodName = methodName;
252 t.argument = argument;
253 QueueType queueType = GenericTaskQueue;
255 queueType = AfterChangeReplayQueue;
257 queueType = PrependTaskQueue;
259 TaskList &queue = mTaskList[queueType];
261 if (queue.contains(t)) {
274 signalTaskToTracker(t,
"Custom-" + t.methodName);
278void ResourceScheduler::taskDone()
284 if (s_resourcetracker) {
289 mCurrentTask = Task();
290 mCurrentTasksQueue = -1;
294void ResourceScheduler::itemFetchDone(
const QString &msg)
296 Q_ASSERT(mCurrentTask.type == FetchItem);
298 TaskList &queue = queueForTaskType(mCurrentTask.type);
300 const qint64 parentId = mCurrentTask.argument.toLongLong();
302 if (msg.
isEmpty() && !queue.isEmpty()) {
303 Task &nextTask = queue[0];
305 if (nextTask.type != mCurrentTask.type || nextTask.argument.toLongLong() != parentId) {
309 mCurrentTask.sendDBusReplies(msg);
314 auto iter = queue.begin();
315 while (iter != queue.end()) {
316 if (iter->type != mCurrentTask.type || iter->argument.toLongLong() == parentId) {
317 iter = queue.erase(iter);
325 mCurrentTask.sendDBusReplies(msg);
331void ResourceScheduler::deferTask()
333 if (mCurrentTask.type == Invalid) {
337 if (s_resourcetracker) {
342 Task t = mCurrentTask;
343 mCurrentTask = Task();
345 Q_ASSERT(mCurrentTasksQueue >= 0 && mCurrentTasksQueue < NQueueCount);
346 mTaskList[mCurrentTasksQueue].prepend(t);
347 mCurrentTasksQueue = -1;
349 signalTaskToTracker(t,
"DeferedTask");
354bool ResourceScheduler::isEmpty()
356 for (
int i = 0; i < NQueueCount; ++i) {
357 if (!mTaskList[i].isEmpty()) {
364void ResourceScheduler::scheduleNext()
366 if (mCurrentTask.type != Invalid || isEmpty() || !mOnline) {
372void ResourceScheduler::executeNext()
374 if (mCurrentTask.type != Invalid || isEmpty()) {
378 for (
int i = 0; i < NQueueCount; ++i) {
379 if (!mTaskList[i].isEmpty()) {
380 mCurrentTask = mTaskList[i].takeFirst();
381 mCurrentTasksQueue = i;
386 if (s_resourcetracker) {
391 switch (mCurrentTask.type) {
393 Q_EMIT executeFullSync();
395 case SyncCollectionTree:
396 Q_EMIT executeCollectionTreeSync();
399 Q_EMIT executeCollectionSync(mCurrentTask.collection);
401 case SyncCollectionAttributes:
402 Q_EMIT executeCollectionAttributesSync(mCurrentTask.collection);
405 Q_EMIT executeTagSync();
408 Q_EMIT executeItemFetch(mCurrentTask.items.at(0), mCurrentTask.itemParts);
411 Q_EMIT executeItemsFetch(mCurrentTask.items, mCurrentTask.itemParts);
413 case DeleteResourceCollection:
414 Q_EMIT executeResourceCollectionDeletion();
416 case InvalideCacheForCollection:
417 Q_EMIT executeCacheInvalidation(mCurrentTask.collection);
420 Q_EMIT executeChangeReplay();
422 case RecursiveMoveReplay:
423 Q_EMIT executeRecursiveMoveReplay(mCurrentTask.argument.value<RecursiveMover *>());
426 Q_EMIT fullSyncComplete();
428 case SyncCollectionTreeDone:
429 Q_EMIT collectionTreeSyncComplete();
432 Q_EMIT executeRelationSync();
436 const bool hasSlotWithVariant = mCurrentTask.receiver->metaObject()->indexOfMethod(methodSig.
constData()) != -1;
437 bool success =
false;
438 if (hasSlotWithVariant) {
440 Q_ASSERT_X(success || !mCurrentTask.argument.isValid(),
441 "ResourceScheduler::executeNext",
442 "Valid argument was provided but the method wasn't found");
449 qCCritical(AKONADIAGENTBASE_LOG) <<
"Could not invoke slot" << mCurrentTask.methodName <<
"on" << mCurrentTask.receiver <<
"with argument"
450 << mCurrentTask.argument;
455 qCCritical(AKONADIAGENTBASE_LOG) <<
"Unhandled task type" << mCurrentTask.type;
462ResourceScheduler::Task ResourceScheduler::currentTask()
const
467ResourceScheduler::Task &ResourceScheduler::currentTask()
472void ResourceScheduler::setOnline(
bool state)
474 if (mOnline == state) {
481 if (mCurrentTask.type != Invalid) {
483 queueForTaskType(mCurrentTask.type).prepend(mCurrentTask);
484 mCurrentTask = Task();
485 mCurrentTasksQueue = -1;
488 TaskList &itemFetchQueue = queueForTaskType(FetchItem);
489 qint64 parentId = -1;
492 if ((*it).type == FetchItem) {
493 qint64 idx = it->argument.toLongLong();
494 if (parentId == -1) {
497 if (idx != parentId) {
500 lastTask.sendDBusReplies(
i18nc(
"@info",
"Job canceled."));
504 it = itemFetchQueue.erase(it);
505 if (s_resourcetracker) {
516void ResourceScheduler::signalTaskToTracker(
const Task &task,
const QByteArray &taskType,
const QString &debugString)
519 if (!s_resourcetracker) {
523 QStringLiteral(
"/resourcesJobtracker"),
524 QStringLiteral(
"org.freedesktop.Akonadi.JobTracker"),
530 if (s_resourcetracker) {
543 if (!collection.isValid()) {
546 TaskList &queue = queueForTaskType(SyncCollection);
548 if ((*it).type == SyncCollection && (*it).collection == collection) {
549 it = queue.erase(it);
550 qCDebug(AKONADIAGENTBASE_LOG) <<
" erasing";
557void ResourceScheduler::Task::sendDBusReplies(
const QString &errorMsg)
560 qCDebug(AKONADIAGENTBASE_LOG) <<
"Sending dbus reply for method" << methodName <<
"with error" << errorMsg;
562 if (!errorMsg.isEmpty()) {
565 reply = msg.createReply();
566 }
else if (msg.member().
isEmpty()) {
569 qCCritical(AKONADIAGENTBASE_LOG) <<
"ResourceScheduler: got unexpected method name :" << msg.member();
575ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType(TaskType type)
579 case RecursiveMoveReplay:
580 return ChangeReplayQueue;
583 case SyncCollectionAttributes:
584 return UserActionQueue;
586 return GenericTaskQueue;
590ResourceScheduler::TaskList &ResourceScheduler::queueForTaskType(TaskType type)
592 const QueueType qt = queueTypeForTaskType(type);
593 return mTaskList[qt];
596void ResourceScheduler::dump()
const
598 qCDebug(AKONADIAGENTBASE_LOG) << dumpToString();
601QString ResourceScheduler::dumpToString()
const
605 str <<
"ResourceScheduler: " << (mOnline ?
"Online" :
"Offline") <<
'\n';
606 str <<
" current task: " << mCurrentTask <<
'\n';
607 for (
int i = 0; i < NQueueCount; ++i) {
608 const TaskList &queue = mTaskList[i];
609 if (queue.isEmpty()) {
610 str <<
" queue " << i <<
" is empty" <<
'\n';
612 str <<
" queue " << i <<
" " << queue.size() <<
" tasks:\n";
615 str <<
" " << (*it) <<
'\n';
623void ResourceScheduler::clear()
625 qCDebug(AKONADIAGENTBASE_LOG) <<
"Clearing ResourceScheduler queues:";
626 for (
int i = 0; i < NQueueCount; ++i) {
627 TaskList &queue = mTaskList[i];
630 mCurrentTask = Task();
631 mCurrentTasksQueue = -1;
634void Akonadi::ResourceScheduler::cancelQueues()
636 for (
int i = 0; i < NQueueCount; ++i) {
637 TaskList &queue = mTaskList[i];
638 if (s_resourcetracker) {
639 for (
const Task &t : queue) {
648static const char s_taskTypes[][27] = {
"Invalid (no task)",
650 "SyncCollectionTree",
652 "SyncCollectionAttributes",
657 "RecursiveMoveReplay",
658 "DeleteResourceCollection",
659 "InvalideCacheForCollection",
661 "SyncCollectionTreeDone",
667 d <<
task.serial <<
" " << s_taskTypes[
task.type] <<
" ";
668 if (
task.type != ResourceScheduler::Invalid) {
669 if (
task.collection.isValid()) {
670 d <<
"collection " <<
task.collection.id() <<
" ";
672 if (!
task.items.isEmpty()) {
675 for (
const auto &item :
std::as_const(
task.items)) {
680 if (!
task.methodName.isEmpty()) {
681 d <<
task.methodName <<
" " <<
task.argument.toString();
687QDebug Akonadi::operator<<(
QDebug d,
const ResourceScheduler::Task &task)
698#include "moc_resourcescheduler_p.cpp"
The base class for all Akonadi agents and resources.
@ Idle
The agent does currently nothing.
Represents a collection of PIM items.
Represents a PIM item stored in Akonadi storage.
Id id() const
Returns the unique identifier of the item.
SchedulePriority
Describes the scheduling priority of a task that has been queued for execution.
@ Prepend
The task will be executed as soon as the current task has finished.
@ AfterChangeReplay
The task is scheduled after the last ChangeReplay task in the queue.
Q_SCRIPTABLE CaptureState status()
QString i18nc(const char *context, const char *text, const TYPE &arg...)
Helper integration between Akonadi and Qt.
A glue between Qt and the standard library.
const char * constData() const const
QDBusPendingCall asyncCallWithArgumentList(const QString &method, const QList< QVariant > &args)
bool send(const QDBusMessage &message) const const
QDBusConnection sessionBus()
void push_back(parameter_type value)
void reserve(qsizetype size)
qsizetype size() const const
QString fromLatin1(QByteArrayView str)
bool isEmpty() const const
QString number(double n, char format, int precision)
QString join(QChar separator) const const
QTaskBuilder< Task > task(Task &&task)
QVariant fromValue(T &&value)