• Skip to content
  • Skip to link menu
KDE API Reference
  • KDE API Reference
  • kdepimlibs API Reference
  • KDE Home
  • Contact Us
 

akonadi

  • sources
  • kde-4.14
  • kdepimlibs
  • akonadi
resourcescheduler.cpp
1 /*
2  Copyright (c) 2007 Volker Krause <vkrause@kde.org>
3 
4  This library is free software; you can redistribute it and/or modify it
5  under the terms of the GNU Library General Public License as published by
6  the Free Software Foundation; either version 2 of the License, or (at your
7  option) any later version.
8 
9  This library is distributed in the hope that it will be useful, but WITHOUT
10  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
12  License for more details.
13 
14  You should have received a copy of the GNU Library General Public License
15  along with this library; see the file COPYING.LIB. If not, write to the
16  Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17  02110-1301, USA.
18 */
19 
20 #include "resourcescheduler_p.h"
21 
22 #include "dbusconnectionpool.h"
23 #include "recursivemover_p.h"
24 
25 #include <kdebug.h>
26 #include <klocalizedstring.h>
27 
28 #include <QtCore/QTimer>
29 #include <QtDBus/QDBusInterface>
30 #include <QtDBus/QDBusConnectionInterface>
31 #include <boost/graph/graph_concepts.hpp>
32 
33 using namespace Akonadi;
34 
35 qint64 ResourceScheduler::Task::latestSerial = 0;
36 static QDBusAbstractInterface *s_resourcetracker = 0;
37 
38 //@cond PRIVATE
39 
40 ResourceScheduler::ResourceScheduler( QObject *parent ) :
41  QObject( parent ),
42  mCurrentTasksQueue( -1 ),
43  mOnline( false )
44 {
45 }
46 
47 void ResourceScheduler::scheduleFullSync()
48 {
49  Task t;
50  t.type = SyncAll;
51  TaskList& queue = queueForTaskType( t.type );
52  if ( queue.contains( t ) || mCurrentTask == t )
53  return;
54  queue << t;
55  signalTaskToTracker( t, "SyncAll" );
56  scheduleNext();
57 }
58 
59 void ResourceScheduler::scheduleCollectionTreeSync()
60 {
61  Task t;
62  t.type = SyncCollectionTree;
63  TaskList& queue = queueForTaskType( t.type );
64  if ( queue.contains( t ) || mCurrentTask == t )
65  return;
66  queue << t;
67  signalTaskToTracker( t, "SyncCollectionTree" );
68  scheduleNext();
69 }
70 
71 void ResourceScheduler::scheduleSync(const Collection & col)
72 {
73  Task t;
74  t.type = SyncCollection;
75  t.collection = col;
76  TaskList& queue = queueForTaskType( t.type );
77  if ( queue.contains( t ) || mCurrentTask == t )
78  return;
79  queue << t;
80  signalTaskToTracker( t, "SyncCollection", QString::number( col.id() ) );
81  scheduleNext();
82 }
83 
84 void ResourceScheduler::scheduleAttributesSync( const Collection &collection )
85 {
86  Task t;
87  t.type = SyncCollectionAttributes;
88  t.collection = collection;
89 
90  TaskList& queue = queueForTaskType( t.type );
91  if ( queue.contains( t ) || mCurrentTask == t )
92  return;
93  queue << t;
94  signalTaskToTracker( t, "SyncCollectionAttributes", QString::number( collection.id() ) );
95  scheduleNext();
96 }
97 
98 void ResourceScheduler::scheduleItemFetch(const Item & item, const QSet<QByteArray> &parts, const QDBusMessage & msg)
99 {
100  Task t;
101  t.type = FetchItem;
102  t.item = item;
103  t.itemParts = parts;
104 
105  // if the current task does already fetch the requested item, break here but
106  // keep the dbus message, so we can send the reply later on
107  if ( mCurrentTask == t ) {
108  mCurrentTask.dbusMsgs << msg;
109  return;
110  }
111 
112  // If this task is already in the queue, merge with it.
113  TaskList& queue = queueForTaskType( t.type );
114  const int idx = queue.indexOf( t );
115  if ( idx != -1 ) {
116  queue[ idx ].dbusMsgs << msg;
117  return;
118  }
119 
120  t.dbusMsgs << msg;
121  queue << t;
122  signalTaskToTracker( t, "FetchItem", QString::number( item.id() ) );
123  scheduleNext();
124 }
125 
126 void ResourceScheduler::scheduleResourceCollectionDeletion()
127 {
128  Task t;
129  t.type = DeleteResourceCollection;
130  TaskList& queue = queueForTaskType( t.type );
131  if ( queue.contains( t ) || mCurrentTask == t )
132  return;
133  queue << t;
134  signalTaskToTracker( t, "DeleteResourceCollection" );
135  scheduleNext();
136 }
137 
138 void ResourceScheduler::scheduleCacheInvalidation( const Collection &collection )
139 {
140  Task t;
141  t.type = InvalideCacheForCollection;
142  t.collection = collection;
143  TaskList& queue = queueForTaskType( t.type );
144  if ( queue.contains( t ) || mCurrentTask == t )
145  return;
146  queue << t;
147  signalTaskToTracker( t, "InvalideCacheForCollection", QString::number( collection.id() ) );
148  scheduleNext();
149 }
150 
151 void ResourceScheduler::scheduleChangeReplay()
152 {
153  Task t;
154  t.type = ChangeReplay;
155  TaskList& queue = queueForTaskType( t.type );
156  // see ResourceBase::changeProcessed() for why we do not check for mCurrentTask == t here like in the other tasks
157  if ( queue.contains( t ) )
158  return;
159  queue << t;
160  signalTaskToTracker( t, "ChangeReplay" );
161  scheduleNext();
162 }
163 
164 void ResourceScheduler::scheduleMoveReplay( const Collection &movedCollection, RecursiveMover *mover )
165 {
166  Task t;
167  t.type = RecursiveMoveReplay;
168  t.collection = movedCollection;
169  t.argument = QVariant::fromValue( mover );
170  TaskList &queue = queueForTaskType( t.type );
171 
172  if ( queue.contains( t ) || mCurrentTask == t )
173  return;
174 
175  queue << t;
176  signalTaskToTracker( t, "RecursiveMoveReplay", QString::number( t.collection.id() ) );
177  scheduleNext();
178 }
179 
180 void Akonadi::ResourceScheduler::scheduleFullSyncCompletion()
181 {
182  Task t;
183  t.type = SyncAllDone;
184  TaskList& queue = queueForTaskType( t.type );
185  // no compression here, all this does is emitting a D-Bus signal anyway, and compression can trigger races on the receiver side with the signal being lost
186  queue << t;
187  signalTaskToTracker( t, "SyncAllDone" );
188  scheduleNext();
189 }
190 
191 void Akonadi::ResourceScheduler::scheduleCollectionTreeSyncCompletion()
192 {
193  Task t;
194  t.type = SyncCollectionTreeDone;
195  TaskList& queue = queueForTaskType( t.type );
196  // no compression here, all this does is emitting a D-Bus signal anyway, and compression can trigger races on the receiver side with the signal being lost
197  queue << t;
198  signalTaskToTracker( t, "SyncCollectionTreeDone" );
199  scheduleNext();
200 }
201 
202 void Akonadi::ResourceScheduler::scheduleCustomTask( QObject *receiver, const char* methodName, const QVariant &argument, ResourceBase::SchedulePriority priority )
203 {
204  Task t;
205  t.type = Custom;
206  t.receiver = receiver;
207  t.methodName = methodName;
208  t.argument = argument;
209  QueueType queueType = GenericTaskQueue;
210  if ( priority == ResourceBase::AfterChangeReplay )
211  queueType = AfterChangeReplayQueue;
212  else if ( priority == ResourceBase::Prepend )
213  queueType = PrependTaskQueue;
214  TaskList& queue = mTaskList[ queueType ];
215 
216  if ( queue.contains( t ) )
217  return;
218 
219  switch (priority) {
220  case ResourceBase::Prepend:
221  queue.prepend( t );
222  break;
223  default:
224  queue.append(t);
225  break;
226  }
227 
228  signalTaskToTracker( t, "Custom-" + t.methodName );
229  scheduleNext();
230 }
231 
232 void ResourceScheduler::taskDone()
233 {
234  if ( isEmpty() )
235  emit status( AgentBase::Idle, i18nc( "@info:status Application ready for work", "Ready" ) );
236 
237  if ( s_resourcetracker ) {
238  QList<QVariant> argumentList;
239  argumentList << QString::number( mCurrentTask.serial )
240  << QString();
241  s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList);
242  }
243 
244  mCurrentTask = Task();
245  mCurrentTasksQueue = -1;
246  scheduleNext();
247 }
248 
249 void ResourceScheduler::deferTask()
250 {
251  if ( mCurrentTask.type == Invalid )
252  return;
253 
254  if ( s_resourcetracker ) {
255  QList<QVariant> argumentList;
256  argumentList << QString::number( mCurrentTask.serial )
257  << QString();
258  s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList);
259  }
260 
261  Task t = mCurrentTask;
262  mCurrentTask = Task();
263 
264  Q_ASSERT( mCurrentTasksQueue >= 0 && mCurrentTasksQueue < NQueueCount );
265  mTaskList[mCurrentTasksQueue].prepend( t );
266  mCurrentTasksQueue = -1;
267 
268  signalTaskToTracker( t, "DeferedTask" );
269 
270  scheduleNext();
271 }
272 
273 bool ResourceScheduler::isEmpty()
274 {
275  for ( int i = 0; i < NQueueCount; ++i ) {
276  if ( !mTaskList[i].isEmpty() )
277  return false;
278  }
279  return true;
280 }
281 
282 void ResourceScheduler::scheduleNext()
283 {
284  if ( mCurrentTask.type != Invalid || isEmpty() || !mOnline )
285  return;
286  QTimer::singleShot( 0, this, SLOT(executeNext()) );
287 }
288 
289 void ResourceScheduler::executeNext()
290 {
291  if ( mCurrentTask.type != Invalid || isEmpty() )
292  return;
293 
294  for ( int i = 0; i < NQueueCount; ++i ) {
295  if ( !mTaskList[ i ].isEmpty() ) {
296  mCurrentTask = mTaskList[ i ].takeFirst();
297  mCurrentTasksQueue = i;
298  break;
299  }
300  }
301 
302  if ( s_resourcetracker ) {
303  QList<QVariant> argumentList;
304  argumentList << QString::number( mCurrentTask.serial );
305  s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobStarted" ), argumentList);
306  }
307 
308  switch ( mCurrentTask.type ) {
309  case SyncAll:
310  emit executeFullSync();
311  break;
312  case SyncCollectionTree:
313  emit executeCollectionTreeSync();
314  break;
315  case SyncCollection:
316  emit executeCollectionSync( mCurrentTask.collection );
317  break;
318  case SyncCollectionAttributes:
319  emit executeCollectionAttributesSync( mCurrentTask.collection );
320  break;
321  case FetchItem:
322  emit executeItemFetch( mCurrentTask.item, mCurrentTask.itemParts );
323  break;
324  case DeleteResourceCollection:
325  emit executeResourceCollectionDeletion();
326  break;
327  case InvalideCacheForCollection:
328  emit executeCacheInvalidation( mCurrentTask.collection );
329  break;
330  case ChangeReplay:
331  emit executeChangeReplay();
332  break;
333  case RecursiveMoveReplay:
334  emit executeRecursiveMoveReplay( mCurrentTask.argument.value<RecursiveMover*>() );
335  break;
336  case SyncAllDone:
337  emit fullSyncComplete();
338  break;
339  case SyncCollectionTreeDone:
340  emit collectionTreeSyncComplete();
341  break;
342  case Custom:
343  {
344  const QByteArray methodSig = mCurrentTask.methodName + "(QVariant)";
345  const bool hasSlotWithVariant = mCurrentTask.receiver->metaObject()->indexOfMethod(methodSig) != -1;
346  bool success = false;
347  if ( hasSlotWithVariant ) {
348  success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName, Q_ARG(QVariant, mCurrentTask.argument) );
349  Q_ASSERT_X( success || !mCurrentTask.argument.isValid(), "ResourceScheduler::executeNext", "Valid argument was provided but the method wasn't found" );
350  }
351  if ( !success )
352  success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName );
353 
354  if ( !success )
355  kError() << "Could not invoke slot" << mCurrentTask.methodName << "on" << mCurrentTask.receiver << "with argument" << mCurrentTask.argument;
356  break;
357  }
358  default: {
359  kError() << "Unhandled task type" << mCurrentTask.type;
360  dump();
361  Q_ASSERT( false );
362  }
363  }
364 }
365 
366 ResourceScheduler::Task ResourceScheduler::currentTask() const
367 {
368  return mCurrentTask;
369 }
370 
371 void ResourceScheduler::setOnline(bool state)
372 {
373  if ( mOnline == state )
374  return;
375  mOnline = state;
376  if ( mOnline ) {
377  scheduleNext();
378  } else {
379  if ( mCurrentTask.type != Invalid ) {
380  // abort running task
381  queueForTaskType( mCurrentTask.type ).prepend( mCurrentTask );
382  mCurrentTask = Task();
383  mCurrentTasksQueue = -1;
384  }
385  // abort pending synchronous tasks, might take longer until the resource goes online again
386  TaskList& itemFetchQueue = queueForTaskType( FetchItem );
387  for ( QList< Task >::iterator it = itemFetchQueue.begin(); it != itemFetchQueue.end(); ) {
388  if ( (*it).type == FetchItem ) {
389  (*it).sendDBusReplies( i18nc( "@info", "Job canceled." ) );
390  it = itemFetchQueue.erase( it );
391  if ( s_resourcetracker ) {
392  QList<QVariant> argumentList;
393  argumentList << QString::number( mCurrentTask.serial )
394  << i18nc( "@info", "Job canceled." );
395  s_resourcetracker->asyncCallWithArgumentList( QLatin1String( "jobEnded" ), argumentList );
396  }
397  } else {
398  ++it;
399  }
400  }
401  }
402 }
403 
404 void ResourceScheduler::signalTaskToTracker( const Task &task, const QByteArray &taskType, const QString &debugString )
405 {
406  // if there's a job tracer running, tell it about the new job
407  if ( !s_resourcetracker && DBusConnectionPool::threadConnection().interface()->isServiceRegistered(QLatin1String( "org.kde.akonadiconsole" ) ) ) {
408  s_resourcetracker = new QDBusInterface( QLatin1String( "org.kde.akonadiconsole" ),
409  QLatin1String( "/resourcesJobtracker" ),
410  QLatin1String( "org.freedesktop.Akonadi.JobTracker" ),
411  DBusConnectionPool::threadConnection(), 0 );
412  }
413 
414  if ( s_resourcetracker ) {
415  QList<QVariant> argumentList;
416  argumentList << static_cast<AgentBase*>( parent() )->identifier() // "session" (in our case resource)
417  << QString::number( task.serial ) // "job"
418  << QString() // "parent job"
419  << QString::fromLatin1( taskType ) // "job type"
420  << debugString // "job debugging string"
421  ;
422  s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobCreated" ), argumentList);
423  }
424 }
425 
426 void ResourceScheduler::collectionRemoved( const Akonadi::Collection &collection )
427 {
428  if ( !collection.isValid() ) // should not happen, but you never know...
429  return;
430  TaskList& queue = queueForTaskType( SyncCollection );
431  for ( QList<Task>::iterator it = queue.begin(); it != queue.end(); ) {
432  if ( (*it).type == SyncCollection && (*it).collection == collection ) {
433  it = queue.erase( it );
434  kDebug() << " erasing";
435  } else
436  ++it;
437  }
438 }
439 
440 void ResourceScheduler::Task::sendDBusReplies( const QString &errorMsg )
441 {
442  Q_FOREACH( const QDBusMessage &msg, dbusMsgs ) {
443  QDBusMessage reply( msg.createReply() );
444  const QString methodName = msg.member();
445  if (methodName == QLatin1String("requestItemDelivery")) {
446  reply << errorMsg.isEmpty();
447  } else if (methodName == QLatin1String("requestItemDeliveryV2")) {
448  reply << errorMsg;
449  } else if (methodName.isEmpty()) {
450  continue; // unittest calls scheduleItemFetch with empty QDBusMessage
451  } else {
452  kFatal() << "Got unexpected member:" << methodName;
453  }
454  DBusConnectionPool::threadConnection().send( reply );
455  }
456 }
457 
458 ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType( TaskType type )
459 {
460  switch ( type ) {
461  case ChangeReplay:
462  case RecursiveMoveReplay:
463  return ChangeReplayQueue;
464  case FetchItem:
465  case SyncCollectionAttributes:
466  return UserActionQueue;
467  default:
468  return GenericTaskQueue;
469  }
470 }
471 
472 ResourceScheduler::TaskList& ResourceScheduler::queueForTaskType( TaskType type )
473 {
474  const QueueType qt = queueTypeForTaskType( type );
475  return mTaskList[ qt ];
476 }
477 
478 void ResourceScheduler::dump()
479 {
480  kDebug() << dumpToString();
481 }
482 
483 QString ResourceScheduler::dumpToString() const
484 {
485  QString ret;
486  QTextStream str( &ret );
487  str << "ResourceScheduler: " << (mOnline?"Online":"Offline") << endl;
488  str << " current task: " << mCurrentTask << endl;
489  for ( int i = 0; i < NQueueCount; ++i ) {
490  const TaskList& queue = mTaskList[i];
491  if (queue.isEmpty()) {
492  str << " queue " << i << " is empty" << endl;
493  } else {
494  str << " queue " << i << " " << queue.size() << " tasks:" << endl;
495  for ( QList<Task>::const_iterator it = queue.begin(); it != queue.end(); ++it ) {
496  str << " " << (*it) << endl;
497  }
498  }
499  }
500  return ret;
501 }
502 
503 void ResourceScheduler::clear()
504 {
505  kDebug() << "Clearing ResourceScheduler queues:";
506  for ( int i = 0; i < NQueueCount; ++i ) {
507  TaskList& queue = mTaskList[i];
508  queue.clear();
509  }
510  mCurrentTask = Task();
511  mCurrentTasksQueue = -1;
512 }
513 
514 void Akonadi::ResourceScheduler::cancelQueues()
515 {
516  for ( int i = 0; i < NQueueCount; ++i ) {
517  TaskList& queue = mTaskList[i];
518  if ( s_resourcetracker ) {
519  foreach ( const Task &t, queue ) {
520  QList<QVariant> argumentList;
521  argumentList << QString::number( t.serial ) << QString();
522  s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList);
523  }
524  }
525  queue.clear();
526  }
527 }
528 
529 static const char s_taskTypes[][27] = {
530  "Invalid (no task)",
531  "SyncAll",
532  "SyncCollectionTree",
533  "SyncCollection",
534  "SyncCollectionAttributes",
535  "FetchItem",
536  "ChangeReplay",
537  "RecursiveMoveReplay",
538  "DeleteResourceCollection",
539  "InvalideCacheForCollection",
540  "SyncAllDone",
541  "SyncCollectionTreeDone",
542  "Custom"
543 };
544 
545 QTextStream& Akonadi::operator<<( QTextStream& d, const ResourceScheduler::Task& task )
546 {
547  d << task.serial << " " << s_taskTypes[task.type] << " ";
548  if ( task.type != ResourceScheduler::Invalid ) {
549  if ( task.collection.isValid() )
550  d << "collection " << task.collection.id() << " ";
551  if ( task.item.id() != -1 )
552  d << "item " << task.item.id() << " ";
553  if ( !task.methodName.isEmpty() )
554  d << task.methodName << " " << task.argument.toString();
555  }
556  return d;
557 }
558 
559 QDebug Akonadi::operator<<( QDebug d, const ResourceScheduler::Task& task )
560 {
561  QString s;
562  QTextStream str( &s );
563  str << task;
564  d << s;
565  return d;
566 }
567 
568 //@endcond
569 
570 #include "moc_resourcescheduler_p.cpp"
Akonadi::RecursiveMover
Helper class for expanding inter-resource collection moves inside ResourceBase.
Definition: recursivemover_p.h:37
QByteArray
Akonadi::Collection
Represents a collection of PIM items.
Definition: collection.h:75
QDBusMessage::member
QString member() const
QList::const_iterator
QTextStream
QString::clear
void clear()
Akonadi::ResourceBase::SchedulePriority
SchedulePriority
Describes the scheduling priority of a task that has been queued for execution.
Definition: resourcebase.h:704
QString::number
QString number(int n, int base)
QDBusMessage::createReply
QDBusMessage createReply(const QList< QVariant > &arguments) const
QObject
QString::isEmpty
bool isEmpty() const
Akonadi::ResourceBase::AfterChangeReplay
The task is scheduled after the last ChangeReplay task in the queue.
Definition: resourcebase.h:706
QSet< QByteArray >
QString
QList
Akonadi::Entity::id
Id id() const
Returns the unique identifier of the entity.
Definition: entity.cpp:72
QList::iterator
QDBusInterface
QVariant::fromValue
QVariant fromValue(const T &value)
QMetaObject::invokeMethod
bool invokeMethod(QObject *obj, const char *member, Qt::ConnectionType type, QGenericReturnArgument ret, QGenericArgument val0, QGenericArgument val1, QGenericArgument val2, QGenericArgument val3, QGenericArgument val4, QGenericArgument val5, QGenericArgument val6, QGenericArgument val7, QGenericArgument val8, QGenericArgument val9)
QDebug
QDBusMessage
QLatin1String
QDBusAbstractInterface::asyncCallWithArgumentList
QDBusPendingCall asyncCallWithArgumentList(const QString &method, const QList< QVariant > &args)
QString::fromLatin1
QString fromLatin1(const char *str, int size)
Akonadi::ResourceBase::Prepend
The task will be executed as soon as the current task has finished.
Definition: resourcebase.h:705
QDBusAbstractInterface
Akonadi::Entity::isValid
bool isValid() const
Returns whether the entity is valid.
Definition: entity.cpp:97
Akonadi::AgentBase::Idle
The agent does currently nothing.
Definition: agentbase.h:412
QTimer::singleShot
singleShot
QVariant
This file is part of the KDE documentation.
Documentation copyright © 1996-2020 The KDE developers.
Generated on Mon Jun 22 2020 13:38:03 by doxygen 1.8.7 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.

akonadi

Skip menu "akonadi"
  • Main Page
  • Namespace List
  • Namespace Members
  • Alphabetical List
  • Class List
  • Class Hierarchy
  • Class Members
  • File List
  • Modules
  • Related Pages

kdepimlibs API Reference

Skip menu "kdepimlibs API Reference"
  • akonadi
  •   contact
  •   kmime
  •   socialutils
  • kabc
  • kalarmcal
  • kblog
  • kcal
  • kcalcore
  • kcalutils
  • kholidays
  • kimap
  • kioslave
  •   imap4
  •   mbox
  •   nntp
  • kldap
  • kmbox
  • kmime
  • kontactinterface
  • kpimidentities
  • kpimtextedit
  • kpimutils
  • kresources
  • ktnef
  • kxmlrpcclient
  • mailtransport
  • microblog
  • qgpgme
  • syndication
  •   atom
  •   rdf
  •   rss2

Search



Report problems with this website to our bug tracking system.
Contact the specific authors with questions and comments about the page contents.

KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal