Akonadi

preprocessormanager.cpp
1 /******************************************************************************
2  *
3  * File : preprocessormanager.cpp
4  * Creation date : Sat 18 Jul 2009 01:58:50
5  *
6  * SPDX-FileCopyrightText: 2009 Szymon Stefanek <s.stefanek at gmail dot com>
7  *
8  * SPDX-License-Identifier: LGPL-2.0-or-later
9  *
10  *****************************************************************************/
11 
12 #include "preprocessormanager.h"
13 #include "akonadiserver_debug.h"
14 
15 #include "akonadi.h"
16 #include "entities.h" // Akonadi::Server::PimItem
17 #include "storage/datastore.h"
18 #include "tracer.h"
19 
20 #include "preprocessormanageradaptor.h"
21 
namespace Akonadi
{
namespace Server
{

// Interval of the heartbeat timer: 30 seconds, expressed in milliseconds.
constexpr int gHeartbeatTimeoutInMSecs = 30 * 1000;

// Escalation thresholds for a preprocessor that keeps working on one item.
// All three are expressed in seconds and are checked in ascending order.

// Two minutes should really be enough to process an item. Once this much
// time has elapsed we assume that the preprocessor is "stuck" and we try
// to kick it by requesting an abort().
constexpr int gWarningItemProcessingTimeInSecs = 2 * 60;

// After three minutes, if the preprocessor is still "stuck", we attempt
// to restart it via AgentManager.
constexpr int gMaximumItemProcessingTimeInSecs = 3 * 60;

// After four minutes, if the preprocessor is still "stuck", we assume
// it is dead and just drop its interface.
constexpr int gDeadlineItemProcessingTimeInSecs = 4 * 60;

} // namespace Server
} // namespace Akonadi
42 
43 using namespace Akonadi::Server;
44 
45 // The one and only PreprocessorManager object
47  : mEnabled(true)
48  , mTracer(tracer)
49 {
50  // Hook in our D-Bus interface "shell".
51  new PreprocessorManagerAdaptor(this);
52 
54  QStringLiteral("/PreprocessorManager"),
55  this,
57 
58  mHeartbeatTimer = new QTimer(this);
59 
60  QObject::connect(mHeartbeatTimer, &QTimer::timeout, this, &PreprocessorManager::heartbeat);
61 
62  mHeartbeatTimer->start(gHeartbeatTimeoutInMSecs);
63 }
64 
66 {
68 
69  // FIXME: Explicitly interrupt pre-processing here ?
70  // Pre-Processors should auto-protect themselves from re-processing an item:
71  // they are "closer to the DB" from this point of view.
72 
73  qDeleteAll(mPreprocessorChain);
74  qDeleteAll(mTransactionWaitQueueHash); // this should also disconnect all the signals from the data store objects...
75 }
76 
78 {
79  QMutexLocker locker(&mMutex);
80 
81  if (!mEnabled) {
82  return false;
83  }
84  return !mPreprocessorChain.isEmpty();
85 }
86 
87 PreprocessorInstance *PreprocessorManager::lockedFindInstance(const QString &id)
88 {
89  for (PreprocessorInstance *instance : qAsConst(mPreprocessorChain)) {
90  if (instance->id() == id) {
91  return instance;
92  }
93  }
94 
95  return nullptr;
96 }
97 
99 {
100  QMutexLocker locker(&mMutex);
101 
102  qCDebug(AKONADISERVER_LOG) << "PreprocessorManager::registerInstance(" << id << ")";
103 
104  PreprocessorInstance *instance = lockedFindInstance(id);
105  if (instance) {
106  return; // already registered
107  }
108 
109  // The PreprocessorInstance objects are actually always added at the end of the queue
110  // TODO: Maybe we need some kind of ordering here ?
111  // In that case we'll need to fiddle with the items that are currently enqueued for processing...
112 
113  instance = new PreprocessorInstance(id, *this, mTracer);
114  if (!instance->init()) {
115  mTracer.warning(
116  QStringLiteral("PreprocessorManager"),
117  QStringLiteral("Could not initialize preprocessor instance '%1'")
118  .arg(id));
119  delete instance;
120  return;
121  }
122 
123  qCDebug(AKONADISERVER_LOG) << "Registering preprocessor instance " << id;
124 
125  mPreprocessorChain.append(instance);
126 }
127 
129 {
130  QMutexLocker locker(&mMutex);
131 
132  qCDebug(AKONADISERVER_LOG) << "PreprocessorManager::unregisterInstance(" << id << ")";
133 
134  lockedUnregisterInstance(id);
135 }
136 
137 void PreprocessorManager::lockedUnregisterInstance(const QString &id)
138 {
139  PreprocessorInstance *instance = lockedFindInstance(id);
140  if (!instance) {
141  return; // not our instance: don't complain (as we might be called for non-preprocessor agents too)
142  }
143 
144  // All of the preprocessor's waiting items must be queued to the next preprocessor (if there is one)
145 
146  std::deque< qint64 > *itemList = instance->itemQueue();
147  Q_ASSERT(itemList);
148 
149  int idx = mPreprocessorChain.indexOf(instance);
150  Q_ASSERT(idx >= 0); // must be there!
151 
152  if (idx < (mPreprocessorChain.count() - 1)) {
153  // This wasn't the last preprocessor: trigger the next one.
154  PreprocessorInstance *nextPreprocessor = mPreprocessorChain[idx + 1];
155  Q_ASSERT(nextPreprocessor);
156  Q_ASSERT(nextPreprocessor != instance);
157 
158  for (qint64 itemId : *itemList) {
159  nextPreprocessor->enqueueItem(itemId);
160  }
161  } else {
162  // This was the last preprocessor: end handling the items
163  for (qint64 itemId : *itemList) {
164  lockedEndHandleItem(itemId);
165  }
166  }
167 
168  mPreprocessorChain.removeOne(instance);
169  delete instance;
170 }
171 
172 void PreprocessorManager::beginHandleItem(const PimItem &item, const DataStore *dataStore)
173 {
174  Q_ASSERT(dataStore);
175  Q_ASSERT(item.isValid());
176 
177  // This is the entry point of the pre-processing chain.
178  QMutexLocker locker(&mMutex);
179 
180  if (!mEnabled) {
181  // Preprocessing is disabled: immediately end handling the item.
182  // In fact we shouldn't even be here as the caller should
183  // have checked isActive() before calling this function.
184  // However, since setEnabled() may be called concurrently
185  // then this might not be the caller's fault. Just drop a warning.
186 
187  qCWarning(AKONADISERVER_LOG) << "PreprocessorManager::beginHandleItem(" << item.id() << ") called with a disabled preprocessor";
188 
189  lockedEndHandleItem(item.id());
190  return;
191  }
192 
193 #if 0
194  // Now the hidden flag is stored as a part.. too hard to assert its existence :D
195  Q_ASSERT_X(item.hidden(), "PreprocessorManager::beginHandleItem()", "The item you pass to this function should be hidden!");
196 #endif
197 
198  if (mPreprocessorChain.isEmpty()) {
199  // No preprocessors at all: immediately end handling the item.
200  lockedEndHandleItem(item.id());
201  return;
202  }
203 
204  if (dataStore->inTransaction()) {
205  qCDebug(AKONADISERVER_LOG) << "PreprocessorManager::beginHandleItem(" << item.id() << "): the DataStore is in transaction, pushing item to a wait queue";
206 
207  // The calling thread data store is in a transaction: push the item into a wait queue
208  std::deque< qint64 > *waitQueue = mTransactionWaitQueueHash.value(dataStore, nullptr);
209 
210  if (!waitQueue) {
211  // No wait queue for this transaction yet...
212  waitQueue = new std::deque< qint64 >();
213 
214  mTransactionWaitQueueHash.insert(dataStore, waitQueue);
215 
216  // This will usually end up being a queued connection.
217  QObject::connect(dataStore, &QObject::destroyed, this, &PreprocessorManager::dataStoreDestroyed);
218  QObject::connect(dataStore, &DataStore::transactionCommitted, this, &PreprocessorManager::dataStoreTransactionCommitted);
219  QObject::connect(dataStore, &DataStore::transactionRolledBack, this, &PreprocessorManager::dataStoreTransactionRolledBack);
220  }
221 
222  waitQueue->push_back(item.id());
223 
224  // nothing more to do here
225  return;
226  }
227 
228  // The calling thread data store is NOT in a transaction: we can proceed directly.
229  lockedActivateFirstPreprocessor(item.id());
230 }
231 
232 void PreprocessorManager::lockedActivateFirstPreprocessor(qint64 itemId)
233 {
234  // Activate the first preprocessor.
235  PreprocessorInstance *preProcessor = mPreprocessorChain.first();
236  Q_ASSERT(preProcessor);
237 
238  preProcessor->enqueueItem(itemId);
239  // The preprocessor will call our "preProcessorFinishedHandlingItem() method"
240  // when done with the item.
241  //
242  // The call should be asynchronous, that is it should never happen that
243  // preProcessorFinishedHandlingItem() is called from "inside" enqueueItem()...
244  // FIXME: Am I *really* sure of this ? If I'm wrong for some obscure reason then we have a deadlock.
245 }
246 
247 void PreprocessorManager::lockedKillWaitQueue(const DataStore *dataStore, bool disconnectSlots)
248 {
249  std::deque< qint64 > *waitQueue = mTransactionWaitQueueHash.value(dataStore, nullptr);
250  if (!waitQueue) {
251  qCWarning(AKONADISERVER_LOG) << "PreprocessorManager::lockedKillWaitQueue(): called for dataStore which has no wait queue";
252  return;
253  }
254 
255  mTransactionWaitQueueHash.remove(dataStore);
256 
257  delete waitQueue;
258 
259  if (!disconnectSlots) {
260  return;
261  }
262 
263  QObject::disconnect(dataStore, &QObject::destroyed, this, &PreprocessorManager::dataStoreDestroyed);
264  QObject::disconnect(dataStore, &DataStore::transactionCommitted, this, &PreprocessorManager::dataStoreTransactionCommitted);
265  QObject::disconnect(dataStore, &DataStore::transactionRolledBack, this, &PreprocessorManager::dataStoreTransactionRolledBack);
266 
267 }
268 
269 void PreprocessorManager::dataStoreDestroyed()
270 {
271  QMutexLocker locker(&mMutex);
272 
273  qCDebug(AKONADISERVER_LOG) << "PreprocessorManager::dataStoreDestroyed(): killing the wait queue";
274 
275  const DataStore *dataStore = dynamic_cast< const DataStore *>(sender());
276  if (!dataStore) {
277  qCWarning(AKONADISERVER_LOG) << "PreprocessorManager::dataStoreDestroyed(): got the signal from a non DataStore object";
278  return;
279  }
280 
281  lockedKillWaitQueue(dataStore, false); // no need to disconnect slots, qt will do that
282 }
283 
284 void PreprocessorManager::dataStoreTransactionCommitted()
285 {
286  QMutexLocker locker(&mMutex);
287 
288  qCDebug(AKONADISERVER_LOG) << "PreprocessorManager::dataStoreTransactionCommitted(): pushing items in wait queue to the preprocessing chain";
289 
290  const DataStore *dataStore = dynamic_cast< const DataStore *>(sender());
291  if (!dataStore) {
292  qCWarning(AKONADISERVER_LOG) << "PreprocessorManager::dataStoreTransactionCommitted(): got the signal from a non DataStore object";
293  return;
294  }
295 
296  std::deque< qint64 > *waitQueue = mTransactionWaitQueueHash.value(dataStore, nullptr);
297  if (!waitQueue) {
298  qCWarning(AKONADISERVER_LOG) << "PreprocessorManager::dataStoreTransactionCommitted(): called for dataStore which has no wait queue";
299  return;
300  }
301 
302  if (!mEnabled || mPreprocessorChain.isEmpty()) {
303  // Preprocessing has been disabled in the meantime or all the preprocessors died
304  for (qint64 id : *waitQueue) {
305  lockedEndHandleItem(id);
306  }
307  } else {
308  for (qint64 id : *waitQueue) {
309  lockedActivateFirstPreprocessor(id);
310  }
311  }
312 
313  lockedKillWaitQueue(dataStore, true); // disconnect slots this time
314 }
315 
316 void PreprocessorManager::dataStoreTransactionRolledBack()
317 {
318  QMutexLocker locker(&mMutex);
319 
320  qCDebug(AKONADISERVER_LOG) << "PreprocessorManager::dataStoreTransactionRolledBack(): killing the wait queue";
321 
322  const DataStore *dataStore = dynamic_cast< const DataStore *>(sender());
323  if (!dataStore) {
324  qCWarning(AKONADISERVER_LOG) << "PreprocessorManager::dataStoreTransactionCommitted(): got the signal from a non DataStore object";
325  return;
326  }
327 
328  lockedKillWaitQueue(dataStore, true); // disconnect slots this time
329 }
330 
332 {
333  QMutexLocker locker(&mMutex);
334 
335  int idx = mPreprocessorChain.indexOf(preProcessor);
336  Q_ASSERT(idx >= 0); // must be there!
337 
338  if (idx < (mPreprocessorChain.count() - 1)) {
339  // This wasn't the last preprocessor: trigger the next one.
340  PreprocessorInstance *nextPreprocessor = mPreprocessorChain[idx + 1];
341  Q_ASSERT(nextPreprocessor);
342  Q_ASSERT(nextPreprocessor != preProcessor);
343 
344  nextPreprocessor->enqueueItem(itemId);
345  } else {
346  // This was the last preprocessor: end handling the item.
347  lockedEndHandleItem(itemId);
348  }
349 }
350 
351 void PreprocessorManager::lockedEndHandleItem(qint64 itemId)
352 {
353  // The exit point of the pre-processing chain.
354 
355  // Refetch the PimItem, the Collection and the MimeType now: preprocessing might have changed them.
356  PimItem item = PimItem::retrieveById(itemId);
357  if (!item.isValid()) {
358  // HUM... the preprocessor killed the item ?
359  // ... or retrieveById() failed ?
360  // Well.. if the preprocessor killed the item then this might be actually OK (spam?).
361  qCDebug(AKONADISERVER_LOG) << "Invalid PIM item id '" << itemId << "' passed to preprocessing chain termination function";
362  return;
363  }
364 
365 #if 0
366  if (!item.hidden()) {
367  // HUM... the item was already unhidden for some reason: we have nothing more to do here.
368  qCDebug(AKONADISERVER_LOG) << "The PIM item with id '" << itemId << "' reached the preprocessing chain termination function in unhidden state";
369  return;
370  }
371 #endif
372 
373  if (!DataStore::self()->unhidePimItem(item)) {
374  mTracer.warning(
375  QStringLiteral("PreprocessorManager"),
376  QStringLiteral("Failed to unhide the PIM item '%1': data is not lost but a server restart is required in order to unhide it")
377  .arg(itemId));
378  }
379 }
380 
381 void PreprocessorManager::heartbeat()
382 {
383  QMutexLocker locker(&mMutex);
384 
385  // Loop through the processor instances and check their current processing time.
386 
387  QList< PreprocessorInstance *> firedPreprocessors;
388 
389  for (PreprocessorInstance *instance : qAsConst(mPreprocessorChain)) {
390  // In this loop we check for "stuck" preprocessors.
391 
392  int elapsedTime = instance->currentProcessingTime();
393 
394  if (elapsedTime < gWarningItemProcessingTimeInSecs) {
395  continue; // ok, still in time.
396  }
397 
398  // Ooops... the preprocessor looks to be "stuck".
399  // This is a rather critical condition and the question is "what we can do about it ?".
400  // The fact is that it doesn't really make sense to push another item for
401  // processing as the slave process is either dead (silently ?) or stuck anyway.
402 
403  // We then proceed as following:
404  // - we first kindly ask the preprocessor to abort the job (via Agent.Control interface)
405  // - if it doesn't obey after some time we attempt to restart it (via AgentManager)
406  // - if it doesn't obey, we drop the interface and assume it's dead until
407  // it's effectively restarted.
408 
409  if (elapsedTime < gMaximumItemProcessingTimeInSecs) {
410  // Kindly ask the preprocessor to abort the job.
411 
412  mTracer.warning(
413  QStringLiteral("PreprocessorManager"),
414  QStringLiteral("Preprocessor '%1' seems to be stuck... trying to abort its job.")
415  .arg(instance->id()));
416 
417  if (instance->abortProcessing()) {
418  continue;
419  }
420  // If we're here then abortProcessing() failed.
421  }
422 
423  if (elapsedTime < gDeadlineItemProcessingTimeInSecs) {
424  // Attempt to restart the preprocessor via AgentManager interface
425 
426  mTracer.warning(
427  QStringLiteral("PreprocessorManager"),
428  QStringLiteral("Preprocessor '%1' is stuck... trying to restart it")
429  .arg(instance->id()));
430 
431  if (instance->invokeRestart()) {
432  continue;
433  }
434  // If we're here then invokeRestart() failed.
435  }
436 
437  mTracer.warning(
438  QStringLiteral("PreprocessorManager"),
439  QStringLiteral("Preprocessor '%1' is broken... ignoring it from now on")
440  .arg(instance->id()));
441 
442  // You're fired! Go Away!
443  firedPreprocessors.append(instance);
444  }
445 
446  // Kill the fired preprocessors, if any.
447  for (PreprocessorInstance *instance : qAsConst(firedPreprocessors)) {
448  lockedUnregisterInstance(instance->id());
449  }
450 }
bool init()
This is called by PreprocessorManager just after the construction in order to connect to the preproce...
void unregisterInstance(const QString &id)
This is called via D-Bus from AgentManager to unregister a preprocessor instance. ...
QObject * sender() const const
void preProcessorFinishedHandlingItem(PreprocessorInstance *preProcessor, qint64 itemId)
This is called by PreprocessorInstance to signal that a certain preprocessor has finished handling an...
bool registerObject(const QString &path, QObject *object, QDBusConnection::RegisterOptions options)
QHash< const DataStore *, std::deque< qint64 > * > mTransactionWaitQueueHash
The hashtable of transaction wait queues.
QDBusConnection sessionBus()
The global tracer instance where all akonadi components can send their tracing information to...
Definition: tracer.h:40
bool disconnect(const QObject *sender, const char *signal, const QObject *receiver, const char *method)
void beginHandleItem(const PimItem &item, const DataStore *dataStore)
Trigger the preprocessor chain for the specified item.
PreprocessorManager(Tracer &tracer)
Creates an instance of PreprocessorManager.
std::deque< qint64 > * itemQueue()
Returns a pointer to the internal preprocessor instance item queue.
bool isActive()
Returns true if preprocessing is active in this Akonadi server.
void transactionCommitted()
Emitted if a transaction has been successfully committed.
void registerInstance(const QString &id)
This is called via D-Bus from AgentManager to register a preprocessor instance.
void timeout()
void append(const T &value)
bool inTransaction() const
Returns true if there is a transaction in progress.
Definition: datastore.cpp:1483
void warning(const QString &componentName, const QString &msg) override
This method is called whenever a component wants to output a warning.
Definition: tracer.cpp:118
A single preprocessor (agent) instance.
~PreprocessorManager()
Destroys the instance of PreprocessorManager and frees all the relevant resources.
void stop()
static DataStore * self()
Per thread singleton.
Definition: datastore.cpp:219
Helper integration between Akonadi and Qt.
QMutex mMutex
The mutex used to protect the internals of this class (mainly the mPreprocessorChain member)...
void enqueueItem(qint64 itemId)
This is called by PreprocessorManager to enqueue a PimItem for processing by this preprocessor instan...
QList< PreprocessorInstance * > mPreprocessorChain
The preprocessor chain.
void transactionRolledBack()
Emitted if a transaction has been aborted.
void start(int msec)
QTimer * mHeartbeatTimer
The heartbeat timer.
This class handles all the database access.
Definition: datastore.h:95
QMetaObject::Connection connect(const QObject *sender, const char *signal, const QObject *receiver, const char *method, Qt::ConnectionType type)
bool mEnabled
Is preprocessing enabled at all in this Akonadi server instance? This is true by default and can be s...
void destroyed(QObject *obj)
This file is part of the KDE documentation.
Documentation copyright © 1996-2020 The KDE developers.
Generated on Sun Jul 12 2020 23:16:57 by doxygen 1.8.11 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.