Akonadi

preprocessormanager.cpp
1/******************************************************************************
2 *
3 * File : preprocessormanager.cpp
4 * Creation date : Sat 18 Jul 2009 01:58:50
5 *
6 * SPDX-FileCopyrightText: 2009 Szymon Stefanek <s.stefanek at gmail dot com>
7 *
8 * SPDX-License-Identifier: LGPL-2.0-or-later
9 *
10 *****************************************************************************/
11
12#include "preprocessormanager.h"
13#include "akonadiserver_debug.h"
14
15#include "akonadi.h"
16#include "entities.h" // Akonadi::Server::PimItem
17#include "storage/datastore.h"
18#include "tracer.h"
19
20#include "preprocessormanageradaptor.h"
21
namespace Akonadi
{
namespace Server
{
// Interval of the heartbeat timer that drives PreprocessorManager::heartbeat().
constexpr int gHeartbeatTimeoutInMSecs = 30000; // 30 sec heartbeat

// 2 minutes should be really enough to process an item.
// After this timeout elapses we assume that the preprocessor
// is "stuck" and we attempt to kick it by requesting an abort().
constexpr int gWarningItemProcessingTimeInSecs = 120;
// After 3 minutes, if the preprocessor is still "stuck" then
// we attempt to restart it via AgentManager....
constexpr int gMaximumItemProcessingTimeInSecs = 180;
// After 4 minutes, if the preprocessor is still "stuck" then
// we assume it's dead and just drop its interface.
constexpr int gDeadlineItemProcessingTimeInSecs = 240;

// The heartbeat escalates abort -> restart -> drop by comparing the elapsed
// time against these thresholds in turn, so they must be strictly increasing.
static_assert(gWarningItemProcessingTimeInSecs < gMaximumItemProcessingTimeInSecs
                  && gMaximumItemProcessingTimeInSecs < gDeadlineItemProcessingTimeInSecs,
              "preprocessor timeouts must escalate: warning < maximum < deadline");

} // namespace Server
} // namespace Akonadi
41
42using namespace Akonadi::Server;
43
44// The one and only PreprocessorManager object
46 : mEnabled(true)
47 , mTracer(tracer)
48{
49 // Hook in our D-Bus interface "shell".
50 new PreprocessorManagerAdaptor(this);
51
52 QDBusConnection::sessionBus().registerObject(QStringLiteral("/PreprocessorManager"), this, QDBusConnection::ExportAdaptors);
53
54 mHeartbeatTimer = new QTimer(this);
55
56 QObject::connect(mHeartbeatTimer, &QTimer::timeout, this, &PreprocessorManager::heartbeat);
57
58 mHeartbeatTimer->start(gHeartbeatTimeoutInMSecs);
59}
60
62{
64
65 // FIXME: Explicitly interrupt pre-processing here ?
66 // Pre-Processors should auto-protect themselves from re-processing an item:
67 // they are "closer to the DB" from this point of view.
68
69 qDeleteAll(mPreprocessorChain);
70 qDeleteAll(mTransactionWaitQueueHash); // this should also disconnect all the signals from the data store objects...
71}
72
74{
75 QMutexLocker locker(&mMutex);
76
77 if (!mEnabled) {
78 return false;
79 }
80 return !mPreprocessorChain.isEmpty();
81}
82
83PreprocessorInstance *PreprocessorManager::lockedFindInstance(const QString &id)
84{
85 for (PreprocessorInstance *instance : std::as_const(mPreprocessorChain)) {
86 if (instance->id() == id) {
87 return instance;
88 }
89 }
90
91 return nullptr;
92}
93
95{
96 QMutexLocker locker(&mMutex);
97
98 qCDebug(AKONADISERVER_LOG) << "PreprocessorManager::registerInstance(" << id << ")";
99
100 PreprocessorInstance *instance = lockedFindInstance(id);
101 if (instance) {
102 return; // already registered
103 }
104
105 // The PreprocessorInstance objects are actually always added at the end of the queue
106 // TODO: Maybe we need some kind of ordering here ?
107 // In that case we'll need to fiddle with the items that are currently enqueued for processing...
108
109 instance = new PreprocessorInstance(id, *this, mTracer);
110 if (!instance->init()) {
111 mTracer.warning(QStringLiteral("PreprocessorManager"), QStringLiteral("Could not initialize preprocessor instance '%1'").arg(id));
112 delete instance;
113 return;
114 }
115
116 qCDebug(AKONADISERVER_LOG) << "Registering preprocessor instance " << id;
117
118 mPreprocessorChain.append(instance);
119}
120
122{
123 QMutexLocker locker(&mMutex);
124
125 qCDebug(AKONADISERVER_LOG) << "PreprocessorManager::unregisterInstance(" << id << ")";
126
127 lockedUnregisterInstance(id);
128}
129
130void PreprocessorManager::lockedUnregisterInstance(const QString &id)
131{
132 PreprocessorInstance *instance = lockedFindInstance(id);
133 if (!instance) {
134 return; // not our instance: don't complain (as we might be called for non-preprocessor agents too)
135 }
136
137 // All of the preprocessor's waiting items must be queued to the next preprocessor (if there is one)
138
139 std::deque<qint64> *itemList = instance->itemQueue();
140 Q_ASSERT(itemList);
141
142 int idx = mPreprocessorChain.indexOf(instance);
143 Q_ASSERT(idx >= 0); // must be there!
144
145 if (idx < (mPreprocessorChain.count() - 1)) {
146 // This wasn't the last preprocessor: trigger the next one.
147 PreprocessorInstance *nextPreprocessor = mPreprocessorChain[idx + 1];
148 Q_ASSERT(nextPreprocessor);
149 Q_ASSERT(nextPreprocessor != instance);
150
151 for (qint64 itemId : *itemList) {
152 nextPreprocessor->enqueueItem(itemId);
153 }
154 } else {
155 // This was the last preprocessor: end handling the items
156 for (qint64 itemId : *itemList) {
157 lockedEndHandleItem(itemId);
158 }
159 }
160
161 mPreprocessorChain.removeOne(instance);
162 delete instance;
163}
164
165void PreprocessorManager::beginHandleItem(const PimItem &item, const DataStore *dataStore)
166{
167 Q_ASSERT(dataStore);
168 Q_ASSERT(item.isValid());
169
170 // This is the entry point of the pre-processing chain.
171 QMutexLocker locker(&mMutex);
172
173 if (!mEnabled) {
174 // Preprocessing is disabled: immediately end handling the item.
175 // In fact we shouldn't even be here as the caller should
176 // have checked isActive() before calling this function.
177 // However, since setEnabled() may be called concurrently
178 // then this might not be the caller's fault. Just drop a warning.
179
180 qCWarning(AKONADISERVER_LOG) << "PreprocessorManager::beginHandleItem(" << item.id() << ") called with a disabled preprocessor";
181
182 lockedEndHandleItem(item.id());
183 return;
184 }
185
186#if 0
187 // Now the hidden flag is stored as a part.. too hard to assert its existence :D
188 Q_ASSERT_X(item.hidden(), "PreprocessorManager::beginHandleItem()", "The item you pass to this function should be hidden!");
189#endif
190
191 if (mPreprocessorChain.isEmpty()) {
192 // No preprocessors at all: immediately end handling the item.
193 lockedEndHandleItem(item.id());
194 return;
195 }
196
197 if (dataStore->inTransaction()) {
198 qCDebug(AKONADISERVER_LOG) << "PreprocessorManager::beginHandleItem(" << item.id()
199 << "): the DataStore is in transaction, pushing item to a wait queue";
200
201 // The calling thread data store is in a transaction: push the item into a wait queue
202 std::deque<qint64> *waitQueue = mTransactionWaitQueueHash.value(dataStore, nullptr);
203
204 if (!waitQueue) {
205 // No wait queue for this transaction yet...
206 waitQueue = new std::deque<qint64>();
207
208 mTransactionWaitQueueHash.insert(dataStore, waitQueue);
209
210 // This will usually end up being a queued connection.
211 QObject::connect(dataStore, &QObject::destroyed, this, &PreprocessorManager::dataStoreDestroyed);
212 QObject::connect(dataStore, &DataStore::transactionCommitted, this, &PreprocessorManager::dataStoreTransactionCommitted);
213 QObject::connect(dataStore, &DataStore::transactionRolledBack, this, &PreprocessorManager::dataStoreTransactionRolledBack);
214 }
215
216 waitQueue->push_back(item.id());
217
218 // nothing more to do here
219 return;
220 }
221
222 // The calling thread data store is NOT in a transaction: we can proceed directly.
223 lockedActivateFirstPreprocessor(item.id());
224}
225
226void PreprocessorManager::lockedActivateFirstPreprocessor(qint64 itemId)
227{
228 // Activate the first preprocessor.
229 PreprocessorInstance *preProcessor = mPreprocessorChain.first();
230 Q_ASSERT(preProcessor);
231
232 preProcessor->enqueueItem(itemId);
233 // The preprocessor will call our "preProcessorFinishedHandlingItem() method"
234 // when done with the item.
235 //
236 // The call should be asynchronous, that is it should never happen that
237 // preProcessorFinishedHandlingItem() is called from "inside" enqueueItem()...
238 // FIXME: Am I *really* sure of this ? If I'm wrong for some obscure reason then we have a deadlock.
239}
240
241void PreprocessorManager::lockedKillWaitQueue(const DataStore *dataStore, bool disconnectSlots)
242{
243 std::deque<qint64> *waitQueue = mTransactionWaitQueueHash.value(dataStore, nullptr);
244 if (!waitQueue) {
245 qCWarning(AKONADISERVER_LOG) << "PreprocessorManager::lockedKillWaitQueue(): called for dataStore which has no wait queue";
246 return;
247 }
248
249 mTransactionWaitQueueHash.remove(dataStore);
250
251 delete waitQueue;
252
253 if (!disconnectSlots) {
254 return;
255 }
256
257 QObject::disconnect(dataStore, &QObject::destroyed, this, &PreprocessorManager::dataStoreDestroyed);
258 QObject::disconnect(dataStore, &DataStore::transactionCommitted, this, &PreprocessorManager::dataStoreTransactionCommitted);
259 QObject::disconnect(dataStore, &DataStore::transactionRolledBack, this, &PreprocessorManager::dataStoreTransactionRolledBack);
260}
261
262void PreprocessorManager::dataStoreDestroyed()
263{
264 QMutexLocker locker(&mMutex);
265
266 qCDebug(AKONADISERVER_LOG) << "PreprocessorManager::dataStoreDestroyed(): killing the wait queue";
267
268 const auto dataStore = dynamic_cast<const DataStore *>(sender());
269 if (!dataStore) {
270 qCWarning(AKONADISERVER_LOG) << "PreprocessorManager::dataStoreDestroyed(): got the signal from a non DataStore object";
271 return;
272 }
273
274 lockedKillWaitQueue(dataStore, false); // no need to disconnect slots, qt will do that
275}
276
277void PreprocessorManager::dataStoreTransactionCommitted()
278{
279 QMutexLocker locker(&mMutex);
280
281 qCDebug(AKONADISERVER_LOG) << "PreprocessorManager::dataStoreTransactionCommitted(): pushing items in wait queue to the preprocessing chain";
282
283 const auto dataStore = dynamic_cast<const DataStore *>(sender());
284 if (!dataStore) {
285 qCWarning(AKONADISERVER_LOG) << "PreprocessorManager::dataStoreTransactionCommitted(): got the signal from a non DataStore object";
286 return;
287 }
288
289 std::deque<qint64> *waitQueue = mTransactionWaitQueueHash.value(dataStore, nullptr);
290 if (!waitQueue) {
291 qCWarning(AKONADISERVER_LOG) << "PreprocessorManager::dataStoreTransactionCommitted(): called for dataStore which has no wait queue";
292 return;
293 }
294
295 if (!mEnabled || mPreprocessorChain.isEmpty()) {
296 // Preprocessing has been disabled in the meantime or all the preprocessors died
297 for (qint64 id : *waitQueue) {
298 lockedEndHandleItem(id);
299 }
300 } else {
301 for (qint64 id : *waitQueue) {
302 lockedActivateFirstPreprocessor(id);
303 }
304 }
305
306 lockedKillWaitQueue(dataStore, true); // disconnect slots this time
307}
308
309void PreprocessorManager::dataStoreTransactionRolledBack()
310{
311 QMutexLocker locker(&mMutex);
312
313 qCDebug(AKONADISERVER_LOG) << "PreprocessorManager::dataStoreTransactionRolledBack(): killing the wait queue";
314
315 const auto dataStore = dynamic_cast<const DataStore *>(sender());
316 if (!dataStore) {
317 qCWarning(AKONADISERVER_LOG) << "PreprocessorManager::dataStoreTransactionCommitted(): got the signal from a non DataStore object";
318 return;
319 }
320
321 lockedKillWaitQueue(dataStore, true); // disconnect slots this time
322}
323
325{
326 QMutexLocker locker(&mMutex);
327
328 int idx = mPreprocessorChain.indexOf(preProcessor);
329 Q_ASSERT(idx >= 0); // must be there!
330
331 if (idx < (mPreprocessorChain.count() - 1)) {
332 // This wasn't the last preprocessor: trigger the next one.
333 PreprocessorInstance *nextPreprocessor = mPreprocessorChain[idx + 1];
334 Q_ASSERT(nextPreprocessor);
335 Q_ASSERT(nextPreprocessor != preProcessor);
336
337 nextPreprocessor->enqueueItem(itemId);
338 } else {
339 // This was the last preprocessor: end handling the item.
340 lockedEndHandleItem(itemId);
341 }
342}
343
344void PreprocessorManager::lockedEndHandleItem(qint64 itemId)
345{
346 // The exit point of the pre-processing chain.
347
348 // Refetch the PimItem, the Collection and the MimeType now: preprocessing might have changed them.
349 PimItem item = PimItem::retrieveById(itemId);
350 if (!item.isValid()) {
351 // HUM... the preprocessor killed the item ?
352 // ... or retrieveById() failed ?
353 // Well.. if the preprocessor killed the item then this might be actually OK (spam?).
354 qCDebug(AKONADISERVER_LOG) << "Invalid PIM item id '" << itemId << "' passed to preprocessing chain termination function";
355 return;
356 }
357
358#if 0
359 if (!item.hidden()) {
360 // HUM... the item was already unhidden for some reason: we have nothing more to do here.
361 qCDebug(AKONADISERVER_LOG) << "The PIM item with id '" << itemId << "' reached the preprocessing chain termination function in unhidden state";
362 return;
363 }
364#endif
365
366 if (!DataStore::self()->unhidePimItem(item)) {
367 mTracer.warning(
368 QStringLiteral("PreprocessorManager"),
369 QStringLiteral("Failed to unhide the PIM item '%1': data is not lost but a server restart is required in order to unhide it").arg(itemId));
370 }
371}
372
373void PreprocessorManager::heartbeat()
374{
375 QMutexLocker locker(&mMutex);
376
377 // Loop through the processor instances and check their current processing time.
378
379 QList<PreprocessorInstance *> firedPreprocessors;
380
381 for (PreprocessorInstance *instance : std::as_const(mPreprocessorChain)) {
382 // In this loop we check for "stuck" preprocessors.
383
384 int elapsedTime = instance->currentProcessingTime();
385
386 if (elapsedTime < gWarningItemProcessingTimeInSecs) {
387 continue; // ok, still in time.
388 }
389
390 // Ooops... the preprocessor looks to be "stuck".
391 // This is a rather critical condition and the question is "what we can do about it ?".
392 // The fact is that it doesn't really make sense to push another item for
393 // processing as the slave process is either dead (silently ?) or stuck anyway.
394
395 // We then proceed as following:
396 // - we first kindly ask the preprocessor to abort the job (via Agent.Control interface)
397 // - if it doesn't obey after some time we attempt to restart it (via AgentManager)
398 // - if it doesn't obey, we drop the interface and assume it's dead until
399 // it's effectively restarted.
400
401 if (elapsedTime < gMaximumItemProcessingTimeInSecs) {
402 // Kindly ask the preprocessor to abort the job.
403
404 mTracer.warning(QStringLiteral("PreprocessorManager"),
405 QStringLiteral("Preprocessor '%1' seems to be stuck... trying to abort its job.").arg(instance->id()));
406
407 if (instance->abortProcessing()) {
408 continue;
409 }
410 // If we're here then abortProcessing() failed.
411 }
412
413 if (elapsedTime < gDeadlineItemProcessingTimeInSecs) {
414 // Attempt to restart the preprocessor via AgentManager interface
415
416 mTracer.warning(QStringLiteral("PreprocessorManager"), QStringLiteral("Preprocessor '%1' is stuck... trying to restart it").arg(instance->id()));
417
418 if (instance->invokeRestart()) {
419 continue;
420 }
421 // If we're here then invokeRestart() failed.
422 }
423
424 mTracer.warning(QStringLiteral("PreprocessorManager"), QStringLiteral("Preprocessor '%1' is broken... ignoring it from now on").arg(instance->id()));
425
426 // You're fired! Go Away!
427 firedPreprocessors.append(instance);
428 }
429
430 // Kill the fired preprocessors, if any.
431 for (PreprocessorInstance *instance : std::as_const(firedPreprocessors)) {
432 lockedUnregisterInstance(instance->id());
433 }
434}
435
436#include "moc_preprocessormanager.cpp"
This class handles all the database access.
Definition datastore.h:95
bool inTransaction() const
Returns true if there is a transaction in progress.
static DataStore * self()
Per thread singleton.
void transactionRolledBack()
Emitted if a transaction has been aborted.
void transactionCommitted()
Emitted if a transaction has been successfully committed.
A single preprocessor (agent) instance.
std::deque< qint64 > * itemQueue()
Returns a pointer to the internal preprocessor instance item queue.
qint64 currentProcessingTime()
Returns the time in seconds elapsed since the current item was submitted to the slave preprocessor instance.
const QString & id() const
Returns the id of this preprocessor.
bool abortProcessing()
Attempts to abort the processing of the current item.
void enqueueItem(qint64 itemId)
This is called by PreprocessorManager to enqueue a PimItem for processing by this preprocessor instance.
bool init()
This is called by PreprocessorManager just after the construction in order to connect to the preprocessor instance.
bool invokeRestart()
Attempts to invoke the preprocessor slave restart via AgentManager.
void unregisterInstance(const QString &id)
This is called via D-Bus from AgentManager to unregister a preprocessor instance.
void preProcessorFinishedHandlingItem(PreprocessorInstance *preProcessor, qint64 itemId)
This is called by PreprocessorInstance to signal that a certain preprocessor has finished handling an item.
QList< PreprocessorInstance * > mPreprocessorChain
The preprocessor chain.
bool isActive()
Returns true if preprocessing is active in this Akonadi server.
bool mEnabled
Is preprocessing enabled at all in this Akonadi server instance? This is true by default and can be set via setEnabled().
void beginHandleItem(const PimItem &item, const DataStore *dataStore)
Trigger the preprocessor chain for the specified item.
PreprocessorManager(Tracer &tracer)
Creates an instance of PreprocessorManager.
~PreprocessorManager() override
Destroys the instance of PreprocessorManager and frees all the relevant resources.
void registerInstance(const QString &id)
This is called via D-Bus from AgentManager to register a preprocessor instance.
QHash< const DataStore *, std::deque< qint64 > * > mTransactionWaitQueueHash
The hashtable of transaction wait queues.
QMutex mMutex
The mutex used to protect the internals of this class (mainly the mPreprocessorChain member).
QTimer * mHeartbeatTimer
The heartbeat timer.
The global tracer instance where all akonadi components can send their tracing information to.
Definition tracer.h:38
void warning(const QString &componentName, const QString &msg) override
This method is called whenever a component wants to output a warning.
Definition tracer.cpp:117
Helper integration between Akonadi and Qt.
bool registerObject(const QString &path, QObject *object, RegisterOptions options)
QDBusConnection sessionBus()
void append(QList< T > &&value)
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
void destroyed(QObject *obj)
bool disconnect(const QMetaObject::Connection &connection)
QObject * sender() const
QString & append(QChar ch)
void start()
void stop()
void timeout()
This file is part of the KDE documentation.
Documentation copyright © 1996-2025 The KDE developers.
Generated on Fri Jan 3 2025 11:58:20 by doxygen 1.12.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.