Akonadi

searchtaskmanager.cpp
1 /*
2  SPDX-FileCopyrightText: 2013, 2014 Daniel Vr├ítil <[email protected]>
3 
4  SPDX-License-Identifier: LGPL-2.0-or-later
5 */
6 
7 #include "searchtaskmanager.h"
8 #include "agentsearchinstance.h"
9 #include "akonadiserver_search_debug.h"
10 #include "connection.h"
11 #include "entities.h"
12 #include "storage/selectquerybuilder.h"
13 
14 #include <private/dbus_p.h>
15 
16 #include <QDBusConnection>
17 #include <QSqlError>
18 #include <QTime>
19 #include <QTimer>
20 #include <QDeadlineTimer>
21 using namespace Akonadi;
22 using namespace Akonadi::Server;
23 
24 SearchTaskManager::SearchTaskManager()
25  : AkThread(QStringLiteral("SearchTaskManager"))
26  , mShouldStop(false)
27 {
28  QTimer::singleShot(0, this, &SearchTaskManager::searchLoop);
29 }
30 
31 SearchTaskManager::~SearchTaskManager()
32 {
33  QMutexLocker locker(&mLock);
34  mShouldStop = true;
35  mWait.wakeAll();
36  locker.unlock();
37 
38  quitThread();
39 
40  mInstancesLock.lock();
41  qDeleteAll(mInstances);
42  mInstancesLock.unlock();
43 }
44 
45 void SearchTaskManager::registerInstance(const QString &id)
46 {
47  QMutexLocker locker(&mInstancesLock);
48 
49  qCDebug(AKONADISERVER_SEARCH_LOG) << "SearchManager::registerInstance(" << id << ")";
50 
51  AgentSearchInstance *instance = mInstances.value(id);
52  if (instance) {
53  return; // already registered
54  }
55 
56  instance = new AgentSearchInstance(id, *this);
57  if (!instance->init()) {
58  qCDebug(AKONADISERVER_SEARCH_LOG) << "Failed to initialize Search agent";
59  delete instance;
60  return;
61  }
62 
63  qCDebug(AKONADISERVER_SEARCH_LOG) << "Registering search instance " << id;
64  mInstances.insert(id, instance);
65 }
66 
67 void SearchTaskManager::unregisterInstance(const QString &id)
68 {
69  QMutexLocker locker(&mInstancesLock);
70 
72  if (it != mInstances.end()) {
73  qCDebug(AKONADISERVER_SEARCH_LOG) << "Unregistering search instance" << id;
74  it.value()->deleteLater();
75  mInstances.erase(it);
76  }
77 }
78 
79 void SearchTaskManager::addTask(SearchTask *task)
80 {
81  QueryBuilder qb(Collection::tableName());
82  qb.addJoin(QueryBuilder::InnerJoin, Resource::tableName(), Collection::resourceIdFullColumnName(), Resource::idFullColumnName());
83  qb.addColumn(Collection::idFullColumnName());
84  qb.addColumn(Resource::nameFullColumnName());
85 
86  Q_ASSERT(!task->collections.isEmpty());
87  QVariantList list;
88  list.reserve(task->collections.size());
89  for (qint64 collection : std::as_const(task->collections)) {
90  list << collection;
91  }
92  qb.addValueCondition(Collection::idFullColumnName(), Query::In, list);
93 
94  if (!qb.exec()) {
95  throw SearchException(qb.query().lastError().text());
96  }
97 
98  QSqlQuery query = qb.query();
99  if (!query.next()) {
100  return;
101  }
102 
103  mInstancesLock.lock();
104 
105  org::freedesktop::Akonadi::AgentManager agentManager(DBus::serviceName(DBus::Control), QStringLiteral("/AgentManager"), QDBusConnection::sessionBus());
106  do {
107  const QString resourceId = query.value(1).toString();
108  if (!mInstances.contains(resourceId)) {
109  qCDebug(AKONADISERVER_SEARCH_LOG) << "Resource" << resourceId << "does not implement Search interface, skipping";
110  } else if (!agentManager.agentInstanceOnline(resourceId)) {
111  qCDebug(AKONADISERVER_SEARCH_LOG) << "Agent" << resourceId << "is offline, skipping";
112  } else if (agentManager.agentInstanceStatus(resourceId) > 2) { // 2 == Broken, 3 == Not Configured
113  qCDebug(AKONADISERVER_SEARCH_LOG) << "Agent" << resourceId << "is broken or not configured";
114  } else {
115  const qint64 collectionId = query.value(0).toLongLong();
116  qCDebug(AKONADISERVER_SEARCH_LOG) << "Enqueued search query (" << resourceId << ", " << collectionId << ")";
117  task->queries << qMakePair(resourceId, collectionId);
118  }
119  } while (query.next());
120  mInstancesLock.unlock();
121 
122  QMutexLocker locker(&mLock);
123  mTasklist.append(task);
124  mWait.wakeAll();
125 }
126 
127 void SearchTaskManager::pushResults(const QByteArray &searchId, const QSet<qint64> &ids, Connection *connection)
128 {
129  Q_UNUSED(searchId)
130 
131  const auto resourceName = connection->context().resource().name();
132  qCDebug(AKONADISERVER_SEARCH_LOG) << ids.count() << "results for search" << searchId << "pushed from" << resourceName;
133 
134  QMutexLocker locker(&mLock);
135  ResourceTask *task = mRunningTasks.take(resourceName);
136  if (!task) {
137  qCDebug(AKONADISERVER_SEARCH_LOG) << "No running task for" << resourceName << " - maybe it has timed out?";
138  return;
139  }
140 
141  if (task->parentTask->id != searchId) {
142  qCDebug(AKONADISERVER_SEARCH_LOG) << "Received results for different search - maybe the original task has timed out?";
143  qCDebug(AKONADISERVER_SEARCH_LOG) << "Search is" << searchId << ", but task is" << task->parentTask->id;
144  return;
145  }
146 
147  task->results = ids;
148  mPendingResults.append(task);
149 
150  mWait.wakeAll();
151 }
152 
153 bool SearchTaskManager::allResourceTasksCompleted(SearchTask *agentSearchTask) const
154 {
155  // Check for queries pending to be dispatched
156  if (!agentSearchTask->queries.isEmpty()) {
157  return false;
158  }
159 
160  // Check for running queries
163  for (; it != end; ++it) {
164  if (it.value()->parentTask == agentSearchTask) {
165  return false;
166  }
167  }
168 
169  return true;
170 }
171 
172 SearchTaskManager::TasksMap::Iterator SearchTaskManager::cancelRunningTask(TasksMap::Iterator &iter)
173 {
174  ResourceTask *task = iter.value();
175  SearchTask *parentTask = task->parentTask;
176  QMutexLocker locker(&parentTask->sharedLock);
177  // erase the task before allResourceTasksCompleted
178  SearchTaskManager::TasksMap::Iterator it = mRunningTasks.erase(iter);
179  // We're not clearing the results since we don't want to clear successful results from other resources
180  parentTask->complete = allResourceTasksCompleted(parentTask);
181  parentTask->notifier.wakeAll();
182  delete task;
183 
184  return it;
185 }
186 
187 void SearchTaskManager::searchLoop()
188 {
189  qint64 timeout = ULONG_MAX;
190 
191  QMutexLocker locker(&mLock);
192 
193  for (;;) {
194  qCDebug(AKONADISERVER_SEARCH_LOG) << "Search loop is waiting, will wake again in" << timeout << "ms";
195  mWait.wait(&mLock, QDeadlineTimer(QDeadlineTimer::Forever));
196  if (mShouldStop) {
197  for (SearchTask *task : std::as_const(mTasklist)) {
198  QMutexLocker locker(&task->sharedLock);
199  task->queries.clear();
200  task->notifier.wakeAll();
201  }
202 
203  QMap<QString, ResourceTask *>::Iterator it = mRunningTasks.begin();
204  for (; it != mRunningTasks.end();) {
205  if (mTasklist.contains(it.value()->parentTask)) {
206  delete it.value();
207  it = mRunningTasks.erase(it);
208  continue;
209  }
210  it = cancelRunningTask(it);
211  }
212 
213  break;
214  }
215 
216  // First notify about available results
217  while (!mPendingResults.isEmpty()) {
218  ResourceTask *finishedTask = mPendingResults.first();
219  mPendingResults.remove(0);
220  qCDebug(AKONADISERVER_SEARCH_LOG) << "Pending results from" << finishedTask->resourceId << "for collection" << finishedTask->collectionId
221  << "for search" << finishedTask->parentTask->id << "available!";
222  SearchTask *parentTask = finishedTask->parentTask;
223  QMutexLocker locker(&parentTask->sharedLock);
224  // We need to append, this agent search task is shared
225  parentTask->pendingResults += finishedTask->results;
226  parentTask->complete = allResourceTasksCompleted(parentTask);
227  parentTask->notifier.wakeAll();
228  delete finishedTask;
229  }
230 
231  // No check whether there are any tasks running longer than 1 minute and kill them
232  QMap<QString, ResourceTask *>::Iterator it = mRunningTasks.begin();
233  const qint64 now = QDateTime::currentMSecsSinceEpoch();
234  for (; it != mRunningTasks.end();) {
235  ResourceTask *task = it.value();
236  if (now - task->timestamp > 60 * 1000) {
237  // Remove the task - and signal to parent task that it has "finished" without results
238  qCDebug(AKONADISERVER_SEARCH_LOG) << "Resource task" << task->resourceId << "for search" << task->parentTask->id << "timed out!";
239  it = cancelRunningTask(it);
240  } else {
241  ++it;
242  }
243  }
244 
245  if (!mTasklist.isEmpty()) {
246  SearchTask *task = mTasklist.first();
247  qCDebug(AKONADISERVER_SEARCH_LOG) << "Search task" << task->id << "available!";
248  if (task->queries.isEmpty()) {
249  qCDebug(AKONADISERVER_SEARCH_LOG) << "nothing to do for task";
250  QMutexLocker locker(&task->sharedLock);
251  // After this the AgentSearchTask will be destroyed
252  task->complete = true;
253  task->notifier.wakeAll();
254  mTasklist.remove(0);
255  continue;
256  }
257 
258  for (auto it = task->queries.begin(); it != task->queries.end();) {
259  if (!mRunningTasks.contains(it->first)) {
260  const auto &[resource, colId] = *it;
261  qCDebug(AKONADISERVER_SEARCH_LOG) << "\t Sending query for collection" << colId << "to resource" << resource;
262  auto rTask = new ResourceTask;
263  rTask->resourceId = resource;
264  rTask->collectionId = colId;
265  rTask->parentTask = task;
266  rTask->timestamp = QDateTime::currentMSecsSinceEpoch();
267  mRunningTasks.insert(resource, rTask);
268 
269  mInstancesLock.lock();
270  AgentSearchInstance *instance = mInstances.value(resource);
271  if (!instance) {
272  mInstancesLock.unlock();
273  // Resource disappeared in the meanwhile
274  continue;
275  }
276 
277  instance->search(task->id, task->query, colId);
278  mInstancesLock.unlock();
279 
280  task->sharedLock.lock();
281  it = task->queries.erase(it);
282  task->sharedLock.unlock();
283  } else {
284  ++it;
285  }
286  }
287  // Yay! We managed to dispatch all requests!
288  if (task->queries.isEmpty()) {
289  qCDebug(AKONADISERVER_SEARCH_LOG) << "All queries from task" << task->id << "dispatched!";
290  mTasklist.remove(0);
291  }
292 
293  timeout = 60 * 1000; // check whether all tasks have finished within a minute
294  } else {
295  if (mRunningTasks.isEmpty()) {
296  timeout = ULONG_MAX;
297  }
298  }
299  }
300 }
int count() const const
const T value(const Key &key, const T &defaultValue) const const
@ InnerJoin
NOTE: only supported for UPDATE and SELECT queries.
Definition: querybuilder.h:48
QMap::iterator begin()
qint64 currentMSecsSinceEpoch()
KIOFILEWIDGETS_EXPORT QStringList list(const QString &fileClass)
void reserve(int alloc)
int remove(const Key &key)
KSERVICE_EXPORT KService::List query(FilterFunc filterFunc)
QMap::iterator end()
QMap::iterator find(const Key &key)
QDBusConnection sessionBus()
QMap::iterator erase(QMap::iterator pos)
QString & insert(int position, QChar ch)
T & first()
An Connection represents one connection of a client to the server.
Definition: connection.h:46
const QList< QKeySequence > & end()
T value(int i) const const
Helper class to construct arbitrary SQL queries.
Definition: querybuilder.h:31
typedef Iterator
Helper integration between Akonadi and Qt.
This file is part of the KDE documentation.
Documentation copyright © 1996-2022 The KDE developers.
Generated on Sat Jul 2 2022 06:41:49 by doxygen 1.8.17 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.