• Skip to content
  • Skip to link menu
KDE API Reference
  • KDE API Reference
  • kdepimlibs API Reference
  • KDE Home
  • Contact Us
 

akonadi

  • sources
  • kde-4.12
  • kdepimlibs
  • akonadi
resourcescheduler.cpp
1 /*
2  Copyright (c) 2007 Volker Krause <vkrause@kde.org>
3 
4  This library is free software; you can redistribute it and/or modify it
5  under the terms of the GNU Library General Public License as published by
6  the Free Software Foundation; either version 2 of the License, or (at your
7  option) any later version.
8 
9  This library is distributed in the hope that it will be useful, but WITHOUT
10  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
12  License for more details.
13 
14  You should have received a copy of the GNU Library General Public License
15  along with this library; see the file COPYING.LIB. If not, write to the
16  Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17  02110-1301, USA.
18 */
19 
20 #include "resourcescheduler_p.h"
21 
22 #include "dbusconnectionpool.h"
23 #include "recursivemover_p.h"
24 
25 #include <kdebug.h>
26 #include <klocalizedstring.h>
27 
28 #include <QtCore/QTimer>
29 #include <QtDBus/QDBusInterface>
30 #include <QtDBus/QDBusConnectionInterface>
31 #include <boost/graph/graph_concepts.hpp>
32 
33 using namespace Akonadi;
34 
35 qint64 ResourceScheduler::Task::latestSerial = 0;
36 static QDBusAbstractInterface *s_resourcetracker = 0;
37 
38 //@cond PRIVATE
39 
40 ResourceScheduler::ResourceScheduler( QObject *parent ) :
41  QObject( parent ),
42  mCurrentTasksQueue( -1 ),
43  mOnline( false )
44 {
45 }
46 
47 void ResourceScheduler::scheduleFullSync()
48 {
49  Task t;
50  t.type = SyncAll;
51  TaskList& queue = queueForTaskType( t.type );
52  if ( queue.contains( t ) || mCurrentTask == t )
53  return;
54  queue << t;
55  signalTaskToTracker( t, "SyncAll" );
56  scheduleNext();
57 }
58 
59 void ResourceScheduler::scheduleCollectionTreeSync()
60 {
61  Task t;
62  t.type = SyncCollectionTree;
63  TaskList& queue = queueForTaskType( t.type );
64  if ( queue.contains( t ) || mCurrentTask == t )
65  return;
66  queue << t;
67  signalTaskToTracker( t, "SyncCollectionTree" );
68  scheduleNext();
69 }
70 
71 void ResourceScheduler::scheduleSync(const Collection & col)
72 {
73  Task t;
74  t.type = SyncCollection;
75  t.collection = col;
76  TaskList& queue = queueForTaskType( t.type );
77  if ( queue.contains( t ) || mCurrentTask == t )
78  return;
79  queue << t;
80  signalTaskToTracker( t, "SyncCollection", QString::number( col.id() ) );
81  scheduleNext();
82 }
83 
84 void ResourceScheduler::scheduleAttributesSync( const Collection &collection )
85 {
86  Task t;
87  t.type = SyncCollectionAttributes;
88  t.collection = collection;
89 
90  TaskList& queue = queueForTaskType( t.type );
91  if ( queue.contains( t ) || mCurrentTask == t )
92  return;
93  queue << t;
94  signalTaskToTracker( t, "SyncCollectionAttributes", QString::number( collection.id() ) );
95  scheduleNext();
96 }
97 
98 void ResourceScheduler::scheduleItemFetch(const Item & item, const QSet<QByteArray> &parts, const QDBusMessage & msg)
99 {
100  Task t;
101  t.type = FetchItem;
102  t.item = item;
103  t.itemParts = parts;
104 
105  // if the current task does already fetch the requested item, break here but
106  // keep the dbus message, so we can send the reply later on
107  if ( mCurrentTask == t ) {
108  mCurrentTask.dbusMsgs << msg;
109  return;
110  }
111 
112  // If this task is already in the queue, merge with it.
113  TaskList& queue = queueForTaskType( t.type );
114  const int idx = queue.indexOf( t );
115  if ( idx != -1 ) {
116  queue[ idx ].dbusMsgs << msg;
117  return;
118  }
119 
120  t.dbusMsgs << msg;
121  queue << t;
122  signalTaskToTracker( t, "FetchItem", QString::number( item.id() ) );
123  scheduleNext();
124 }
125 
126 void ResourceScheduler::scheduleResourceCollectionDeletion()
127 {
128  Task t;
129  t.type = DeleteResourceCollection;
130  TaskList& queue = queueForTaskType( t.type );
131  if ( queue.contains( t ) || mCurrentTask == t )
132  return;
133  queue << t;
134  signalTaskToTracker( t, "DeleteResourceCollection" );
135  scheduleNext();
136 }
137 
138 void ResourceScheduler::scheduleCacheInvalidation( const Collection &collection )
139 {
140  Task t;
141  t.type = InvalideCacheForCollection;
142  t.collection = collection;
143  TaskList& queue = queueForTaskType( t.type );
144  if ( queue.contains( t ) || mCurrentTask == t )
145  return;
146  queue << t;
147  signalTaskToTracker( t, "InvalideCacheForCollection", QString::number( collection.id() ) );
148  scheduleNext();
149 }
150 
151 void ResourceScheduler::scheduleChangeReplay()
152 {
153  Task t;
154  t.type = ChangeReplay;
155  TaskList& queue = queueForTaskType( t.type );
156  // see ResourceBase::changeProcessed() for why we do not check for mCurrentTask == t here like in the other tasks
157  if ( queue.contains( t ) )
158  return;
159  queue << t;
160  signalTaskToTracker( t, "ChangeReplay" );
161  scheduleNext();
162 }
163 
164 void ResourceScheduler::scheduleMoveReplay( const Collection &movedCollection, RecursiveMover *mover )
165 {
166  Task t;
167  t.type = RecursiveMoveReplay;
168  t.collection = movedCollection;
169  t.argument = QVariant::fromValue( mover );
170  TaskList &queue = queueForTaskType( t.type );
171 
172  if ( queue.contains( t ) || mCurrentTask == t )
173  return;
174 
175  queue << t;
176  signalTaskToTracker( t, "RecursiveMoveReplay", QString::number( t.collection.id() ) );
177  scheduleNext();
178 }
179 
180 void Akonadi::ResourceScheduler::scheduleFullSyncCompletion()
181 {
182  Task t;
183  t.type = SyncAllDone;
184  TaskList& queue = queueForTaskType( t.type );
185  // no compression here, all this does is emitting a D-Bus signal anyway, and compression can trigger races on the receiver side with the signal being lost
186  queue << t;
187  signalTaskToTracker( t, "SyncAllDone" );
188  scheduleNext();
189 }
190 
191 void Akonadi::ResourceScheduler::scheduleCollectionTreeSyncCompletion()
192 {
193  Task t;
194  t.type = SyncCollectionTreeDone;
195  TaskList& queue = queueForTaskType( t.type );
196  // no compression here, all this does is emitting a D-Bus signal anyway, and compression can trigger races on the receiver side with the signal being lost
197  queue << t;
198  signalTaskToTracker( t, "SyncCollectionTreeDone" );
199  scheduleNext();
200 }
201 
202 void Akonadi::ResourceScheduler::scheduleCustomTask( QObject *receiver, const char* methodName, const QVariant &argument, ResourceBase::SchedulePriority priority )
203 {
204  Task t;
205  t.type = Custom;
206  t.receiver = receiver;
207  t.methodName = methodName;
208  t.argument = argument;
209  QueueType queueType = GenericTaskQueue;
210  if ( priority == ResourceBase::AfterChangeReplay )
211  queueType = AfterChangeReplayQueue;
212  else if ( priority == ResourceBase::Prepend )
213  queueType = PrependTaskQueue;
214  TaskList& queue = mTaskList[ queueType ];
215 
216  if ( queue.contains( t ) )
217  return;
218 
219  switch (priority) {
220  case ResourceBase::Prepend:
221  queue.prepend( t );
222  break;
223  default:
224  queue.append(t);
225  break;
226  }
227 
228  signalTaskToTracker( t, "Custom-" + t.methodName );
229  scheduleNext();
230 }
231 
232 void ResourceScheduler::taskDone()
233 {
234  if ( isEmpty() )
235  emit status( AgentBase::Idle, i18nc( "@info:status Application ready for work", "Ready" ) );
236 
237  if ( s_resourcetracker ) {
238  QList<QVariant> argumentList;
239  argumentList << QString::number( mCurrentTask.serial )
240  << QString();
241  s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList);
242  }
243 
244  mCurrentTask = Task();
245  mCurrentTasksQueue = -1;
246  scheduleNext();
247 }
248 
249 void ResourceScheduler::deferTask()
250 {
251  if ( mCurrentTask.type == Invalid )
252  return;
253 
254  if ( s_resourcetracker ) {
255  QList<QVariant> argumentList;
256  argumentList << QString::number( mCurrentTask.serial )
257  << QString();
258  s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList);
259  }
260 
261  Task t = mCurrentTask;
262  mCurrentTask = Task();
263 
264  Q_ASSERT( mCurrentTasksQueue >= 0 && mCurrentTasksQueue < NQueueCount );
265  mTaskList[mCurrentTasksQueue].prepend( t );
266  mCurrentTasksQueue = -1;
267 
268  signalTaskToTracker( t, "DeferedTask" );
269 
270  scheduleNext();
271 }
272 
273 bool ResourceScheduler::isEmpty()
274 {
275  for ( int i = 0; i < NQueueCount; ++i ) {
276  if ( !mTaskList[i].isEmpty() )
277  return false;
278  }
279  return true;
280 }
281 
282 void ResourceScheduler::scheduleNext()
283 {
284  if ( mCurrentTask.type != Invalid || isEmpty() || !mOnline )
285  return;
286  QTimer::singleShot( 0, this, SLOT(executeNext()) );
287 }
288 
289 void ResourceScheduler::executeNext()
290 {
291  if ( mCurrentTask.type != Invalid || isEmpty() )
292  return;
293 
294  for ( int i = 0; i < NQueueCount; ++i ) {
295  if ( !mTaskList[ i ].isEmpty() ) {
296  mCurrentTask = mTaskList[ i ].takeFirst();
297  mCurrentTasksQueue = i;
298  break;
299  }
300  }
301 
302  if ( s_resourcetracker ) {
303  QList<QVariant> argumentList;
304  argumentList << QString::number( mCurrentTask.serial );
305  s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobStarted" ), argumentList);
306  }
307 
308  switch ( mCurrentTask.type ) {
309  case SyncAll:
310  emit executeFullSync();
311  break;
312  case SyncCollectionTree:
313  emit executeCollectionTreeSync();
314  break;
315  case SyncCollection:
316  emit executeCollectionSync( mCurrentTask.collection );
317  break;
318  case SyncCollectionAttributes:
319  emit executeCollectionAttributesSync( mCurrentTask.collection );
320  break;
321  case FetchItem:
322  emit executeItemFetch( mCurrentTask.item, mCurrentTask.itemParts );
323  break;
324  case DeleteResourceCollection:
325  emit executeResourceCollectionDeletion();
326  break;
327  case InvalideCacheForCollection:
328  emit executeCacheInvalidation( mCurrentTask.collection );
329  break;
330  case ChangeReplay:
331  emit executeChangeReplay();
332  break;
333  case RecursiveMoveReplay:
334  emit executeRecursiveMoveReplay( mCurrentTask.argument.value<RecursiveMover*>() );
335  break;
336  case SyncAllDone:
337  emit fullSyncComplete();
338  break;
339  case SyncCollectionTreeDone:
340  emit collectionTreeSyncComplete();
341  break;
342  case Custom:
343  {
344  const QByteArray methodSig = mCurrentTask.methodName + "(QVariant)";
345  const bool hasSlotWithVariant = mCurrentTask.receiver->metaObject()->indexOfMethod(methodSig) != -1;
346  bool success = false;
347  if ( hasSlotWithVariant ) {
348  success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName, Q_ARG(QVariant, mCurrentTask.argument) );
349  Q_ASSERT_X( success || !mCurrentTask.argument.isValid(), "ResourceScheduler::executeNext", "Valid argument was provided but the method wasn't found" );
350  }
351  if ( !success )
352  success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName );
353 
354  if ( !success )
355  kError() << "Could not invoke slot" << mCurrentTask.methodName << "on" << mCurrentTask.receiver << "with argument" << mCurrentTask.argument;
356  break;
357  }
358  default: {
359  kError() << "Unhandled task type" << mCurrentTask.type;
360  dump();
361  Q_ASSERT( false );
362  }
363  }
364 }
365 
366 ResourceScheduler::Task ResourceScheduler::currentTask() const
367 {
368  return mCurrentTask;
369 }
370 
371 void ResourceScheduler::setOnline(bool state)
372 {
373  if ( mOnline == state )
374  return;
375  mOnline = state;
376  if ( mOnline ) {
377  scheduleNext();
378  } else {
379  if ( mCurrentTask.type != Invalid ) {
380  // abort running task
381  queueForTaskType( mCurrentTask.type ).prepend( mCurrentTask );
382  mCurrentTask = Task();
383  mCurrentTasksQueue = -1;
384  }
385  // abort pending synchronous tasks, might take longer until the resource goes online again
386  TaskList& itemFetchQueue = queueForTaskType( FetchItem );
387  for ( QList< Task >::iterator it = itemFetchQueue.begin(); it != itemFetchQueue.end(); ) {
388  if ( (*it).type == FetchItem ) {
389  (*it).sendDBusReplies( i18nc( "@info", "Job canceled." ) );
390  it = itemFetchQueue.erase( it );
391  if ( s_resourcetracker ) {
392  QList<QVariant> argumentList;
393  argumentList << QString::number( mCurrentTask.serial )
394  << i18nc( "@info", "Job canceled." );
395  s_resourcetracker->asyncCallWithArgumentList( QLatin1String( "jobEnded" ), argumentList );
396  }
397  } else {
398  ++it;
399  }
400  }
401  }
402 }
403 
404 void ResourceScheduler::signalTaskToTracker( const Task &task, const QByteArray &taskType, const QString &debugString )
405 {
406  // if there's a job tracer running, tell it about the new job
407  if ( !s_resourcetracker && DBusConnectionPool::threadConnection().interface()->isServiceRegistered(QLatin1String( "org.kde.akonadiconsole" ) ) ) {
408  s_resourcetracker = new QDBusInterface( QLatin1String( "org.kde.akonadiconsole" ),
409  QLatin1String( "/resourcesJobtracker" ),
410  QLatin1String( "org.freedesktop.Akonadi.JobTracker" ),
411  DBusConnectionPool::threadConnection(), 0 );
412  }
413 
414  if ( s_resourcetracker ) {
415  QList<QVariant> argumentList;
416  argumentList << static_cast<AgentBase*>( parent() )->identifier() // "session" (in our case resource)
417  << QString::number( task.serial ) // "job"
418  << QString() // "parent job"
419  << QString::fromLatin1( taskType ) // "job type"
420  << debugString // "job debugging string"
421  ;
422  s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobCreated" ), argumentList);
423  }
424 }
425 
426 void ResourceScheduler::collectionRemoved( const Akonadi::Collection &collection )
427 {
428  if ( !collection.isValid() ) // should not happen, but you never know...
429  return;
430  TaskList& queue = queueForTaskType( SyncCollection );
431  for ( QList<Task>::iterator it = queue.begin(); it != queue.end(); ) {
432  if ( (*it).type == SyncCollection && (*it).collection == collection ) {
433  it = queue.erase( it );
434  kDebug() << " erasing";
435  } else
436  ++it;
437  }
438 }
439 
440 void ResourceScheduler::Task::sendDBusReplies( const QString &errorMsg )
441 {
442  Q_FOREACH( const QDBusMessage &msg, dbusMsgs ) {
443  QDBusMessage reply( msg.createReply() );
444  const QString methodName = msg.member();
445  if (methodName == QLatin1String("requestItemDelivery")) {
446  reply << errorMsg.isEmpty();
447  } else if (methodName == QLatin1String("requestItemDeliveryV2")) {
448  reply << errorMsg;
449  } else if (methodName.isEmpty()) {
450  continue; // unittest calls scheduleItemFetch with empty QDBusMessage
451  } else {
452  kFatal() << "Got unexpected member:" << methodName;
453  }
454  DBusConnectionPool::threadConnection().send( reply );
455  }
456 }
457 
458 ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType( TaskType type )
459 {
460  switch ( type ) {
461  case ChangeReplay:
462  case RecursiveMoveReplay:
463  return ChangeReplayQueue;
464  case FetchItem:
465  return ItemFetchQueue;
466  default:
467  return GenericTaskQueue;
468  }
469 }
470 
471 ResourceScheduler::TaskList& ResourceScheduler::queueForTaskType( TaskType type )
472 {
473  const QueueType qt = queueTypeForTaskType( type );
474  return mTaskList[ qt ];
475 }
476 
477 void ResourceScheduler::dump()
478 {
479  kDebug() << dumpToString();
480 }
481 
482 QString ResourceScheduler::dumpToString() const
483 {
484  QString ret;
485  QTextStream str( &ret );
486  str << "ResourceScheduler: " << (mOnline?"Online":"Offline") << endl;
487  str << " current task: " << mCurrentTask << endl;
488  for ( int i = 0; i < NQueueCount; ++i ) {
489  const TaskList& queue = mTaskList[i];
490  if (queue.isEmpty()) {
491  str << " queue " << i << " is empty" << endl;
492  } else {
493  str << " queue " << i << " " << queue.size() << " tasks:" << endl;
494  for ( QList<Task>::const_iterator it = queue.begin(); it != queue.end(); ++it ) {
495  str << " " << (*it) << endl;
496  }
497  }
498  }
499  return ret;
500 }
501 
502 void ResourceScheduler::clear()
503 {
504  kDebug() << "Clearing ResourceScheduler queues:";
505  for ( int i = 0; i < NQueueCount; ++i ) {
506  TaskList& queue = mTaskList[i];
507  queue.clear();
508  }
509  mCurrentTask = Task();
510  mCurrentTasksQueue = -1;
511 }
512 
513 void Akonadi::ResourceScheduler::cancelQueues()
514 {
515  for ( int i = 0; i < NQueueCount; ++i ) {
516  TaskList& queue = mTaskList[i];
517  if ( s_resourcetracker ) {
518  foreach ( const Task &t, queue ) {
519  QList<QVariant> argumentList;
520  argumentList << QString::number( t.serial ) << QString();
521  s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList);
522  }
523  }
524  queue.clear();
525  }
526 }
527 
528 static const char s_taskTypes[][27] = {
529  "Invalid (no task)",
530  "SyncAll",
531  "SyncCollectionTree",
532  "SyncCollection",
533  "SyncCollectionAttributes",
534  "FetchItem",
535  "ChangeReplay",
536  "RecursiveMoveReplay",
537  "DeleteResourceCollection",
538  "InvalideCacheForCollection",
539  "SyncAllDone",
540  "SyncCollectionTreeDone",
541  "Custom"
542 };
543 
544 QTextStream& Akonadi::operator<<( QTextStream& d, const ResourceScheduler::Task& task )
545 {
546  d << task.serial << " " << s_taskTypes[task.type] << " ";
547  if ( task.type != ResourceScheduler::Invalid ) {
548  if ( task.collection.isValid() )
549  d << "collection " << task.collection.id() << " ";
550  if ( task.item.id() != -1 )
551  d << "item " << task.item.id() << " ";
552  if ( !task.methodName.isEmpty() )
553  d << task.methodName << " " << task.argument.toString();
554  }
555  return d;
556 }
557 
558 QDebug Akonadi::operator<<( QDebug d, const ResourceScheduler::Task& task )
559 {
560  QString s;
561  QTextStream str( &s );
562  str << task;
563  d << s;
564  return d;
565 }
566 
567 //@endcond
568 
569 #include "moc_resourcescheduler_p.cpp"
Akonadi::RecursiveMover
Helper class for expanding inter-resource collection moves inside ResourceBase.
Definition: recursivemover_p.h:37
Akonadi::Collection
Represents a collection of PIM items.
Definition: collection.h:75
Akonadi::ResourceBase::SchedulePriority
SchedulePriority
Describes the scheduling priority of a task that has been queued for execution.
Definition: resourcebase.h:602
Akonadi::ResourceBase::AfterChangeReplay
The task is scheduled after the last ChangeReplay task in the queue.
Definition: resourcebase.h:604
Akonadi::Entity::id
Id id() const
Returns the unique identifier of the entity.
Definition: entity.cpp:72
Akonadi::ResourceBase::Prepend
The task will be executed as soon as the current task has finished.
Definition: resourcebase.h:603
Akonadi::Entity::isValid
bool isValid() const
Returns whether the entity is valid.
Definition: entity.cpp:97
Akonadi::AgentBase::Idle
The agent does currently nothing.
Definition: agentbase.h:364
This file is part of the KDE documentation.
Documentation copyright © 1996-2014 The KDE developers.
Generated on Tue Oct 14 2014 23:00:27 by doxygen 1.8.7 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.

akonadi

Skip menu "akonadi"
  • Main Page
  • Namespace List
  • Namespace Members
  • Alphabetical List
  • Class List
  • Class Hierarchy
  • Class Members
  • File List
  • Modules
  • Related Pages

kdepimlibs API Reference

Skip menu "kdepimlibs API Reference"
  • akonadi
  •   contact
  •   kmime
  •   socialutils
  • kabc
  • kalarmcal
  • kblog
  • kcal
  • kcalcore
  • kcalutils
  • kholidays
  • kimap
  • kldap
  • kmbox
  • kmime
  • kpimidentities
  • kpimtextedit
  • kresources
  • ktnef
  • kxmlrpcclient
  • microblog

Search



Report problems with this website to our bug tracking system.
Contact the specific authors with questions and comments about the page contents.

KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal