20 #include "resourcescheduler_p.h"
22 #include "dbusconnectionpool.h"
23 #include "recursivemover_p.h"
26 #include <klocalizedstring.h>
28 #include <QtCore/QTimer>
29 #include <QtDBus/QDBusInterface>
30 #include <QtDBus/QDBusConnectionInterface>
31 #include <boost/graph/graph_concepts.hpp>
33 using namespace Akonadi;
// Definition of the static Task::latestSerial member, initialised to 0.
// NOTE(review): presumably the counter behind Task::serial — the code that
// increments it is not visible in this excerpt.
35 qint64 ResourceScheduler::Task::latestSerial = 0;
// File-local D-Bus interface used to report jobs to the job tracker. It stays
// null until signalTaskToTracker() finds the "org.kde.akonadiconsole" service
// registered and creates the interface; every use in this file is guarded by
// a null check.
36 static QDBusAbstractInterface *s_resourcetracker = 0;
// Constructor; only part of the initializer list is visible in this excerpt.
// mCurrentTasksQueue starts at -1, the same value the other functions in this
// file assign whenever mCurrentTask is reset to an empty Task().
40 ResourceScheduler::ResourceScheduler( QObject *parent ) :
42 mCurrentTasksQueue( -1 ),
// Schedules a full synchronization task.
// Visible steps: look up the queue for the task's type, test whether an equal
// task is already queued or is the one currently running, then announce the
// task to the job tracker under the label "SyncAll".
// NOTE(review): the task setup, the branch taken on a duplicate and the queue
// insertion itself are not visible in this excerpt — confirm in the full file.
47 void ResourceScheduler::scheduleFullSync()
51 TaskList& queue = queueForTaskType( t.type );
52 if ( queue.contains( t ) || mCurrentTask == t )
55 signalTaskToTracker( t,
"SyncAll" );
// Schedules a task of type SyncCollectionTree.
// Visible steps: set the task type, fetch the matching queue, test for an equal
// task that is queued or currently running, and report the task to the job
// tracker as "SyncCollectionTree".
// NOTE(review): the duplicate branch and the queue insertion are not visible here.
59 void ResourceScheduler::scheduleCollectionTreeSync()
62 t.type = SyncCollectionTree;
63 TaskList& queue = queueForTaskType( t.type );
64 if ( queue.contains( t ) || mCurrentTask == t )
67 signalTaskToTracker( t,
"SyncCollectionTree" );
// Schedules a SyncCollection task for the collection `col`.
// Visible steps: set the task type, fetch the matching queue, test for an equal
// queued or running task, and report the task to the job tracker as
// "SyncCollection" with the collection id as the debug string.
// NOTE(review): the line that stores `col` in the task, the duplicate branch and
// the queue insertion are not visible in this excerpt.
71 void ResourceScheduler::scheduleSync(
const Collection & col)
74 t.type = SyncCollection;
76 TaskList& queue = queueForTaskType( t.type );
77 if ( queue.contains( t ) || mCurrentTask == t )
80 signalTaskToTracker( t,
"SyncCollection", QString::number( col.
id() ) );
// Schedules a SyncCollectionAttributes task for `collection`.
// Visible steps: fill in type and collection, fetch the matching queue, test for
// an equal queued or running task, and report the task to the job tracker as
// "SyncCollectionAttributes" with the collection id as the debug string.
// NOTE(review): the duplicate branch and the queue insertion are not visible here.
84 void ResourceScheduler::scheduleAttributesSync(
const Collection &collection )
87 t.type = SyncCollectionAttributes;
88 t.collection = collection;
90 TaskList& queue = queueForTaskType( t.type );
91 if ( queue.contains( t ) || mCurrentTask == t )
94 signalTaskToTracker( t,
"SyncCollectionAttributes", QString::number( collection.
id() ) );
// Schedules fetching of `item` (with the requested `parts`) on behalf of the
// D-Bus call `msg`.
// Rather than only rejecting duplicates, the visible code attaches `msg` to a
// matching task: to mCurrentTask when that task equals the new one, or to the
// queued task found by indexOf(). The job tracker is told about the task as
// "FetchItem" with the item id as the debug string.
// NOTE(review): the task setup, the check of `idx` before indexing the queue
// and the insertion of a new task are not visible in this excerpt.
98 void ResourceScheduler::scheduleItemFetch(
const Item & item,
const QSet<QByteArray> &parts,
const QDBusMessage & msg)
107 if ( mCurrentTask == t ) {
108 mCurrentTask.dbusMsgs << msg;
113 TaskList& queue = queueForTaskType( t.type );
114 const int idx = queue.indexOf( t );
116 queue[ idx ].dbusMsgs << msg;
122 signalTaskToTracker( t,
"FetchItem", QString::number( item.id() ) );
// Schedules a DeleteResourceCollection task.
// Visible steps: set the task type, fetch the matching queue, test for an equal
// queued or running task, and report the task to the job tracker as
// "DeleteResourceCollection".
// NOTE(review): the duplicate branch and the queue insertion are not visible here.
126 void ResourceScheduler::scheduleResourceCollectionDeletion()
129 t.type = DeleteResourceCollection;
130 TaskList& queue = queueForTaskType( t.type );
131 if ( queue.contains( t ) || mCurrentTask == t )
134 signalTaskToTracker( t,
"DeleteResourceCollection" );
// Schedules an InvalideCacheForCollection task for `collection`.
// Visible steps: fill in type and collection, fetch the matching queue, test for
// an equal queued or running task, and report the task to the job tracker as
// "InvalideCacheForCollection" with the collection id as the debug string.
// NOTE(review): the duplicate branch and the queue insertion are not visible here.
138 void ResourceScheduler::scheduleCacheInvalidation(
const Collection &collection )
141 t.type = InvalideCacheForCollection;
142 t.collection = collection;
143 TaskList& queue = queueForTaskType( t.type );
144 if ( queue.contains( t ) || mCurrentTask == t )
147 signalTaskToTracker( t,
"InvalideCacheForCollection", QString::number( collection.
id() ) );
// Schedules a ChangeReplay task and reports it to the job tracker as
// "ChangeReplay".
// The visible duplicate test only looks at the queue; unlike the sync
// schedulers in this file it does not compare against mCurrentTask.
// NOTE(review): presumably intentional so a replay can be queued while one is
// running — confirm. The duplicate branch and the queue insertion are not
// visible in this excerpt.
151 void ResourceScheduler::scheduleChangeReplay()
154 t.type = ChangeReplay;
155 TaskList& queue = queueForTaskType( t.type );
157 if ( queue.contains( t ) )
160 signalTaskToTracker( t,
"ChangeReplay" );
// Fragment of a scheduling function whose signature is not visible in this
// excerpt. It builds a RecursiveMoveReplay task: the moved collection goes into
// t.collection and `mover` is wrapped in a QVariant stored in t.argument
// (executeNext() unpacks it again as RecursiveMover*). It then tests for an
// equal queued or running task and reports the task to the job tracker as
// "RecursiveMoveReplay" with the collection id as the debug string.
// NOTE(review): the duplicate branch and the queue insertion are not visible here.
167 t.type = RecursiveMoveReplay;
168 t.collection = movedCollection;
169 t.argument = QVariant::fromValue( mover );
170 TaskList &queue = queueForTaskType( t.type );
172 if ( queue.contains( t ) || mCurrentTask == t )
176 signalTaskToTracker( t,
"RecursiveMoveReplay", QString::number( t.collection.id() ) );
// Schedules a SyncAllDone task and reports it to the job tracker as
// "SyncAllDone". executeNext() turns this task type into the
// fullSyncComplete() signal.
// NOTE(review): no duplicate check is visible for this task type; the queue
// insertion line is not visible in this excerpt either.
180 void Akonadi::ResourceScheduler::scheduleFullSyncCompletion()
183 t.type = SyncAllDone;
184 TaskList& queue = queueForTaskType( t.type );
187 signalTaskToTracker( t,
"SyncAllDone" );
// Schedules a SyncCollectionTreeDone task and reports it to the job tracker as
// "SyncCollectionTreeDone". executeNext() turns this task type into the
// collectionTreeSyncComplete() signal.
// NOTE(review): no duplicate check is visible for this task type; the queue
// insertion line is not visible in this excerpt either.
191 void Akonadi::ResourceScheduler::scheduleCollectionTreeSyncCompletion()
194 t.type = SyncCollectionTreeDone;
195 TaskList& queue = queueForTaskType( t.type );
198 signalTaskToTracker( t,
"SyncCollectionTreeDone" );
// Schedules a caller-defined task: executeNext() later invokes `methodName` on
// `receiver`, passing `argument` when the receiver has a matching
// "(QVariant)" slot.
// The target queue defaults to GenericTaskQueue and is switched to
// AfterChangeReplayQueue or PrependTaskQueue by branches whose conditions are
// not visible here (presumably selected by `priority` — confirm). The queue is
// taken from mTaskList directly instead of going through queueForTaskType().
// Duplicates are tested against the queue only, not against mCurrentTask.
// The job tracker sees the task as "Custom-" followed by the method name.
// NOTE(review): the duplicate branch and the insertion logic are not visible here.
202 void Akonadi::ResourceScheduler::scheduleCustomTask( QObject *receiver,
const char* methodName,
const QVariant &argument,
ResourceBase::SchedulePriority priority )
206 t.receiver = receiver;
207 t.methodName = methodName;
208 t.argument = argument;
209 QueueType queueType = GenericTaskQueue;
211 queueType = AfterChangeReplayQueue;
213 queueType = PrependTaskQueue;
214 TaskList& queue = mTaskList[ queueType ];
216 if ( queue.contains( t ) )
228 signalTaskToTracker( t,
"Custom-" + t.methodName );
// Marks the current task as finished.
// Visible steps: emit status( AgentBase::Idle, "Ready" ), tell the job tracker
// (when connected) that the job with the current task's serial ended, then
// reset mCurrentTask to an empty Task and mCurrentTasksQueue to -1.
// NOTE(review): the remainder of the "jobEnded" argument list and anything that
// follows the reset (e.g. starting the next task) are not visible in this excerpt.
232 void ResourceScheduler::taskDone()
235 emit status(
AgentBase::Idle, i18nc(
"@info:status Application ready for work",
"Ready" ) );
237 if ( s_resourcetracker ) {
238 QList<QVariant> argumentList;
239 argumentList << QString::number( mCurrentTask.serial )
241 s_resourcetracker->asyncCallWithArgumentList(QLatin1String(
"jobEnded" ), argumentList);
244 mCurrentTask = Task();
245 mCurrentTasksQueue = -1;
// Puts the currently running task back at the front of the queue it was taken
// from, so it is picked up again later.
// Visible steps: test for "no current task" (type Invalid), report "jobEnded"
// for the current serial to the job tracker, copy the task, clear
// mCurrentTask, prepend the copy to mTaskList[mCurrentTasksQueue], reset the
// queue index and announce the re-queued task to the tracker as "DeferedTask".
// NOTE(review): the statement guarded by the Invalid check (presumably an early
// return) and the rest of the "jobEnded" argument list are not visible here.
249 void ResourceScheduler::deferTask()
251 if ( mCurrentTask.type == Invalid )
254 if ( s_resourcetracker ) {
255 QList<QVariant> argumentList;
256 argumentList << QString::number( mCurrentTask.serial )
258 s_resourcetracker->asyncCallWithArgumentList(QLatin1String(
"jobEnded" ), argumentList);
261 Task t = mCurrentTask;
262 mCurrentTask = Task();
// mCurrentTasksQueue is -1 whenever no task is running, so it must be a valid
// index here before it is used to address mTaskList.
264 Q_ASSERT( mCurrentTasksQueue >= 0 && mCurrentTasksQueue < NQueueCount );
265 mTaskList[mCurrentTasksQueue].prepend( t );
266 mCurrentTasksQueue = -1;
268 signalTaskToTracker( t,
"DeferedTask" );
// Reports whether the scheduler has queued tasks.
// Walks all NQueueCount queues in mTaskList and tests each for being non-empty.
// NOTE(review): the return statements are not visible in this excerpt;
// presumably false on the first non-empty queue and true otherwise — confirm.
273 bool ResourceScheduler::isEmpty()
275 for (
int i = 0; i < NQueueCount; ++i ) {
276 if ( !mTaskList[i].isEmpty() )
// Requests execution of the next queued task.
// The guard tests for "a task is already running", "all queues are empty" and
// "scheduler is offline"; executeNext() is then queued through a zero-delay
// single-shot timer instead of being called directly.
// NOTE(review): the statement controlled by the guard (presumably an early
// return) is not visible in this excerpt.
282 void ResourceScheduler::scheduleNext()
284 if ( mCurrentTask.type != Invalid || isEmpty() || !mOnline )
286 QTimer::singleShot( 0,
this, SLOT(executeNext()) );
// Takes the next task off the queues and dispatches it.
// Visible steps:
//  - guard on "a task is already running" or "nothing queued";
//  - scan the queues in index order and take the first task of the first
//    non-empty queue, remembering the queue index in mCurrentTasksQueue;
//  - report "jobStarted" with the task's serial to the job tracker;
//  - switch on the task type and emit the matching execute*/…Complete signal;
//  - for the custom-task branch, invoke mCurrentTask.methodName on
//    mCurrentTask.receiver through QMetaObject::invokeMethod().
// NOTE(review): several case labels, break statements and the statement
// controlled by the initial guard are not visible in this excerpt.
289 void ResourceScheduler::executeNext()
291 if ( mCurrentTask.type != Invalid || isEmpty() )
294 for (
int i = 0; i < NQueueCount; ++i ) {
295 if ( !mTaskList[ i ].isEmpty() ) {
296 mCurrentTask = mTaskList[ i ].takeFirst();
297 mCurrentTasksQueue = i;
302 if ( s_resourcetracker ) {
303 QList<QVariant> argumentList;
304 argumentList << QString::number( mCurrentTask.serial );
305 s_resourcetracker->asyncCallWithArgumentList(QLatin1String(
"jobStarted" ), argumentList);
308 switch ( mCurrentTask.type ) {
310 emit executeFullSync();
312 case SyncCollectionTree:
313 emit executeCollectionTreeSync();
316 emit executeCollectionSync( mCurrentTask.collection );
318 case SyncCollectionAttributes:
319 emit executeCollectionAttributesSync( mCurrentTask.collection );
322 emit executeItemFetch( mCurrentTask.item, mCurrentTask.itemParts );
324 case DeleteResourceCollection:
325 emit executeResourceCollectionDeletion();
327 case InvalideCacheForCollection:
328 emit executeCacheInvalidation( mCurrentTask.collection );
331 emit executeChangeReplay();
333 case RecursiveMoveReplay:
// The mover was stored as a QVariant when the task was scheduled; unpack it again.
334 emit executeRecursiveMoveReplay( mCurrentTask.argument.value<
RecursiveMover*>() );
337 emit fullSyncComplete();
339 case SyncCollectionTreeDone:
340 emit collectionTreeSyncComplete();
// Custom task: check whether the receiver has a slot "<methodName>(QVariant)"
// to decide between invoking with the argument and invoking without one.
344 const QByteArray methodSig = mCurrentTask.methodName +
"(QVariant)";
// NOTE(review): receiver is dereferenced here and no null check is visible in
// this excerpt — confirm the receiver cannot have been destroyed while queued.
345 const bool hasSlotWithVariant = mCurrentTask.receiver->metaObject()->indexOfMethod(methodSig) != -1;
346 bool success =
false;
347 if ( hasSlotWithVariant ) {
348 success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName, Q_ARG(QVariant, mCurrentTask.argument) );
// A failed invocation is only tolerated when no valid argument was supplied.
349 Q_ASSERT_X( success || !mCurrentTask.argument.isValid(),
"ResourceScheduler::executeNext",
"Valid argument was provided but the method wasn't found" );
// Invocation without argument; the condition selecting this path is not visible here.
352 success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName );
355 kError() <<
"Could not invoke slot" << mCurrentTask.methodName <<
"on" << mCurrentTask.receiver <<
"with argument" << mCurrentTask.argument;
359 kError() <<
"Unhandled task type" << mCurrentTask.type;
// Const accessor returning a Task by value.
// NOTE(review): the body is not visible in this excerpt; presumably it returns
// mCurrentTask — confirm.
366 ResourceScheduler::Task ResourceScheduler::currentTask()
const
// Changes the online state of the scheduler.
// Visible steps: compare the requested state with mOnline; if a task is in
// flight, put it back at the front of a queue and reset mCurrentTask /
// mCurrentTasksQueue; then walk the FetchItem queue, answer the D-Bus callers
// of every FetchItem task with "Job canceled.", erase the task and report
// "jobEnded" to the job tracker.
// NOTE(review): which of these steps run when going online vs. offline is
// decided by lines that are not visible in this excerpt.
371 void ResourceScheduler::setOnline(
bool state)
373 if ( mOnline == state )
379 if ( mCurrentTask.type != Invalid ) {
// NOTE(review): the task is re-queued via queueForTaskType(), whereas
// deferTask() uses mTaskList[mCurrentTasksQueue]. For a task that was queued
// directly into mTaskList (as scheduleCustomTask() does) the two may differ —
// confirm this is intended.
381 queueForTaskType( mCurrentTask.type ).prepend( mCurrentTask );
382 mCurrentTask = Task();
383 mCurrentTasksQueue = -1;
386 TaskList& itemFetchQueue = queueForTaskType( FetchItem );
// No ++it in the loop header: erase() returns the iterator to continue from.
387 for ( QList< Task >::iterator it = itemFetchQueue.begin(); it != itemFetchQueue.end(); ) {
388 if ( (*it).type == FetchItem ) {
389 (*it).sendDBusReplies( i18nc(
"@info",
"Job canceled." ) );
390 it = itemFetchQueue.erase( it );
391 if ( s_resourcetracker ) {
392 QList<QVariant> argumentList;
// NOTE(review): likely bug — this reports mCurrentTask.serial, not the serial
// of the task that was just cancelled. mCurrentTask was reset above if a task
// was in flight, and `it` already refers to the element after the erased one,
// so the cancelled task's serial would have to be captured before erase().
393 argumentList << QString::number( mCurrentTask.serial )
394 << i18nc(
"@info",
"Job canceled." );
395 s_resourcetracker->asyncCallWithArgumentList( QLatin1String(
"jobEnded" ), argumentList );
// Reports a newly scheduled task to the job tracker over D-Bus.
//  task        - task being announced; its serial identifies the job
//  taskType    - Latin-1 label sent as the job type
//  debugString - extra text for the tracker (its use is not visible in this excerpt)
// The tracker interface is created on first use, and only when the
// "org.kde.akonadiconsole" service is registered on the thread's connection;
// without it the function sends nothing.
404 void ResourceScheduler::signalTaskToTracker(
const Task &task,
const QByteArray &taskType,
const QString &debugString )
407 if ( !s_resourcetracker && DBusConnectionPool::threadConnection().interface()->isServiceRegistered(QLatin1String(
"org.kde.akonadiconsole" ) ) ) {
// NOTE(review): created with a null parent; no matching delete is visible in
// this excerpt — confirm the interface is meant to live until process exit.
408 s_resourcetracker =
new QDBusInterface( QLatin1String(
"org.kde.akonadiconsole" ),
409 QLatin1String(
"/resourcesJobtracker" ),
410 QLatin1String(
"org.freedesktop.Akonadi.JobTracker" ),
411 DBusConnectionPool::threadConnection(), 0 );
414 if ( s_resourcetracker ) {
415 QList<QVariant> argumentList;
// The unchecked static_cast assumes the scheduler's parent is an AgentBase.
416 argumentList << static_cast<AgentBase*>( parent() )->identifier()
417 << QString::number( task.serial )
419 << QString::fromLatin1( taskType )
422 s_resourcetracker->asyncCallWithArgumentList(QLatin1String(
"jobCreated" ), argumentList);
// Fragment of a function whose signature is not visible in this excerpt.
// It walks the queue used for SyncCollection tasks and erases every
// SyncCollection task whose collection equals `collection`. erase() supplies
// the iterator to continue from, so the loop header has no increment.
// NOTE(review): the branch that advances the iterator for non-matching tasks
// is not visible here.
430 TaskList& queue = queueForTaskType( SyncCollection );
431 for ( QList<Task>::iterator it = queue.begin(); it != queue.end(); ) {
432 if ( (*it).type == SyncCollection && (*it).collection == collection ) {
433 it = queue.erase( it );
434 kDebug() <<
" erasing";
// Sends a reply for every D-Bus message stored in dbusMsgs.
//  errorMsg - error text; an empty string means success
// The reply payload depends on the member name of the original call:
//  - "requestItemDelivery": a bool that is true when errorMsg is empty;
//  - "requestItemDeliveryV2": handled by lines not visible in this excerpt;
//  - empty member name: handled by lines not visible in this excerpt;
//  - any other member name is reported through kFatal().
// Replies are sent over the calling thread's pooled D-Bus connection.
440 void ResourceScheduler::Task::sendDBusReplies(
const QString &errorMsg )
442 Q_FOREACH(
const QDBusMessage &msg, dbusMsgs ) {
443 QDBusMessage reply( msg.createReply() );
444 const QString methodName = msg.member();
445 if (methodName == QLatin1String(
"requestItemDelivery")) {
446 reply << errorMsg.isEmpty();
447 }
else if (methodName == QLatin1String(
"requestItemDeliveryV2")) {
449 }
else if (methodName.isEmpty()) {
452 kFatal() <<
"Got unexpected member:" << methodName;
454 DBusConnectionPool::threadConnection().send( reply );
// Maps a task type to the queue it belongs to.
// Visible mappings: RecursiveMoveReplay goes to ChangeReplayQueue; the other
// return values are ItemFetchQueue and GenericTaskQueue.
// NOTE(review): the remaining case labels are not visible in this excerpt, so
// which task types map to which of these queues cannot be told from here.
458 ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType( TaskType type )
462 case RecursiveMoveReplay:
463 return ChangeReplayQueue;
465 return ItemFetchQueue;
467 return GenericTaskQueue;
// Returns a mutable reference to the task list for the given task type: the
// type is translated with queueTypeForTaskType() and the result is used as an
// index into mTaskList.
471 ResourceScheduler::TaskList& ResourceScheduler::queueForTaskType( TaskType type )
473 const QueueType qt = queueTypeForTaskType( type );
474 return mTaskList[ qt ];
// Writes the textual scheduler state produced by dumpToString() to the debug log.
477 void ResourceScheduler::dump()
479 kDebug() << dumpToString();
// Builds a human-readable description of the scheduler state.
// Output layout: a header line with "Online"/"Offline", one line for the
// current task, then for every queue index either "is empty" or the number of
// tasks followed by one line per task (formatted by the Task stream operator
// defined in this file).
// NOTE(review): the declaration of `ret` and the return statement are not
// visible in this excerpt.
482 QString ResourceScheduler::dumpToString()
const
485 QTextStream str( &ret );
486 str <<
"ResourceScheduler: " << (mOnline?
"Online":
"Offline") << endl;
487 str <<
" current task: " << mCurrentTask << endl;
488 for (
int i = 0; i < NQueueCount; ++i ) {
489 const TaskList& queue = mTaskList[i];
490 if (queue.isEmpty()) {
491 str <<
" queue " << i <<
" is empty" << endl;
493 str <<
" queue " << i <<
" " << queue.size() <<
" tasks:" << endl;
494 for ( QList<Task>::const_iterator it = queue.begin(); it != queue.end(); ++it ) {
495 str <<
" " << (*it) << endl;
// Resets the scheduler: logs a debug message, visits every queue in mTaskList
// and finally resets mCurrentTask and mCurrentTasksQueue to their "no task
// running" values.
// NOTE(review): the statement that actually empties each queue is not visible
// in this excerpt.
502 void ResourceScheduler::clear()
504 kDebug() <<
"Clearing ResourceScheduler queues:";
505 for (
int i = 0; i < NQueueCount; ++i ) {
506 TaskList& queue = mTaskList[i];
509 mCurrentTask = Task();
510 mCurrentTasksQueue = -1;
// Cancels all queued tasks.
// For every queue, and only when the job tracker is connected, a "jobEnded"
// call with the task's serial and an empty string is sent for each queued task.
// NOTE(review): the statement that removes the tasks from the queue is not
// visible in this excerpt; mCurrentTask is not touched by the visible lines.
513 void Akonadi::ResourceScheduler::cancelQueues()
515 for (
int i = 0; i < NQueueCount; ++i ) {
516 TaskList& queue = mTaskList[i];
517 if ( s_resourcetracker ) {
518 foreach (
const Task &t, queue ) {
519 QList<QVariant> argumentList;
520 argumentList << QString::number( t.serial ) << QString();
521 s_resourcetracker->asyncCallWithArgumentList(QLatin1String(
"jobEnded" ), argumentList);
// Printable names for task types, indexed with Task::type by the QTextStream
// operator<< defined in this file.
// The row width of 27 fits the longest visible entry,
// "InvalideCacheForCollection" (26 characters), plus the terminating NUL.
// NOTE(review): the entry order presumably has to mirror the TaskType enum,
// which is declared outside this excerpt; several entries are not visible here.
528 static const char s_taskTypes[][27] = {
531 "SyncCollectionTree",
533 "SyncCollectionAttributes",
536 "RecursiveMoveReplay",
537 "DeleteResourceCollection",
538 "InvalideCacheForCollection",
540 "SyncCollectionTreeDone",
// Streams a one-line description of a task: serial, type name, and — for
// tasks whose type is not Invalid — the collection id (when the collection is
// valid), the item id (when it is not -1) and the method name with its
// argument (when a method name is set).
// NOTE(review): s_taskTypes is indexed with task.type without a visible bounds
// check; the return statement is not visible in this excerpt.
544 QTextStream& Akonadi::operator<<( QTextStream& d,
const ResourceScheduler::Task& task )
546 d << task.serial <<
" " << s_taskTypes[task.type] <<
" ";
547 if ( task.type != ResourceScheduler::Invalid ) {
548 if ( task.collection.isValid() )
549 d <<
"collection " << task.collection.id() <<
" ";
550 if ( task.item.id() != -1 )
551 d <<
"item " << task.item.id() <<
" ";
552 if ( !task.methodName.isEmpty() )
553 d << task.methodName <<
" " << task.argument.toString();
// QDebug overload for tasks. The visible line sets up a QTextStream writing
// into `s`.
// NOTE(review): the declaration of `s` and the rest of the body are not visible
// in this excerpt; presumably the task is formatted through the QTextStream
// overload and the resulting string is passed to `d` — confirm.
558 QDebug Akonadi::operator<<( QDebug d,
const ResourceScheduler::Task& task )
561 QTextStream str( &s );
569 #include "moc_resourcescheduler_p.cpp"
Helper class for expanding inter-resource collection moves inside ResourceBase.
Represents a collection of PIM items.
SchedulePriority
Describes the scheduling priority of a task that has been queued for execution.
The task is scheduled after the last ChangeReplay task in the queue.
Id id() const
Returns the unique identifier of the entity.
The task will be executed as soon as the current task has finished.
bool isValid() const
Returns whether the entity is valid.
The agent currently does nothing.