Akonadi

searchtaskmanager.cpp
1/*
2 SPDX-FileCopyrightText: 2013, 2014 Daniel Vrátil <dvratil@redhat.com>
3
4 SPDX-License-Identifier: LGPL-2.0-or-later
5*/
6
7#include "searchtaskmanager.h"
8#include "agentsearchinstance.h"
9#include "akonadiserver_search_debug.h"
10#include "connection.h"
11#include "entities.h"
12#include "storage/selectquerybuilder.h"
13
14#include "private/dbus_p.h"
15
16#include <QDBusConnection>
17#include <QDeadlineTimer>
18#include <QSqlError>
19#include <QTime>
20#include <QTimer>
21using namespace Akonadi;
22using namespace Akonadi::Server;
23
24SearchTaskManager::SearchTaskManager()
25 : AkThread(QStringLiteral("SearchTaskManager"))
26 , mShouldStop(false)
27{
28 QTimer::singleShot(0, this, &SearchTaskManager::searchLoop);
29}
30
31SearchTaskManager::~SearchTaskManager()
32{
33 QMutexLocker locker(&mLock);
34 mShouldStop = true;
35 mWait.wakeAll();
36 locker.unlock();
37
38 quitThread();
39
40 mInstancesLock.lock();
41 qDeleteAll(mInstances);
42 mInstancesLock.unlock();
43}
44
45void SearchTaskManager::registerInstance(const QString &id)
46{
47 QMutexLocker locker(&mInstancesLock);
48
49 qCDebug(AKONADISERVER_SEARCH_LOG) << "SearchManager::registerInstance(" << id << ")";
50
51 AgentSearchInstance *instance = mInstances.value(id);
52 if (instance) {
53 return; // already registered
54 }
55
56 instance = new AgentSearchInstance(id, *this);
57 if (!instance->init()) {
58 qCDebug(AKONADISERVER_SEARCH_LOG) << "Failed to initialize Search agent";
59 delete instance;
60 return;
61 }
62
63 qCDebug(AKONADISERVER_SEARCH_LOG) << "Registering search instance " << id;
64 mInstances.insert(id, instance);
65}
66
67void SearchTaskManager::unregisterInstance(const QString &id)
68{
69 QMutexLocker locker(&mInstancesLock);
70
72 if (it != mInstances.end()) {
73 qCDebug(AKONADISERVER_SEARCH_LOG) << "Unregistering search instance" << id;
74 it.value()->deleteLater();
75 mInstances.erase(it);
76 }
77}
78
79void SearchTaskManager::addTask(SearchTask *task)
80{
81 QueryBuilder qb(Collection::tableName());
82 qb.addJoin(QueryBuilder::InnerJoin, Resource::tableName(), Collection::resourceIdFullColumnName(), Resource::idFullColumnName());
83 qb.addColumn(Collection::idFullColumnName());
85
86 Q_ASSERT(!task->collections.isEmpty());
87 QVariantList list;
88 list.reserve(task->collections.size());
89 for (qint64 collection : std::as_const(task->collections)) {
90 list << collection;
91 }
92 qb.addValueCondition(Collection::idFullColumnName(), Query::In, list);
93
94 if (!qb.exec()) {
95 throw SearchException(qb.query().lastError().text());
96 }
97
98 QSqlQuery query = qb.query();
99 if (!query.next()) {
100 return;
101 }
102
103 mInstancesLock.lock();
104
105 org::freedesktop::Akonadi::AgentManager agentManager(DBus::serviceName(DBus::Control), QStringLiteral("/AgentManager"), QDBusConnection::sessionBus());
106 do {
107 const QString resourceId = query.value(1).toString();
108 if (!mInstances.contains(resourceId)) {
109 qCDebug(AKONADISERVER_SEARCH_LOG) << "Resource" << resourceId << "does not implement Search interface, skipping";
110 } else if (!agentManager.agentInstanceOnline(resourceId)) {
111 qCDebug(AKONADISERVER_SEARCH_LOG) << "Agent" << resourceId << "is offline, skipping";
112 } else if (agentManager.agentInstanceStatus(resourceId) > 2) { // 2 == Broken, 3 == Not Configured
113 qCDebug(AKONADISERVER_SEARCH_LOG) << "Agent" << resourceId << "is broken or not configured";
114 } else {
115 const qint64 collectionId = query.value(0).toLongLong();
116 qCDebug(AKONADISERVER_SEARCH_LOG) << "Enqueued search query (" << resourceId << ", " << collectionId << ")";
117 task->queries << qMakePair(resourceId, collectionId);
118 }
119 } while (query.next());
120 mInstancesLock.unlock();
121
122 QMutexLocker locker(&mLock);
123 mTasklist.append(task);
124 mWait.wakeAll();
125}
126
127void SearchTaskManager::pushResults(const QByteArray &searchId, const QSet<qint64> &ids, Connection *connection)
128{
129 Q_UNUSED(searchId)
130
131 const auto resourceName = connection->context().resource().name();
132 qCDebug(AKONADISERVER_SEARCH_LOG) << ids.count() << "results for search" << searchId << "pushed from" << resourceName;
133
134 QMutexLocker locker(&mLock);
135 ResourceTask *task = mRunningTasks.take(resourceName);
136 if (!task) {
137 qCDebug(AKONADISERVER_SEARCH_LOG) << "No running task for" << resourceName << " - maybe it has timed out?";
138 return;
139 }
140
141 if (task->parentTask->id != searchId) {
142 qCDebug(AKONADISERVER_SEARCH_LOG) << "Received results for different search - maybe the original task has timed out?";
143 qCDebug(AKONADISERVER_SEARCH_LOG) << "Search is" << searchId << ", but task is" << task->parentTask->id;
144 return;
145 }
146
147 task->results = ids;
148 mPendingResults.append(task);
149
150 mWait.wakeAll();
151}
152
153bool SearchTaskManager::allResourceTasksCompleted(SearchTask *agentSearchTask) const
154{
155 // Check for queries pending to be dispatched
156 if (!agentSearchTask->queries.isEmpty()) {
157 return false;
158 }
159
160 // Check for running queries
163 for (; it != end; ++it) {
164 if (it.value()->parentTask == agentSearchTask) {
165 return false;
166 }
167 }
168
169 return true;
170}
171
172SearchTaskManager::TasksMap::Iterator SearchTaskManager::cancelRunningTask(TasksMap::Iterator &iter)
173{
174 ResourceTask *task = iter.value();
175 SearchTask *parentTask = task->parentTask;
176 QMutexLocker locker(&parentTask->sharedLock);
177 // erase the task before allResourceTasksCompleted
179 // We're not clearing the results since we don't want to clear successful results from other resources
180 parentTask->complete = allResourceTasksCompleted(parentTask);
181 parentTask->notifier.wakeAll();
182 delete task;
183
184 return it;
185}
186
187void SearchTaskManager::searchLoop()
188{
189 qint64 timeout = ULONG_MAX;
190
191 QMutexLocker locker(&mLock);
192
193 for (;;) {
194 qCDebug(AKONADISERVER_SEARCH_LOG) << "Search loop is waiting, will wake again in" << timeout << "ms";
196 if (mShouldStop) {
197 for (SearchTask *task : std::as_const(mTasklist)) {
198 QMutexLocker locker(&task->sharedLock);
199 task->queries.clear();
200 task->notifier.wakeAll();
201 }
202
204 for (; it != mRunningTasks.end();) {
205 if (mTasklist.contains(it.value()->parentTask)) {
206 delete it.value();
207 it = mRunningTasks.erase(it);
208 continue;
209 }
210 it = cancelRunningTask(it);
211 }
212
213 break;
214 }
215
216 // First notify about available results
217 while (!mPendingResults.isEmpty()) {
218 ResourceTask *finishedTask = mPendingResults.first();
219 mPendingResults.remove(0);
220 qCDebug(AKONADISERVER_SEARCH_LOG) << "Pending results from" << finishedTask->resourceId << "for collection" << finishedTask->collectionId
221 << "for search" << finishedTask->parentTask->id << "available!";
222 SearchTask *parentTask = finishedTask->parentTask;
223 QMutexLocker locker(&parentTask->sharedLock);
224 // We need to append, this agent search task is shared
225 parentTask->pendingResults += finishedTask->results;
226 parentTask->complete = allResourceTasksCompleted(parentTask);
227 parentTask->notifier.wakeAll();
228 delete finishedTask;
229 }
230
231 // No check whether there are any tasks running longer than 1 minute and kill them
233 const qint64 now = QDateTime::currentMSecsSinceEpoch();
234 for (; it != mRunningTasks.end();) {
235 ResourceTask *task = it.value();
236 if (now - task->timestamp > 60 * 1000) {
237 // Remove the task - and signal to parent task that it has "finished" without results
238 qCDebug(AKONADISERVER_SEARCH_LOG) << "Resource task" << task->resourceId << "for search" << task->parentTask->id << "timed out!";
239 it = cancelRunningTask(it);
240 } else {
241 ++it;
242 }
243 }
244
245 if (!mTasklist.isEmpty()) {
246 SearchTask *task = mTasklist.first();
247 qCDebug(AKONADISERVER_SEARCH_LOG) << "Search task" << task->id << "available!";
248 if (task->queries.isEmpty()) {
249 qCDebug(AKONADISERVER_SEARCH_LOG) << "nothing to do for task";
250 QMutexLocker locker(&task->sharedLock);
251 // After this the AgentSearchTask will be destroyed
252 task->complete = true;
253 task->notifier.wakeAll();
254 mTasklist.remove(0);
255 continue;
256 }
257
258 for (auto it = task->queries.begin(); it != task->queries.end();) {
259 if (!mRunningTasks.contains(it->first)) {
260 const auto &[resource, colId] = *it;
261 qCDebug(AKONADISERVER_SEARCH_LOG) << "\t Sending query for collection" << colId << "to resource" << resource;
262 auto rTask = new ResourceTask;
263 rTask->resourceId = resource;
264 rTask->collectionId = colId;
265 rTask->parentTask = task;
267 mRunningTasks.insert(resource, rTask);
268
269 mInstancesLock.lock();
270 AgentSearchInstance *instance = mInstances.value(resource);
271 if (!instance) {
272 mInstancesLock.unlock();
273 // Resource disappeared in the meanwhile
274 continue;
275 }
276
277 instance->search(task->id, task->query, colId);
278 mInstancesLock.unlock();
279
280 task->sharedLock.lock();
281 it = task->queries.erase(it);
282 task->sharedLock.unlock();
283 } else {
284 ++it;
285 }
286 }
287 // Yay! We managed to dispatch all requests!
288 if (task->queries.isEmpty()) {
289 qCDebug(AKONADISERVER_SEARCH_LOG) << "All queries from task" << task->id << "dispatched!";
290 mTasklist.remove(0);
291 }
292
293 timeout = 60 * 1000; // check whether all tasks have finished within a minute
294 } else {
295 if (mRunningTasks.isEmpty()) {
296 timeout = ULONG_MAX;
297 }
298 }
299 }
300}
301
302#include "moc_searchtaskmanager.cpp"
An Connection represents one connection of a client to the server.
Definition connection.h:39
Helper class to construct arbitrary SQL queries.
@ InnerJoin
NOTE: only supported for UPDATE and SELECT queries.
QString name() const
Helper integration between Akonadi and Qt.
KSERVICE_EXPORT KService::List query(FilterFunc filterFunc)
KIOCORE_EXPORT QStringList list(const QString &fileClass)
const QList< QKeySequence > & end()
A glue between Qt and the standard library.
qint64 currentMSecsSinceEpoch()
QDBusConnection sessionBus()
void append(QList< T > &&value)
bool contains(const AT &value) const const
T & first()
bool isEmpty() const const
void remove(qsizetype i, qsizetype n)
void reserve(qsizetype size)
T value(qsizetype i) const const
typedef Iterator
iterator begin()
bool contains(const Key &key) const const
iterator end()
iterator erase(const_iterator first, const_iterator last)
iterator find(const Key &key)
iterator insert(const Key &key, const T &value)
bool isEmpty() const const
T take(const Key &key)
T value(const Key &key, const T &defaultValue) const const
void lock()
void unlock()
T qobject_cast(QObject *object)
qsizetype count() const const
QTaskBuilder< Task > task(Task &&task)
bool wait(QMutex *lockedMutex, QDeadlineTimer deadline)
This file is part of the KDE documentation.
Documentation copyright © 1996-2024 The KDE developers.
Generated on Tue Mar 26 2024 11:13:38 by doxygen 1.10.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.