7#include "resourcescheduler_p.h"
9#include "recursivemover_p.h"
10#include <QDBusConnection>
12#include "akonadiagentbase_debug.h"
13#include "private/instance_p.h"
14#include <KLocalizedString>
16#include <QDBusInterface>
20using namespace std::chrono_literals;
21qint64 ResourceScheduler::Task::latestSerial = 0;
26ResourceScheduler::ResourceScheduler(
QObject *parent)
31void ResourceScheduler::scheduleFullSync()
35 TaskList &queue = queueForTaskType(t.type);
36 if (queue.contains(t) || mCurrentTask == t) {
40 signalTaskToTracker(t,
"SyncAll");
44void ResourceScheduler::scheduleCollectionTreeSync()
47 t.type = SyncCollectionTree;
48 TaskList &queue = queueForTaskType(t.type);
49 if (queue.contains(t) || mCurrentTask == t) {
53 signalTaskToTracker(t,
"SyncCollectionTree");
57void ResourceScheduler::scheduleTagSync()
61 TaskList &queue = queueForTaskType(t.type);
62 if (queue.contains(t) || mCurrentTask == t) {
66 signalTaskToTracker(t,
"SyncTags");
70void ResourceScheduler::scheduleSync(
const Collection &col)
73 t.type = SyncCollection;
75 TaskList &queue = queueForTaskType(t.type);
76 if (queue.contains(t) || mCurrentTask == t) {
84void ResourceScheduler::scheduleAttributesSync(
const Collection &collection)
87 t.type = SyncCollectionAttributes;
88 t.collection = collection;
90 TaskList &queue = queueForTaskType(t.type);
91 if (queue.contains(t) || mCurrentTask == t) {
95 signalTaskToTracker(t,
"SyncCollectionAttributes",
QString::number(collection.id()));
107 t.argument = parentId;
109 TaskList &queue = queueForTaskType(t.type);
125 if (mCurrentTask == t) {
126 mCurrentTask.dbusMsgs << msg;
131 TaskList &queue = queueForTaskType(t.type);
132 const int idx = queue.indexOf(t);
134 queue[idx].dbusMsgs << msg;
143 for (
const auto &item : items) {
150void ResourceScheduler::scheduleResourceCollectionDeletion()
153 t.type = DeleteResourceCollection;
154 TaskList &queue = queueForTaskType(t.type);
155 if (queue.contains(t) || mCurrentTask == t) {
159 signalTaskToTracker(t,
"DeleteResourceCollection");
163void ResourceScheduler::scheduleCacheInvalidation(
const Collection &collection)
166 t.type = InvalideCacheForCollection;
167 t.collection = collection;
168 TaskList &queue = queueForTaskType(t.type);
169 if (queue.contains(t) || mCurrentTask == t) {
173 signalTaskToTracker(t,
"InvalideCacheForCollection",
QString::number(collection.id()));
177void ResourceScheduler::scheduleChangeReplay()
180 t.type = ChangeReplay;
181 TaskList &queue = queueForTaskType(t.type);
183 if (queue.contains(t)) {
187 signalTaskToTracker(t,
"ChangeReplay");
191void ResourceScheduler::scheduleMoveReplay(
const Collection &movedCollection, RecursiveMover *mover)
194 t.type = RecursiveMoveReplay;
195 t.collection = movedCollection;
197 TaskList &queue = queueForTaskType(t.type);
199 if (queue.contains(t) || mCurrentTask == t) {
204 signalTaskToTracker(t,
"RecursiveMoveReplay",
QString::number(t.collection.id()));
208void Akonadi::ResourceScheduler::scheduleFullSyncCompletion()
211 t.type = SyncAllDone;
212 TaskList &queue = queueForTaskType(t.type);
215 signalTaskToTracker(t,
"SyncAllDone");
219void Akonadi::ResourceScheduler::scheduleCollectionTreeSyncCompletion()
222 t.type = SyncCollectionTreeDone;
223 TaskList &queue = queueForTaskType(t.type);
226 signalTaskToTracker(t,
"SyncCollectionTreeDone");
230void Akonadi::ResourceScheduler::scheduleCustomTask(
QObject *receiver,
231 const char *methodName,
237 t.receiver = receiver;
238 t.methodName = methodName;
239 t.argument = argument;
240 QueueType queueType = GenericTaskQueue;
242 queueType = AfterChangeReplayQueue;
244 queueType = PrependTaskQueue;
246 TaskList &queue = mTaskList[queueType];
248 if (queue.contains(t)) {
261 signalTaskToTracker(t,
"Custom-" + t.methodName);
265void ResourceScheduler::taskDone()
271 if (s_resourcetracker) {
276 mCurrentTask = Task();
277 mCurrentTasksQueue = -1;
281void ResourceScheduler::itemFetchDone(
const QString &msg)
283 Q_ASSERT(mCurrentTask.type == FetchItem);
285 TaskList &queue = queueForTaskType(mCurrentTask.type);
287 const qint64 parentId = mCurrentTask.argument.toLongLong();
289 if (msg.
isEmpty() && !queue.isEmpty()) {
290 Task &nextTask = queue[0];
292 if (nextTask.type != mCurrentTask.type || nextTask.argument.toLongLong() != parentId) {
296 mCurrentTask.sendDBusReplies(msg);
301 auto iter = queue.begin();
302 while (iter != queue.end()) {
303 if (iter->type != mCurrentTask.type || iter->argument.toLongLong() == parentId) {
304 iter = queue.erase(iter);
312 mCurrentTask.sendDBusReplies(msg);
318void ResourceScheduler::deferTask()
320 if (mCurrentTask.type == Invalid) {
324 if (s_resourcetracker) {
329 Task t = mCurrentTask;
330 mCurrentTask = Task();
332 Q_ASSERT(mCurrentTasksQueue >= 0 && mCurrentTasksQueue < NQueueCount);
333 mTaskList[mCurrentTasksQueue].prepend(t);
334 mCurrentTasksQueue = -1;
336 signalTaskToTracker(t,
"DeferedTask");
341bool ResourceScheduler::isEmpty()
343 for (
int i = 0; i < NQueueCount; ++i) {
344 if (!mTaskList[i].isEmpty()) {
351void ResourceScheduler::scheduleNext()
353 if (mCurrentTask.type != Invalid || isEmpty() || !mOnline) {
359void ResourceScheduler::executeNext()
361 if (mCurrentTask.type != Invalid || isEmpty()) {
365 for (
int i = 0; i < NQueueCount; ++i) {
366 if (!mTaskList[i].isEmpty()) {
367 mCurrentTask = mTaskList[i].takeFirst();
368 mCurrentTasksQueue = i;
373 if (s_resourcetracker) {
378 switch (mCurrentTask.type) {
380 Q_EMIT executeFullSync();
382 case SyncCollectionTree:
383 Q_EMIT executeCollectionTreeSync();
386 Q_EMIT executeCollectionSync(mCurrentTask.collection);
388 case SyncCollectionAttributes:
389 Q_EMIT executeCollectionAttributesSync(mCurrentTask.collection);
392 Q_EMIT executeTagSync();
395 Q_EMIT executeItemFetch(mCurrentTask.items.at(0), mCurrentTask.itemParts);
398 Q_EMIT executeItemsFetch(mCurrentTask.items, mCurrentTask.itemParts);
400 case DeleteResourceCollection:
401 Q_EMIT executeResourceCollectionDeletion();
403 case InvalideCacheForCollection:
404 Q_EMIT executeCacheInvalidation(mCurrentTask.collection);
407 Q_EMIT executeChangeReplay();
409 case RecursiveMoveReplay:
410 Q_EMIT executeRecursiveMoveReplay(mCurrentTask.argument.value<RecursiveMover *>());
413 Q_EMIT fullSyncComplete();
415 case SyncCollectionTreeDone:
416 Q_EMIT collectionTreeSyncComplete();
420 const bool hasSlotWithVariant = mCurrentTask.receiver->metaObject()->indexOfMethod(methodSig.
constData()) != -1;
421 bool success =
false;
422 if (hasSlotWithVariant) {
424 Q_ASSERT_X(success || !mCurrentTask.argument.isValid(),
425 "ResourceScheduler::executeNext",
426 "Valid argument was provided but the method wasn't found");
433 qCCritical(AKONADIAGENTBASE_LOG) <<
"Could not invoke slot" << mCurrentTask.methodName <<
"on" << mCurrentTask.receiver <<
"with argument"
434 << mCurrentTask.argument;
439 qCCritical(AKONADIAGENTBASE_LOG) <<
"Unhandled task type" << mCurrentTask.type;
446ResourceScheduler::Task ResourceScheduler::currentTask()
const
451ResourceScheduler::Task &ResourceScheduler::currentTask()
456void ResourceScheduler::setOnline(
bool state)
458 if (mOnline == state) {
465 if (mCurrentTask.type != Invalid) {
467 queueForTaskType(mCurrentTask.type).prepend(mCurrentTask);
468 mCurrentTask = Task();
469 mCurrentTasksQueue = -1;
472 TaskList &itemFetchQueue = queueForTaskType(FetchItem);
473 qint64 parentId = -1;
476 if ((*it).type == FetchItem) {
477 qint64 idx = it->argument.toLongLong();
478 if (parentId == -1) {
481 if (idx != parentId) {
484 lastTask.sendDBusReplies(
i18nc(
"@info",
"Job canceled."));
488 it = itemFetchQueue.erase(it);
489 if (s_resourcetracker) {
500void ResourceScheduler::signalTaskToTracker(
const Task &task,
const QByteArray &taskType,
const QString &debugString)
503 if (!s_resourcetracker) {
507 QStringLiteral(
"/resourcesJobtracker"),
508 QStringLiteral(
"org.freedesktop.Akonadi.JobTracker"),
514 if (s_resourcetracker) {
527 if (!collection.isValid()) {
530 TaskList &queue = queueForTaskType(SyncCollection);
532 if ((*it).type == SyncCollection && (*it).collection == collection) {
533 it = queue.erase(it);
534 qCDebug(AKONADIAGENTBASE_LOG) <<
" erasing";
541void ResourceScheduler::Task::sendDBusReplies(
const QString &errorMsg)
543 for (
const QDBusMessage &msg : std::as_const(dbusMsgs)) {
544 qCDebug(AKONADIAGENTBASE_LOG) <<
"Sending dbus reply for method" << methodName <<
"with error" << errorMsg;
546 if (!errorMsg.isEmpty()) {
549 reply = msg.createReply();
550 }
else if (msg.member().
isEmpty()) {
553 qCCritical(AKONADIAGENTBASE_LOG) <<
"ResourceScheduler: got unexpected method name :" << msg.member();
559ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType(TaskType type)
563 case RecursiveMoveReplay:
564 return ChangeReplayQueue;
567 case SyncCollectionAttributes:
568 return UserActionQueue;
570 return GenericTaskQueue;
574ResourceScheduler::TaskList &ResourceScheduler::queueForTaskType(TaskType type)
576 const QueueType qt = queueTypeForTaskType(type);
577 return mTaskList[qt];
580void ResourceScheduler::dump()
const
582 qCDebug(AKONADIAGENTBASE_LOG) << dumpToString();
585QString ResourceScheduler::dumpToString()
const
589 str <<
"ResourceScheduler: " << (mOnline ?
"Online" :
"Offline") <<
'\n';
590 str <<
" current task: " << mCurrentTask <<
'\n';
591 for (
int i = 0; i < NQueueCount; ++i) {
592 const TaskList &queue = mTaskList[i];
593 if (queue.isEmpty()) {
594 str <<
" queue " << i <<
" is empty" <<
'\n';
596 str <<
" queue " << i <<
" " << queue.size() <<
" tasks:\n";
599 str <<
" " << (*it) <<
'\n';
607void ResourceScheduler::clear()
609 qCDebug(AKONADIAGENTBASE_LOG) <<
"Clearing ResourceScheduler queues:";
610 for (
int i = 0; i < NQueueCount; ++i) {
611 TaskList &queue = mTaskList[i];
614 mCurrentTask = Task();
615 mCurrentTasksQueue = -1;
618void Akonadi::ResourceScheduler::cancelQueues()
620 for (
int i = 0; i < NQueueCount; ++i) {
621 TaskList &queue = mTaskList[i];
622 if (s_resourcetracker) {
623 for (
const Task &t : queue) {
632static const char s_taskTypes[][27] = {
"Invalid (no task)",
634 "SyncCollectionTree",
636 "SyncCollectionAttributes",
641 "RecursiveMoveReplay",
642 "DeleteResourceCollection",
643 "InvalideCacheForCollection",
645 "SyncCollectionTreeDone",
650 d <<
task.serial <<
" " << s_taskTypes[
task.type] <<
" ";
651 if (
task.type != ResourceScheduler::Invalid) {
652 if (
task.collection.isValid()) {
653 d <<
"collection " <<
task.collection.id() <<
" ";
655 if (!
task.items.isEmpty()) {
658 for (
const auto &item : std::as_const(
task.items)) {
663 if (!
task.methodName.isEmpty()) {
664 d <<
task.methodName <<
" " <<
task.argument.toString();
670QDebug Akonadi::operator<<(
QDebug d,
const ResourceScheduler::Task &task)
681#include "moc_resourcescheduler_p.cpp"
The base class for all Akonadi agents and resources.
@ Idle
The agent does currently nothing.
Represents a collection of PIM items.
Represents a PIM item stored in Akonadi storage.
Id id() const
Returns the unique identifier of the item.
SchedulePriority
Describes the scheduling priority of a task that has been queued for execution.
@ Prepend
The task will be executed as soon as the current task has finished.
@ AfterChangeReplay
The task is scheduled after the last ChangeReplay task in the queue.
Q_SCRIPTABLE CaptureState status()
QString i18nc(const char *context, const char *text, const TYPE &arg...)
Helper integration between Akonadi and Qt.
const char * constData() const const
QDBusPendingCall asyncCallWithArgumentList(const QString &method, const QList< QVariant > &args)
bool send(const QDBusMessage &message) const const
QDBusConnection sessionBus()
void push_back(parameter_type value)
void reserve(qsizetype size)
qsizetype size() const const
QString fromLatin1(QByteArrayView str)
bool isEmpty() const const
QString number(double n, char format, int precision)
QString join(QChar separator) const const
QTaskBuilder< Task > task(Task &&task)
QVariant fromValue(T &&value)