20 #include "resourcescheduler_p.h"
22 #include "dbusconnectionpool.h"
23 #include "recursivemover_p.h"
26 #include <klocalizedstring.h>
28 #include <QtCore/QTimer>
29 #include <QtDBus/QDBusInterface>
30 #include <QtDBus/QDBusConnectionInterface>
31 #include <boost/graph/graph_concepts.hpp>
// Everything below is written against the Akonadi namespace.
33 using namespace Akonadi;
// Out-of-class definition of the static counter Task::latestSerial, starting at 0.
// NOTE(review): where the counter is advanced is not visible in this listing.
35 qint64 ResourceScheduler::Task::latestSerial = 0;
// Constructor taking the QObject parent.
// The visible initializer sets mCurrentTasksQueue to -1, the same value that
// taskDone(), deferTask(), setOnline() and clear() store whenever they reset
// mCurrentTask to an empty Task().
// NOTE(review): the remaining initializers and the body are missing from this listing.
40 ResourceScheduler::ResourceScheduler(
QObject *parent ) :
42 mCurrentTasksQueue( -1 ),
// Schedules a full synchronisation task.
// NOTE(review): the construction of `t` and the statements between the visible
// lines are missing from this listing.
47 void ResourceScheduler::scheduleFullSync()
51 TaskList& queue = queueForTaskType( t.type );
// Duplicate check: an equal task is already queued or is the current task
// (presumably an early return follows - not visible here).
52 if ( queue.contains( t ) || mCurrentTask == t )
// Report the task to the tracker under the label "SyncAll".
55 signalTaskToTracker( t,
"SyncAll" );
// Schedules a task of type SyncCollectionTree.
// NOTE(review): the declaration of `t` and the enqueue step are missing from this listing.
59 void ResourceScheduler::scheduleCollectionTreeSync()
62 t.type = SyncCollectionTree;
63 TaskList& queue = queueForTaskType( t.type );
// Duplicate check against both the target queue and the current task
// (presumably an early return follows - not visible here).
64 if ( queue.contains( t ) || mCurrentTask == t )
// Report the task to the tracker under the label "SyncCollectionTree".
67 signalTaskToTracker( t,
"SyncCollectionTree" );
// Schedules a task of type SyncCollection; `col` is the collection argument.
// NOTE(review): how `col` is stored in the task, the enqueue step and any
// tracker notification are missing from this listing.
71 void ResourceScheduler::scheduleSync(
const Collection & col)
74 t.type = SyncCollection;
76 TaskList& queue = queueForTaskType( t.type );
// Duplicate check against both the target queue and the current task
// (presumably an early return follows - not visible here).
77 if ( queue.contains( t ) || mCurrentTask == t )
// Schedules a task of type SyncCollectionAttributes for `collection`.
// NOTE(review): the declaration of `t` and the enqueue step are missing from this listing.
84 void ResourceScheduler::scheduleAttributesSync(
const Collection &collection )
87 t.type = SyncCollectionAttributes;
88 t.collection = collection;
90 TaskList& queue = queueForTaskType( t.type );
// Duplicate check against both the target queue and the current task
// (presumably an early return follows - not visible here).
91 if ( queue.contains( t ) || mCurrentTask == t )
// Report the task to the tracker; the collection id, rendered as a string,
// is passed as the debug text.
94 signalTaskToTracker( t,
"SyncCollectionAttributes",
QString::number( collection.
id() ) );
// NOTE(review): fragment - the enclosing function's signature and the
// definitions of `t` and `msg` are missing from this listing.
// If `t` equals the current task, `msg` is appended to the current task's
// dbusMsgs list.
107 if ( mCurrentTask == t ) {
108 mCurrentTask.dbusMsgs << msg;
// Otherwise (branch structure not visible) the task is looked up in its queue
// and `msg` is appended to the queued entry's dbusMsgs list.
// NOTE(review): any guard on the result of indexOf() is not visible here.
113 TaskList& queue = queueForTaskType( t.type );
114 const int idx = queue.indexOf( t );
116 queue[ idx ].dbusMsgs << msg;
// Schedules a task of type DeleteResourceCollection.
// NOTE(review): the declaration of `t` and the enqueue step are missing from this listing.
126 void ResourceScheduler::scheduleResourceCollectionDeletion()
129 t.type = DeleteResourceCollection;
130 TaskList& queue = queueForTaskType( t.type );
// Duplicate check against both the target queue and the current task
// (presumably an early return follows - not visible here).
131 if ( queue.contains( t ) || mCurrentTask == t )
// Report the task to the tracker under the label "DeleteResourceCollection".
134 signalTaskToTracker( t,
"DeleteResourceCollection" );
// Schedules a task of type InvalideCacheForCollection for `collection`.
// NOTE(review): the declaration of `t` and the enqueue step are missing from this listing.
138 void ResourceScheduler::scheduleCacheInvalidation(
const Collection &collection )
141 t.type = InvalideCacheForCollection;
142 t.collection = collection;
143 TaskList& queue = queueForTaskType( t.type );
// Duplicate check against both the target queue and the current task
// (presumably an early return follows - not visible here).
144 if ( queue.contains( t ) || mCurrentTask == t )
// Report the task to the tracker; the collection id, rendered as a string,
// is passed as the debug text.
147 signalTaskToTracker( t,
"InvalideCacheForCollection",
QString::number( collection.
id() ) );
// Schedules a task of type ChangeReplay.
// NOTE(review): the declaration of `t` and the enqueue step are missing from this listing.
151 void ResourceScheduler::scheduleChangeReplay()
154 t.type = ChangeReplay;
155 TaskList& queue = queueForTaskType( t.type );
// Duplicate check against the queue only: unlike the sync/deletion/invalidation
// schedulers in this file, the visible condition does not compare against
// mCurrentTask. NOTE(review): the reason is not stated in the visible code.
157 if ( queue.contains( t ) )
// Report the task to the tracker under the label "ChangeReplay".
160 signalTaskToTracker( t,
"ChangeReplay" );
// NOTE(review): fragment - the enclosing function's signature and the
// declaration of `t` / `movedCollection` are missing from this listing.
// Builds a RecursiveMoveReplay task for the moved collection.
167 t.type = RecursiveMoveReplay;
168 t.collection = movedCollection;
170 TaskList &queue = queueForTaskType( t.type );
// Duplicate check against both the target queue and the current task
// (presumably an early return follows - not visible here).
172 if ( queue.contains( t ) || mCurrentTask == t )
// Report the task to the tracker with the collection id as debug text.
176 signalTaskToTracker( t,
"RecursiveMoveReplay",
QString::number( t.collection.id() ) );
// Schedules a task of type SyncAllDone.
// No duplicate check is visible in this listing.
// NOTE(review): the declaration of `t` and the enqueue step are missing from this listing.
180 void Akonadi::ResourceScheduler::scheduleFullSyncCompletion()
183 t.type = SyncAllDone;
184 TaskList& queue = queueForTaskType( t.type );
// Report the task to the tracker under the label "SyncAllDone".
187 signalTaskToTracker( t,
"SyncAllDone" );
// Schedules a task of type SyncCollectionTreeDone.
// No duplicate check is visible in this listing.
// NOTE(review): the declaration of `t` and the enqueue step are missing from this listing.
191 void Akonadi::ResourceScheduler::scheduleCollectionTreeSyncCompletion()
194 t.type = SyncCollectionTreeDone;
195 TaskList& queue = queueForTaskType( t.type );
// Report the task to the tracker under the label "SyncCollectionTreeDone".
198 signalTaskToTracker( t,
"SyncCollectionTreeDone" );
// NOTE(review): fragment - the enclosing function's signature and the
// declaration of `t` are missing from this listing.
// Stores the receiver object, the method name and the argument in the task.
206 t.receiver = receiver;
207 t.methodName = methodName;
208 t.argument = argument;
// The target queue defaults to GenericTaskQueue and can be switched to
// AfterChangeReplayQueue or PrependTaskQueue.
// NOTE(review): the conditions that select those two queues are not visible here.
209 QueueType queueType = GenericTaskQueue;
211 queueType = AfterChangeReplayQueue;
213 queueType = PrependTaskQueue;
// The queue is indexed directly by queueType rather than through queueForTaskType().
214 TaskList& queue = mTaskList[ queueType ];
// Duplicate check against the chosen queue only (follow-up statement not visible).
216 if ( queue.contains( t ) )
// Report the task to the tracker; the label is "Custom-" followed by the method name.
228 signalTaskToTracker( t,
"Custom-" + t.methodName );
// Marks the current task as finished: emits an Idle status with the
// translated text "Ready" and resets the current-task bookkeeping.
232 void ResourceScheduler::taskDone()
235 emit status(
AgentBase::Idle, i18nc(
"@info:status Application ready for work",
"Ready" ) );
// Only when a tracker interface exists.
// NOTE(review): the body of this branch is missing from this listing.
237 if ( s_resourcetracker ) {
// No task is current any more, so there is no originating queue either (-1).
244 mCurrentTask = Task();
245 mCurrentTasksQueue = -1;
// Puts the current task back at the front of the queue it was taken from.
249 void ResourceScheduler::deferTask()
// Guard for "no current task" (presumably returns early - statement not visible).
251 if ( mCurrentTask.type == Invalid )
// Only when a tracker interface exists.
// NOTE(review): the body of this branch is missing from this listing.
254 if ( s_resourcetracker ) {
// Keep a copy before clearing mCurrentTask so the task can be re-queued.
261 Task t = mCurrentTask;
262 mCurrentTask = Task();
// mCurrentTasksQueue must hold a valid queue index at this point.
264 Q_ASSERT( mCurrentTasksQueue >= 0 && mCurrentTasksQueue < NQueueCount );
265 mTaskList[mCurrentTasksQueue].prepend( t );
266 mCurrentTasksQueue = -1;
// Report the re-queued task to the tracker under the label "DeferedTask".
268 signalTaskToTracker( t,
"DeferedTask" );
// Checks each of the NQueueCount task queues for pending tasks.
// NOTE(review): the return statements are missing from this listing.
273 bool ResourceScheduler::isEmpty()
275 for (
int i = 0; i < NQueueCount; ++i ) {
276 if ( !mTaskList[i].isEmpty() )
// Entry point for starting the next task.
// The visible guard is true when a task is already current, when all queues
// are empty, or when the scheduler is offline.
// NOTE(review): the statements following the guard are missing from this listing.
282 void ResourceScheduler::scheduleNext()
284 if ( mCurrentTask.type != Invalid || isEmpty() || !mOnline )
// Takes the next pending task and dispatches it according to its type.
// NOTE(review): several `case` labels, `break`s and closing braces are missing
// from this listing; comments below describe only the visible lines.
289 void ResourceScheduler::executeNext()
// Guard: a task is already current or there is nothing queued
// (follow-up statement not visible).
291 if ( mCurrentTask.type != Invalid || isEmpty() )
// Walk the queues in index order; take the first task of a non-empty queue
// and remember which queue it came from.
294 for (
int i = 0; i < NQueueCount; ++i ) {
295 if ( !mTaskList[ i ].isEmpty() ) {
296 mCurrentTask = mTaskList[ i ].takeFirst();
297 mCurrentTasksQueue = i;
// Only when a tracker interface exists (branch body not visible).
302 if ( s_resourcetracker ) {
// Dispatch on the task type; each visible branch emits the matching signal.
308 switch ( mCurrentTask.type ) {
310 emit executeFullSync();
312 case SyncCollectionTree:
313 emit executeCollectionTreeSync();
316 emit executeCollectionSync( mCurrentTask.collection );
318 case SyncCollectionAttributes:
319 emit executeCollectionAttributesSync( mCurrentTask.collection );
322 emit executeItemFetch( mCurrentTask.item, mCurrentTask.itemParts );
324 case DeleteResourceCollection:
325 emit executeResourceCollectionDeletion();
327 case InvalideCacheForCollection:
328 emit executeCacheInvalidation( mCurrentTask.collection );
331 emit executeChangeReplay();
// The RecursiveMover pointer is extracted from the task's QVariant argument.
333 case RecursiveMoveReplay:
334 emit executeRecursiveMoveReplay( mCurrentTask.argument.value<
RecursiveMover*>() );
337 emit fullSyncComplete();
339 case SyncCollectionTreeDone:
340 emit collectionTreeSyncComplete();
// Build the signature "<methodName>(QVariant)" and ask the receiver's
// meta-object whether such a method exists.
344 const QByteArray methodSig = mCurrentTask.methodName +
"(QVariant)";
345 const bool hasSlotWithVariant = mCurrentTask.receiver->metaObject()->indexOfMethod(methodSig) != -1;
346 bool success =
false;
// NOTE(review): the invocation that assigns `success` is not visible here.
347 if ( hasSlotWithVariant ) {
// Asserts that a valid argument is never combined with a failed invocation.
349 Q_ASSERT_X( success || !mCurrentTask.argument.isValid(),
"ResourceScheduler::executeNext",
"Valid argument was provided but the method wasn't found" );
// Error log naming the method, the receiver and the argument.
355 kError() <<
"Could not invoke slot" << mCurrentTask.methodName <<
"on" << mCurrentTask.receiver <<
"with argument" << mCurrentTask.argument;
// Error log for a task type with no matching branch.
359 kError() <<
"Unhandled task type" << mCurrentTask.type;
// Const accessor returning a Task by value.
// NOTE(review): the body is missing from this listing; presumably it returns mCurrentTask.
366 ResourceScheduler::Task ResourceScheduler::currentTask()
const
// Handles a change of the online state.
// NOTE(review): the assignment to mOnline, the online branch, the loop header
// and the closing braces are missing from this listing.
371 void ResourceScheduler::setOnline(
bool state)
// Guard for "state unchanged" (follow-up statement not visible).
373 if ( mOnline == state )
// A task that is current is pushed back to the front of the queue for its
// type and the current-task bookkeeping is reset.
// NOTE(review): deferTask() re-queues via mTaskList[mCurrentTasksQueue], while
// this uses queueForTaskType(); confirm the two always name the same queue.
379 if ( mCurrentTask.type != Invalid ) {
381 queueForTaskType( mCurrentTask.type ).prepend( mCurrentTask );
382 mCurrentTask = Task();
383 mCurrentTasksQueue = -1;
// Walk the queue that holds FetchItem tasks: each FetchItem entry gets the
// translated "Job canceled." text sent to its D-Bus callers and is then erased.
// erase() returns the iterator to continue from.
386 TaskList& itemFetchQueue = queueForTaskType( FetchItem );
388 if ( (*it).type == FetchItem ) {
389 (*it).sendDBusReplies( i18nc(
"@info",
"Job canceled." ) );
390 it = itemFetchQueue.erase( it );
// When a tracker interface exists, the same "Job canceled." text is streamed
// into something reported to it (the receiving statement is not visible here).
391 if ( s_resourcetracker ) {
394 << i18nc(
"@info",
"Job canceled." );
// Reports `task` to the tracker interface.
// Parameters: task - the task to report; taskType - label for the task;
// debugString - extra text for the report.
// NOTE(review): the call that sends the report is missing from this listing.
404 void ResourceScheduler::signalTaskToTracker(
const Task &task,
const QByteArray &taskType,
const QString &debugString )
// The branch is entered only when no tracker interface exists yet and the
// D-Bus service "org.kde.akonadiconsole" is registered on the thread connection.
407 if ( !s_resourcetracker && DBusConnectionPool::threadConnection().interface()->isServiceRegistered(
QLatin1String(
"org.kde.akonadiconsole" ) ) ) {
// Tail of a call that receives the thread connection and 0 as arguments
// (presumably the tracker interface construction - its start is not visible).
411 DBusConnectionPool::threadConnection(), 0 );
// With a tracker interface available, build the argument list; its first
// element is the identifier of the parent object, cast to AgentBase.
414 if ( s_resourcetracker ) {
416 argumentList << static_cast<AgentBase*>( parent() )->identifier()
// NOTE(review): fragment - the enclosing function's signature, the loop header
// and the definition of `collection` are missing from this listing.
// Looks at the queue that holds SyncCollection tasks and erases every entry of
// that type whose collection equals `collection`, logging " erasing" for each.
430 TaskList& queue = queueForTaskType( SyncCollection );
432 if ( (*it).type == SyncCollection && (*it).collection == collection ) {
// erase() returns the iterator to continue from.
433 it = queue.erase( it );
434 kDebug() <<
" erasing";
// Sends D-Bus replies for this task; `errorMsg` is the error text supplied by the caller.
// NOTE(review): the loop over the stored messages, the first branch of the
// if/else chain and the reply construction are missing from this listing.
440 void ResourceScheduler::Task::sendDBusReplies(
const QString &errorMsg )
// Branch for the method name "requestItemDeliveryV2".
447 }
else if (methodName ==
QLatin1String(
"requestItemDeliveryV2")) {
// Branch for an empty method name.
449 }
else if (methodName.isEmpty()) {
// Any other method name is reported through kFatal().
452 kFatal() <<
"Got unexpected member:" << methodName;
// The reply is sent over the thread's D-Bus connection.
454 DBusConnectionPool::threadConnection().send( reply );
// Maps a task type to the queue type it is stored in.
// Visible mappings: RecursiveMoveReplay -> ChangeReplayQueue,
// SyncCollectionAttributes -> UserActionQueue; GenericTaskQueue is also returned.
// NOTE(review): the switch header and the remaining case labels are missing
// from this listing, so the other task types that share these returns are unknown.
458 ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType( TaskType type )
462 case RecursiveMoveReplay:
463 return ChangeReplayQueue;
465 case SyncCollectionAttributes:
466 return UserActionQueue;
468 return GenericTaskQueue;
// Returns a reference to the task list that holds tasks of the given type:
// the type is mapped to a queue type by queueTypeForTaskType(), which then
// indexes mTaskList.
472 ResourceScheduler::TaskList& ResourceScheduler::queueForTaskType( TaskType type )
474 const QueueType qt = queueTypeForTaskType( type );
475 return mTaskList[ qt ];
// Writes the textual scheduler state produced by dumpToString() to the debug log.
478 void ResourceScheduler::dump()
480 kDebug() << dumpToString();
// Builds a human-readable description of the scheduler state.
// NOTE(review): the declaration of `str`, the inner loop header and the return
// statement are missing from this listing.
483 QString ResourceScheduler::dumpToString()
const
// Header line with the online state, followed by the current task.
487 str <<
"ResourceScheduler: " << (mOnline?
"Online":
"Offline") << endl;
488 str <<
" current task: " << mCurrentTask << endl;
// One section per queue: either an "is empty" line or the task count
// followed by one line per task.
489 for (
int i = 0; i < NQueueCount; ++i ) {
490 const TaskList& queue = mTaskList[i];
491 if (queue.isEmpty()) {
492 str <<
" queue " << i <<
" is empty" << endl;
494 str <<
" queue " << i <<
" " << queue.size() <<
" tasks:" << endl;
496 str <<
" " << (*it) << endl;
// Clears the scheduler: logs a message, visits every queue and resets the
// current-task bookkeeping.
// NOTE(review): the statement applied to each queue is missing from this listing.
503 void ResourceScheduler::clear()
505 kDebug() <<
"Clearing ResourceScheduler queues:";
506 for (
int i = 0; i < NQueueCount; ++i ) {
507 TaskList& queue = mTaskList[i];
// No task is current any more, so there is no originating queue either (-1).
510 mCurrentTask = Task();
511 mCurrentTasksQueue = -1;
// Visits every queue; when a tracker interface exists, iterates over the
// tasks of the queue.
// NOTE(review): what is done per task and what happens to the queue afterwards
// is missing from this listing.
514 void Akonadi::ResourceScheduler::cancelQueues()
516 for (
int i = 0; i < NQueueCount; ++i ) {
517 TaskList& queue = mTaskList[i];
518 if ( s_resourcetracker ) {
519 foreach (
const Task &t, queue ) {
// Printable task-type names; the Task stream-formatting code in this file
// indexes this table with task.type, so the entry order has to match the
// numeric values of the task types (enum declaration not visible here).
// Each entry is a fixed char[27]: the longest visible literal,
// "InvalideCacheForCollection", is 26 characters plus the terminating NUL.
// NOTE(review): several entries are missing from this listing.
529 static const char s_taskTypes[][27] = {
532 "SyncCollectionTree",
534 "SyncCollectionAttributes",
537 "RecursiveMoveReplay",
538 "DeleteResourceCollection",
539 "InvalideCacheForCollection",
541 "SyncCollectionTreeDone",
// NOTE(review): fragment - the enclosing function's signature (and therefore
// the type of `d`) is missing from this listing.
// Writes the task's serial and the type name taken from s_taskTypes.
547 d << task.serial <<
" " << s_taskTypes[task.type] <<
" ";
// Further details are written only for tasks that are not of type Invalid.
548 if ( task.type != ResourceScheduler::Invalid ) {
// Collection id, only when the collection is valid.
549 if ( task.collection.isValid() )
550 d <<
"collection " << task.collection.id() <<
" ";
// Item id, only when it differs from -1.
551 if ( task.item.id() != -1 )
552 d <<
"item " << task.item.id() <<
" ";
// Method name and the argument's string form, only when a method name is set.
553 if ( !task.methodName.isEmpty() )
554 d << task.methodName <<
" " << task.argument.toString();
// QDebug stream operator for ResourceScheduler::Task.
// NOTE(review): the body is missing from this listing.
559 QDebug Akonadi::operator<<(
QDebug d,
const ResourceScheduler::Task& task )
570 #include "moc_resourcescheduler_p.cpp"
Helper class for expanding inter-resource collection moves inside ResourceBase.
Represents a collection of PIM items.
SchedulePriority
Describes the scheduling priority of a task that has been queued for execution.
QString number(int n, int base)
QDBusMessage createReply(const QList< QVariant > &arguments) const
The task is scheduled after the last ChangeReplay task in the queue.
Id id() const
Returns the unique identifier of the entity.
QVariant fromValue(const T &value)
QDBusPendingCall asyncCallWithArgumentList(const QString &method, const QList< QVariant > &args)
QString fromLatin1(const char *str, int size)
The task will be executed as soon as the current task has finished.
bool isValid() const
Returns whether the entity is valid.
The agent currently does nothing.