20 #include "resourcescheduler_p.h" 
   22 #include "dbusconnectionpool.h" 
   23 #include "recursivemover_p.h" 
   26 #include <klocalizedstring.h> 
   28 #include <QtCore/QTimer> 
   29 #include <QtDBus/QDBusInterface> 
   30 #include <QtDBus/QDBusConnectionInterface> 
   31 #include <boost/graph/graph_concepts.hpp> 
   33 using namespace Akonadi;
 
   35 qint64 ResourceScheduler::Task::latestSerial = 0;
 
   40 ResourceScheduler::ResourceScheduler( 
QObject *parent ) :
 
   42     mCurrentTasksQueue( -1 ),
 
   47 void ResourceScheduler::scheduleFullSync()
 
   51   TaskList& queue = queueForTaskType( t.type );
 
   52   if ( queue.contains( t ) || mCurrentTask == t )
 
   55   signalTaskToTracker( t, 
"SyncAll" );
 
   59 void ResourceScheduler::scheduleCollectionTreeSync()
 
   62   t.type = SyncCollectionTree;
 
   63   TaskList& queue = queueForTaskType( t.type );
 
   64   if ( queue.contains( t ) || mCurrentTask == t )
 
   67   signalTaskToTracker( t, 
"SyncCollectionTree" );
 
   71 void ResourceScheduler::scheduleSync(
const Collection & col)
 
   74   t.type = SyncCollection;
 
   76   TaskList& queue = queueForTaskType( t.type );
 
   77   if ( queue.contains( t ) || mCurrentTask == t )
 
   84 void ResourceScheduler::scheduleAttributesSync( 
const Collection &collection )
 
   87   t.type = SyncCollectionAttributes;
 
   88   t.collection = collection;
 
   90   TaskList& queue = queueForTaskType( t.type );
 
   91   if ( queue.contains( t ) || mCurrentTask == t )
 
   94   signalTaskToTracker( t, 
"SyncCollectionAttributes", 
QString::number( collection.
id() ) );
 
  107   if ( mCurrentTask == t ) {
 
  108     mCurrentTask.dbusMsgs << msg;
 
  113   TaskList& queue = queueForTaskType( t.type );
 
  114   const int idx = queue.indexOf( t );
 
  116     queue[ idx ].dbusMsgs << msg;
 
  126 void ResourceScheduler::scheduleResourceCollectionDeletion()
 
  129   t.type = DeleteResourceCollection;
 
  130   TaskList& queue = queueForTaskType( t.type );
 
  131   if ( queue.contains( t ) || mCurrentTask == t )
 
  134   signalTaskToTracker( t, 
"DeleteResourceCollection" );
 
  138 void ResourceScheduler::scheduleCacheInvalidation( 
const Collection &collection )
 
  141   t.type = InvalideCacheForCollection;
 
  142   t.collection = collection;
 
  143   TaskList& queue = queueForTaskType( t.type );
 
  144   if ( queue.contains( t ) || mCurrentTask == t )
 
  147   signalTaskToTracker( t, 
"InvalideCacheForCollection", 
QString::number( collection.
id() ) );
 
  151 void ResourceScheduler::scheduleChangeReplay()
 
  154   t.type = ChangeReplay;
 
  155   TaskList& queue = queueForTaskType( t.type );
 
  157   if ( queue.contains( t ) )
 
  160   signalTaskToTracker( t, 
"ChangeReplay" );
 
  167   t.type = RecursiveMoveReplay;
 
  168   t.collection = movedCollection;
 
  170   TaskList &queue = queueForTaskType( t.type );
 
  172   if ( queue.contains( t ) || mCurrentTask == t )
 
  176   signalTaskToTracker( t, 
"RecursiveMoveReplay", 
QString::number( t.collection.id() ) );
 
  180 void Akonadi::ResourceScheduler::scheduleFullSyncCompletion()
 
  183   t.type = SyncAllDone;
 
  184   TaskList& queue = queueForTaskType( t.type );
 
  187   signalTaskToTracker( t, 
"SyncAllDone" );
 
  191 void Akonadi::ResourceScheduler::scheduleCollectionTreeSyncCompletion()
 
  194   t.type = SyncCollectionTreeDone;
 
  195   TaskList& queue = queueForTaskType( t.type );
 
  198   signalTaskToTracker( t, 
"SyncCollectionTreeDone" );
 
  206   t.receiver = receiver;
 
  207   t.methodName = methodName;
 
  208   t.argument = argument;
 
  209   QueueType queueType = GenericTaskQueue;
 
  211     queueType = AfterChangeReplayQueue;
 
  213     queueType = PrependTaskQueue;
 
  214   TaskList& queue = mTaskList[ queueType ];
 
  216   if ( queue.contains( t ) )
 
  228   signalTaskToTracker( t, 
"Custom-" + t.methodName );
 
  232 void ResourceScheduler::taskDone()
 
  235     emit status( 
AgentBase::Idle, i18nc( 
"@info:status Application ready for work", 
"Ready" ) );
 
  237   if ( s_resourcetracker ) {
 
  244   mCurrentTask = Task();
 
  245   mCurrentTasksQueue = -1;
 
  249 void ResourceScheduler::deferTask()
 
  251   if ( mCurrentTask.type == Invalid )
 
  254   if ( s_resourcetracker ) {
 
  261   Task t = mCurrentTask;
 
  262   mCurrentTask = Task();
 
  264   Q_ASSERT( mCurrentTasksQueue >= 0 && mCurrentTasksQueue < NQueueCount );
 
  265   mTaskList[mCurrentTasksQueue].prepend( t );
 
  266   mCurrentTasksQueue = -1;
 
  268   signalTaskToTracker( t, 
"DeferedTask" );
 
  273 bool ResourceScheduler::isEmpty()
 
  275   for ( 
int i = 0; i < NQueueCount; ++i ) {
 
  276     if ( !mTaskList[i].isEmpty() )
 
  282 void ResourceScheduler::scheduleNext()
 
  284   if ( mCurrentTask.type != Invalid || isEmpty() || !mOnline )
 
  289 void ResourceScheduler::executeNext()
 
  291   if ( mCurrentTask.type != Invalid || isEmpty() )
 
  294   for ( 
int i = 0; i < NQueueCount; ++i ) {
 
  295     if ( !mTaskList[ i ].isEmpty() ) {
 
  296       mCurrentTask = mTaskList[ i ].takeFirst();
 
  297       mCurrentTasksQueue = i;
 
  302   if ( s_resourcetracker ) {
 
  308   switch ( mCurrentTask.type ) {
 
  310       emit executeFullSync();
 
  312     case SyncCollectionTree:
 
  313       emit executeCollectionTreeSync();
 
  316       emit executeCollectionSync( mCurrentTask.collection );
 
  318     case SyncCollectionAttributes:
 
  319       emit executeCollectionAttributesSync( mCurrentTask.collection );
 
  322       emit executeItemFetch( mCurrentTask.item, mCurrentTask.itemParts );
 
  324     case DeleteResourceCollection:
 
  325       emit executeResourceCollectionDeletion();
 
  327     case InvalideCacheForCollection:
 
  328       emit executeCacheInvalidation( mCurrentTask.collection );
 
  331       emit executeChangeReplay();
 
  333     case RecursiveMoveReplay:
 
  334       emit executeRecursiveMoveReplay( mCurrentTask.argument.value<
RecursiveMover*>() );
 
  337       emit fullSyncComplete();
 
  339     case SyncCollectionTreeDone:
 
  340       emit collectionTreeSyncComplete();
 
  344       const QByteArray methodSig = mCurrentTask.methodName + 
"(QVariant)";
 
  345       const bool hasSlotWithVariant = mCurrentTask.receiver->metaObject()->indexOfMethod(methodSig) != -1;
 
  346       bool success = 
false;
 
  347       if ( hasSlotWithVariant ) {
 
  349         Q_ASSERT_X( success || !mCurrentTask.argument.isValid(), 
"ResourceScheduler::executeNext", 
"Valid argument was provided but the method wasn't found" );
 
  355         kError() << 
"Could not invoke slot" << mCurrentTask.methodName << 
"on" << mCurrentTask.receiver << 
"with argument" << mCurrentTask.argument;
 
  359       kError() << 
"Unhandled task type" << mCurrentTask.type;
 
  366 ResourceScheduler::Task ResourceScheduler::currentTask()
 const 
  371 void ResourceScheduler::setOnline(
bool state)
 
  373   if ( mOnline == state )
 
  379     if ( mCurrentTask.type != Invalid ) {
 
  381       queueForTaskType( mCurrentTask.type ).prepend( mCurrentTask );
 
  382       mCurrentTask = Task();
 
  383       mCurrentTasksQueue = -1;
 
  386     TaskList& itemFetchQueue = queueForTaskType( FetchItem );
 
  388       if ( (*it).type == FetchItem ) {
 
  389         (*it).sendDBusReplies( i18nc( 
"@info", 
"Job canceled." ) );
 
  390         it = itemFetchQueue.erase( it );
 
  391         if ( s_resourcetracker ) {
 
  394                        << i18nc( 
"@info", 
"Job canceled." );
 
  404 void ResourceScheduler::signalTaskToTracker( 
const Task &task, 
const QByteArray &taskType, 
const QString &debugString )
 
  407   if ( !s_resourcetracker && DBusConnectionPool::threadConnection().interface()->isServiceRegistered(
QLatin1String( 
"org.kde.akonadiconsole" ) ) ) {
 
  411                                        DBusConnectionPool::threadConnection(), 0 );
 
  414   if ( s_resourcetracker ) {
 
  416     argumentList << static_cast<AgentBase*>( parent() )->identifier()  
 
  430   TaskList& queue = queueForTaskType( SyncCollection );
 
  432     if ( (*it).type == SyncCollection && (*it).collection == collection ) {
 
  433       it = queue.erase( it );
 
  434       kDebug() << 
" erasing";
 
  440 void ResourceScheduler::Task::sendDBusReplies( 
const QString &errorMsg )
 
  447     } 
else if (methodName == 
QLatin1String(
"requestItemDeliveryV2")) {
 
  449     } 
else if (methodName.isEmpty()) {
 
  452       kFatal() << 
"Got unexpected member:" << methodName;
 
  454     DBusConnectionPool::threadConnection().send( reply );
 
  458 ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType( TaskType type )
 
  462   case RecursiveMoveReplay:
 
  463     return ChangeReplayQueue;
 
  465   case SyncCollectionAttributes:
 
  466     return UserActionQueue;
 
  468     return GenericTaskQueue;
 
  472 ResourceScheduler::TaskList& ResourceScheduler::queueForTaskType( TaskType type )
 
  474   const QueueType qt = queueTypeForTaskType( type );
 
  475   return mTaskList[ qt ];
 
  478 void ResourceScheduler::dump()
 
  480   kDebug() << dumpToString();
 
  483 QString ResourceScheduler::dumpToString()
 const 
  487   str << 
"ResourceScheduler: " << (mOnline?
"Online":
"Offline") << endl;
 
  488   str << 
" current task: " << mCurrentTask << endl;
 
  489   for ( 
int i = 0; i < NQueueCount; ++i ) {
 
  490     const TaskList& queue = mTaskList[i];
 
  491     if (queue.isEmpty()) {
 
  492       str << 
" queue " << i << 
" is empty" << endl;
 
  494       str << 
" queue " << i << 
" " << queue.size() << 
" tasks:" << endl;
 
  496         str << 
"  " << (*it) << endl;
 
  503 void ResourceScheduler::clear()
 
  505   kDebug() << 
"Clearing ResourceScheduler queues:";
 
  506   for ( 
int i = 0; i < NQueueCount; ++i ) {
 
  507     TaskList& queue = mTaskList[i];
 
  510   mCurrentTask = Task();
 
  511   mCurrentTasksQueue = -1;
 
  514 void Akonadi::ResourceScheduler::cancelQueues()
 
  516   for ( 
int i = 0; i < NQueueCount; ++i ) {
 
  517     TaskList& queue = mTaskList[i];
 
  518     if ( s_resourcetracker ) {
 
  519       foreach ( 
const Task &t, queue ) {
 
  529 static const char s_taskTypes[][27] = {
 
  532       "SyncCollectionTree",
 
  534       "SyncCollectionAttributes",
 
  537       "RecursiveMoveReplay",
 
  538       "DeleteResourceCollection",
 
  539       "InvalideCacheForCollection",
 
  541       "SyncCollectionTreeDone",
 
  547   d << task.serial << 
" " << s_taskTypes[task.type] << 
" ";
 
  548   if ( task.type != ResourceScheduler::Invalid ) {
 
  549     if ( task.collection.isValid() )
 
  550       d << 
"collection " << task.collection.id() << 
" ";
 
  551     if ( task.item.id() != -1 )
 
  552       d << 
"item " << task.item.id() << 
" ";
 
  553     if ( !task.methodName.isEmpty() )
 
  554       d << task.methodName << 
" " << task.argument.toString();
 
  559 QDebug Akonadi::operator<<( 
QDebug d, 
const ResourceScheduler::Task& task )
 
  570 #include "moc_resourcescheduler_p.cpp" 
Helper class for expanding inter-resource collection moves inside ResourceBase. 
Represents a collection of PIM items. 
SchedulePriority
Describes the scheduling priority of a task that has been queued for execution. 
QString number(int n, int base)
QDBusMessage createReply(const QList< QVariant > &arguments) const
The task is scheduled after the last ChangeReplay task in the queue. 
Id id() const 
Returns the unique identifier of the entity. 
QVariant fromValue(const T &value)
QDBusPendingCall asyncCallWithArgumentList(const QString &method, const QList< QVariant > &args)
QString fromLatin1(const char *str, int size)
The task will be executed as soon as the current task has finished. 
bool isValid() const 
Returns whether the entity is valid. 
The agent is currently doing nothing.