Akonadi

preprocessormanager.cpp
1 /******************************************************************************
2  *
3  * File : preprocessormanager.cpp
4  * Creation date : Sat 18 Jul 2009 01:58:50
5  *
6  * SPDX-FileCopyrightText: 2009 Szymon Stefanek <s.stefanek at gmail dot com>
7  *
8  * SPDX-License-Identifier: LGPL-2.0-or-later
9  *
10  *****************************************************************************/
11 
12 #include "preprocessormanager.h"
13 #include "akonadiserver_debug.h"
14 
15 #include "akonadi.h"
16 #include "entities.h" // Akonadi::Server::PimItem
17 #include "storage/datastore.h"
18 #include "tracer.h"
19 
20 #include "preprocessormanageradaptor.h"
21 
namespace Akonadi::Server
{
// Period of the heartbeat timer that drives the "stuck preprocessor" checks:
// 30 seconds, expressed in milliseconds.
constexpr int gHeartbeatTimeoutInMSecs = 30 * 1000;

// Escalation thresholds, all measured in seconds of processing time spent
// on a single item.
//
// Stage 1 (2 minutes): this should be plenty to process an item. Past this
// point the preprocessor is considered "stuck" and is asked to abort().
constexpr int gWarningItemProcessingTimeInSecs = 2 * 60;
// Stage 2 (3 minutes): still stuck, so a restart is attempted via AgentManager.
constexpr int gMaximumItemProcessingTimeInSecs = 3 * 60;
// Stage 3 (4 minutes): still stuck, so the preprocessor is assumed dead and
// its interface is simply dropped.
constexpr int gDeadlineItemProcessingTimeInSecs = 4 * 60;

} // namespace Akonadi::Server
41 
42 using namespace Akonadi::Server;
43 
44 // The one and only PreprocessorManager object
46  : mEnabled(true)
47  , mTracer(tracer)
48 {
49  // Hook in our D-Bus interface "shell".
50  new PreprocessorManagerAdaptor(this);
51 
52  QDBusConnection::sessionBus().registerObject(QStringLiteral("/PreprocessorManager"), this, QDBusConnection::ExportAdaptors);
53 
54  mHeartbeatTimer = new QTimer(this);
55 
56  QObject::connect(mHeartbeatTimer, &QTimer::timeout, this, &PreprocessorManager::heartbeat);
57 
58  mHeartbeatTimer->start(gHeartbeatTimeoutInMSecs);
59 }
60 
62 {
64 
65  // FIXME: Explicitly interrupt pre-processing here ?
66  // Pre-Processors should auto-protect themselves from re-processing an item:
67  // they are "closer to the DB" from this point of view.
68 
69  qDeleteAll(mPreprocessorChain);
70  qDeleteAll(mTransactionWaitQueueHash); // this should also disconnect all the signals from the data store objects...
71 }
72 
74 {
75  QMutexLocker locker(&mMutex);
76 
77  if (!mEnabled) {
78  return false;
79  }
80  return !mPreprocessorChain.isEmpty();
81 }
82 
83 PreprocessorInstance *PreprocessorManager::lockedFindInstance(const QString &id)
84 {
85  for (PreprocessorInstance *instance : std::as_const(mPreprocessorChain)) {
86  if (instance->id() == id) {
87  return instance;
88  }
89  }
90 
91  return nullptr;
92 }
93 
95 {
96  QMutexLocker locker(&mMutex);
97 
98  qCDebug(AKONADISERVER_LOG) << "PreprocessorManager::registerInstance(" << id << ")";
99 
100  PreprocessorInstance *instance = lockedFindInstance(id);
101  if (instance) {
102  return; // already registered
103  }
104 
105  // The PreprocessorInstance objects are actually always added at the end of the queue
106  // TODO: Maybe we need some kind of ordering here ?
107  // In that case we'll need to fiddle with the items that are currently enqueued for processing...
108 
109  instance = new PreprocessorInstance(id, *this, mTracer);
110  if (!instance->init()) {
111  mTracer.warning(QStringLiteral("PreprocessorManager"), QStringLiteral("Could not initialize preprocessor instance '%1'").arg(id));
112  delete instance;
113  return;
114  }
115 
116  qCDebug(AKONADISERVER_LOG) << "Registering preprocessor instance " << id;
117 
118  mPreprocessorChain.append(instance);
119 }
120 
122 {
123  QMutexLocker locker(&mMutex);
124 
125  qCDebug(AKONADISERVER_LOG) << "PreprocessorManager::unregisterInstance(" << id << ")";
126 
127  lockedUnregisterInstance(id);
128 }
129 
130 void PreprocessorManager::lockedUnregisterInstance(const QString &id)
131 {
132  PreprocessorInstance *instance = lockedFindInstance(id);
133  if (!instance) {
134  return; // not our instance: don't complain (as we might be called for non-preprocessor agents too)
135  }
136 
137  // All of the preprocessor's waiting items must be queued to the next preprocessor (if there is one)
138 
139  std::deque<qint64> *itemList = instance->itemQueue();
140  Q_ASSERT(itemList);
141 
142  int idx = mPreprocessorChain.indexOf(instance);
143  Q_ASSERT(idx >= 0); // must be there!
144 
145  if (idx < (mPreprocessorChain.count() - 1)) {
146  // This wasn't the last preprocessor: trigger the next one.
147  PreprocessorInstance *nextPreprocessor = mPreprocessorChain[idx + 1];
148  Q_ASSERT(nextPreprocessor);
149  Q_ASSERT(nextPreprocessor != instance);
150 
151  for (qint64 itemId : *itemList) {
152  nextPreprocessor->enqueueItem(itemId);
153  }
154  } else {
155  // This was the last preprocessor: end handling the items
156  for (qint64 itemId : *itemList) {
157  lockedEndHandleItem(itemId);
158  }
159  }
160 
161  mPreprocessorChain.removeOne(instance);
162  delete instance;
163 }
164 
165 void PreprocessorManager::beginHandleItem(const PimItem &item, const DataStore *dataStore)
166 {
167  Q_ASSERT(dataStore);
168  Q_ASSERT(item.isValid());
169 
170  // This is the entry point of the pre-processing chain.
171  QMutexLocker locker(&mMutex);
172 
173  if (!mEnabled) {
174  // Preprocessing is disabled: immediately end handling the item.
175  // In fact we shouldn't even be here as the caller should
176  // have checked isActive() before calling this function.
177  // However, since setEnabled() may be called concurrently
178  // then this might not be the caller's fault. Just drop a warning.
179 
180  qCWarning(AKONADISERVER_LOG) << "PreprocessorManager::beginHandleItem(" << item.id() << ") called with a disabled preprocessor";
181 
182  lockedEndHandleItem(item.id());
183  return;
184  }
185 
186 #if 0
187  // Now the hidden flag is stored as a part.. too hard to assert its existence :D
188  Q_ASSERT_X(item.hidden(), "PreprocessorManager::beginHandleItem()", "The item you pass to this function should be hidden!");
189 #endif
190 
191  if (mPreprocessorChain.isEmpty()) {
192  // No preprocessors at all: immediately end handling the item.
193  lockedEndHandleItem(item.id());
194  return;
195  }
196 
197  if (dataStore->inTransaction()) {
198  qCDebug(AKONADISERVER_LOG) << "PreprocessorManager::beginHandleItem(" << item.id()
199  << "): the DataStore is in transaction, pushing item to a wait queue";
200 
201  // The calling thread data store is in a transaction: push the item into a wait queue
202  std::deque<qint64> *waitQueue = mTransactionWaitQueueHash.value(dataStore, nullptr);
203 
204  if (!waitQueue) {
205  // No wait queue for this transaction yet...
206  waitQueue = new std::deque<qint64>();
207 
208  mTransactionWaitQueueHash.insert(dataStore, waitQueue);
209 
210  // This will usually end up being a queued connection.
211  QObject::connect(dataStore, &QObject::destroyed, this, &PreprocessorManager::dataStoreDestroyed);
212  QObject::connect(dataStore, &DataStore::transactionCommitted, this, &PreprocessorManager::dataStoreTransactionCommitted);
213  QObject::connect(dataStore, &DataStore::transactionRolledBack, this, &PreprocessorManager::dataStoreTransactionRolledBack);
214  }
215 
216  waitQueue->push_back(item.id());
217 
218  // nothing more to do here
219  return;
220  }
221 
222  // The calling thread data store is NOT in a transaction: we can proceed directly.
223  lockedActivateFirstPreprocessor(item.id());
224 }
225 
226 void PreprocessorManager::lockedActivateFirstPreprocessor(qint64 itemId)
227 {
228  // Activate the first preprocessor.
229  PreprocessorInstance *preProcessor = mPreprocessorChain.first();
230  Q_ASSERT(preProcessor);
231 
232  preProcessor->enqueueItem(itemId);
233  // The preprocessor will call our "preProcessorFinishedHandlingItem() method"
234  // when done with the item.
235  //
236  // The call should be asynchronous, that is it should never happen that
237  // preProcessorFinishedHandlingItem() is called from "inside" enqueueItem()...
238  // FIXME: Am I *really* sure of this ? If I'm wrong for some obscure reason then we have a deadlock.
239 }
240 
241 void PreprocessorManager::lockedKillWaitQueue(const DataStore *dataStore, bool disconnectSlots)
242 {
243  std::deque<qint64> *waitQueue = mTransactionWaitQueueHash.value(dataStore, nullptr);
244  if (!waitQueue) {
245  qCWarning(AKONADISERVER_LOG) << "PreprocessorManager::lockedKillWaitQueue(): called for dataStore which has no wait queue";
246  return;
247  }
248 
249  mTransactionWaitQueueHash.remove(dataStore);
250 
251  delete waitQueue;
252 
253  if (!disconnectSlots) {
254  return;
255  }
256 
257  QObject::disconnect(dataStore, &QObject::destroyed, this, &PreprocessorManager::dataStoreDestroyed);
258  QObject::disconnect(dataStore, &DataStore::transactionCommitted, this, &PreprocessorManager::dataStoreTransactionCommitted);
259  QObject::disconnect(dataStore, &DataStore::transactionRolledBack, this, &PreprocessorManager::dataStoreTransactionRolledBack);
260 }
261 
262 void PreprocessorManager::dataStoreDestroyed()
263 {
264  QMutexLocker locker(&mMutex);
265 
266  qCDebug(AKONADISERVER_LOG) << "PreprocessorManager::dataStoreDestroyed(): killing the wait queue";
267 
268  const auto dataStore = dynamic_cast<const DataStore *>(sender());
269  if (!dataStore) {
270  qCWarning(AKONADISERVER_LOG) << "PreprocessorManager::dataStoreDestroyed(): got the signal from a non DataStore object";
271  return;
272  }
273 
274  lockedKillWaitQueue(dataStore, false); // no need to disconnect slots, qt will do that
275 }
276 
277 void PreprocessorManager::dataStoreTransactionCommitted()
278 {
279  QMutexLocker locker(&mMutex);
280 
281  qCDebug(AKONADISERVER_LOG) << "PreprocessorManager::dataStoreTransactionCommitted(): pushing items in wait queue to the preprocessing chain";
282 
283  const auto dataStore = dynamic_cast<const DataStore *>(sender());
284  if (!dataStore) {
285  qCWarning(AKONADISERVER_LOG) << "PreprocessorManager::dataStoreTransactionCommitted(): got the signal from a non DataStore object";
286  return;
287  }
288 
289  std::deque<qint64> *waitQueue = mTransactionWaitQueueHash.value(dataStore, nullptr);
290  if (!waitQueue) {
291  qCWarning(AKONADISERVER_LOG) << "PreprocessorManager::dataStoreTransactionCommitted(): called for dataStore which has no wait queue";
292  return;
293  }
294 
295  if (!mEnabled || mPreprocessorChain.isEmpty()) {
296  // Preprocessing has been disabled in the meantime or all the preprocessors died
297  for (qint64 id : *waitQueue) {
298  lockedEndHandleItem(id);
299  }
300  } else {
301  for (qint64 id : *waitQueue) {
302  lockedActivateFirstPreprocessor(id);
303  }
304  }
305 
306  lockedKillWaitQueue(dataStore, true); // disconnect slots this time
307 }
308 
309 void PreprocessorManager::dataStoreTransactionRolledBack()
310 {
311  QMutexLocker locker(&mMutex);
312 
313  qCDebug(AKONADISERVER_LOG) << "PreprocessorManager::dataStoreTransactionRolledBack(): killing the wait queue";
314 
315  const auto dataStore = dynamic_cast<const DataStore *>(sender());
316  if (!dataStore) {
317  qCWarning(AKONADISERVER_LOG) << "PreprocessorManager::dataStoreTransactionCommitted(): got the signal from a non DataStore object";
318  return;
319  }
320 
321  lockedKillWaitQueue(dataStore, true); // disconnect slots this time
322 }
323 
325 {
326  QMutexLocker locker(&mMutex);
327 
328  int idx = mPreprocessorChain.indexOf(preProcessor);
329  Q_ASSERT(idx >= 0); // must be there!
330 
331  if (idx < (mPreprocessorChain.count() - 1)) {
332  // This wasn't the last preprocessor: trigger the next one.
333  PreprocessorInstance *nextPreprocessor = mPreprocessorChain[idx + 1];
334  Q_ASSERT(nextPreprocessor);
335  Q_ASSERT(nextPreprocessor != preProcessor);
336 
337  nextPreprocessor->enqueueItem(itemId);
338  } else {
339  // This was the last preprocessor: end handling the item.
340  lockedEndHandleItem(itemId);
341  }
342 }
343 
344 void PreprocessorManager::lockedEndHandleItem(qint64 itemId)
345 {
346  // The exit point of the pre-processing chain.
347 
348  // Refetch the PimItem, the Collection and the MimeType now: preprocessing might have changed them.
349  PimItem item = PimItem::retrieveById(itemId);
350  if (!item.isValid()) {
351  // HUM... the preprocessor killed the item ?
352  // ... or retrieveById() failed ?
353  // Well.. if the preprocessor killed the item then this might be actually OK (spam?).
354  qCDebug(AKONADISERVER_LOG) << "Invalid PIM item id '" << itemId << "' passed to preprocessing chain termination function";
355  return;
356  }
357 
358 #if 0
359  if (!item.hidden()) {
360  // HUM... the item was already unhidden for some reason: we have nothing more to do here.
361  qCDebug(AKONADISERVER_LOG) << "The PIM item with id '" << itemId << "' reached the preprocessing chain termination function in unhidden state";
362  return;
363  }
364 #endif
365 
366  if (!DataStore::self()->unhidePimItem(item)) {
367  mTracer.warning(
368  QStringLiteral("PreprocessorManager"),
369  QStringLiteral("Failed to unhide the PIM item '%1': data is not lost but a server restart is required in order to unhide it").arg(itemId));
370  }
371 }
372 
373 void PreprocessorManager::heartbeat()
374 {
375  QMutexLocker locker(&mMutex);
376 
377  // Loop through the processor instances and check their current processing time.
378 
379  QList<PreprocessorInstance *> firedPreprocessors;
380 
381  for (PreprocessorInstance *instance : std::as_const(mPreprocessorChain)) {
382  // In this loop we check for "stuck" preprocessors.
383 
384  int elapsedTime = instance->currentProcessingTime();
385 
386  if (elapsedTime < gWarningItemProcessingTimeInSecs) {
387  continue; // ok, still in time.
388  }
389 
390  // Ooops... the preprocessor looks to be "stuck".
391  // This is a rather critical condition and the question is "what we can do about it ?".
392  // The fact is that it doesn't really make sense to push another item for
393  // processing as the slave process is either dead (silently ?) or stuck anyway.
394 
395  // We then proceed as following:
396  // - we first kindly ask the preprocessor to abort the job (via Agent.Control interface)
397  // - if it doesn't obey after some time we attempt to restart it (via AgentManager)
398  // - if it doesn't obey, we drop the interface and assume it's dead until
399  // it's effectively restarted.
400 
401  if (elapsedTime < gMaximumItemProcessingTimeInSecs) {
402  // Kindly ask the preprocessor to abort the job.
403 
404  mTracer.warning(QStringLiteral("PreprocessorManager"),
405  QStringLiteral("Preprocessor '%1' seems to be stuck... trying to abort its job.").arg(instance->id()));
406 
407  if (instance->abortProcessing()) {
408  continue;
409  }
410  // If we're here then abortProcessing() failed.
411  }
412 
413  if (elapsedTime < gDeadlineItemProcessingTimeInSecs) {
414  // Attempt to restart the preprocessor via AgentManager interface
415 
416  mTracer.warning(QStringLiteral("PreprocessorManager"), QStringLiteral("Preprocessor '%1' is stuck... trying to restart it").arg(instance->id()));
417 
418  if (instance->invokeRestart()) {
419  continue;
420  }
421  // If we're here then invokeRestart() failed.
422  }
423 
424  mTracer.warning(QStringLiteral("PreprocessorManager"), QStringLiteral("Preprocessor '%1' is broken... ignoring it from now on").arg(instance->id()));
425 
426  // You're fired! Go Away!
427  firedPreprocessors.append(instance);
428  }
429 
430  // Kill the fired preprocessors, if any.
431  for (PreprocessorInstance *instance : std::as_const(firedPreprocessors)) {
432  lockedUnregisterInstance(instance->id());
433  }
434 }
void append(const T &value)
QList< PreprocessorInstance * > mPreprocessorChain
The preprocessor chain.
This class handles all the database access.
Definition: datastore.h:94
QMutex mMutex
The mutex used to protect the internals of this class (mainly the mPreprocessorChain member).
bool init()
This is called by PreprocessorManager just after the construction in order to connect to the preproce...
bool disconnect(const QObject *sender, const char *signal, const QObject *receiver, const char *method)
static DataStore * self()
Per thread singleton.
Definition: datastore.cpp:215
bool invokeRestart()
Attempts to invoke the preprocessor slave restart via AgentManager.
bool inTransaction() const
Returns true if there is a transaction in progress.
Definition: datastore.cpp:1451
PreprocessorManager(Tracer &tracer)
Creates an instance of PreprocessorManager.
~PreprocessorManager() override
Destroys the instance of PreprocessorManager and frees all the relevant resources.
QObject * sender() const const
void unregisterInstance(const QString &id)
This is called via D-Bus from AgentManager to unregister a preprocessor instance.
The global tracer instance where all akonadi components can send their tracing information to.
Definition: tracer.h:37
bool registerObject(const QString &path, QObject *object, QDBusConnection::RegisterOptions options)
Definition: item.h:32
QMetaObject::Connection connect(const QObject *sender, const char *signal, const QObject *receiver, const char *method, Qt::ConnectionType type)
QTimer * mHeartbeatTimer
The heartbeat timer.
QHash< const DataStore *, std::deque< qint64 > * > mTransactionWaitQueueHash
The hashtable of transaction wait queues.
bool mEnabled
Is preprocessing enabled at all in this Akonadi server instance? This is true by default and can be s...
void destroyed(QObject *obj)
void start(int msec)
A single preprocessor (agent) instance.
QDBusConnection sessionBus()
void warning(const QString &componentName, const QString &msg) override
This method is called whenever a component wants to output a warning.
Definition: tracer.cpp:117
void beginHandleItem(const PimItem &item, const DataStore *dataStore)
Trigger the preprocessor chain for the specified item.
void timeout()
const QString & id() const
Returns the id of this preprocessor.
std::deque< qint64 > * itemQueue()
Returns a pointer to the internal preprocessor instance item queue.
void transactionRolledBack()
Emitted if a transaction has been aborted.
bool abortProcessing()
Attempts to abort the processing of the current item.
void transactionCommitted()
Emitted if a transaction has been successfully committed.
void registerInstance(const QString &id)
This is called via D-Bus from AgentManager to register a preprocessor instance.
void enqueueItem(qint64 itemId)
This is called by PreprocessorManager to enqueue a PimItem for processing by this preprocessor instan...
void stop()
qint64 currentProcessingTime()
Returns the time in seconds elapsed since the current item was submitted to the slave preprocessor in...
void preProcessorFinishedHandlingItem(PreprocessorInstance *preProcessor, qint64 itemId)
This is called by PreprocessorInstance to signal that a certain preprocessor has finished handling an...
bool isActive()
Returns true if preprocessing is active in this Akonadi server.
Helper integration between Akonadi and Qt.
This file is part of the KDE documentation.
Documentation copyright © 1996-2022 The KDE developers.
Generated on Sat Jul 2 2022 06:41:48 by doxygen 1.8.17 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.