Akonadi

searchtaskmanager.cpp
1 /*
2  Copyright (c) 2013, 2014 Daniel Vrátil <[email protected]>
3 
4  This library is free software; you can redistribute it and/or modify it
5  under the terms of the GNU Library General Public License as published by
6  the Free Software Foundation; either version 2 of the License, or (at your
7  option) any later version.
8 
9  This library is distributed in the hope that it will be useful, but WITHOUT
10  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
12  License for more details.
13 
14  You should have received a copy of the GNU Library General Public License
15  along with this library; see the file COPYING.LIB. If not, write to the
16  Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17  02110-1301, USA.
18 */
19 
20 #include "searchtaskmanager.h"
21 #include "agentsearchinstance.h"
22 #include "connection.h"
23 #include "storage/selectquerybuilder.h"
24 #include "entities.h"
25 #include "akonadiserver_search_debug.h"
26 
27 #include <private/dbus_p.h>
28 
29 #include <QSqlError>
30 #include <QTimer>
31 #include <QTime>
32 #include <QDBusConnection>
33 #if QT_VERSION >= QT_VERSION_CHECK(5, 15, 0)
34 #include <QDeadlineTimer>
35 #endif
36 using namespace Akonadi;
37 using namespace Akonadi::Server;
38 
39 SearchTaskManager::SearchTaskManager()
40  : AkThread(QStringLiteral("SearchTaskManager"))
41  , mShouldStop(false)
42 {
43  QTimer::singleShot(0, this, &SearchTaskManager::searchLoop);
44 }
45 
46 SearchTaskManager::~SearchTaskManager()
47 {
48  QMutexLocker locker(&mLock);
49  mShouldStop = true;
50  mWait.wakeAll();
51  locker.unlock();
52 
53  quitThread();
54 
55  mInstancesLock.lock();
56  qDeleteAll(mInstances);
57  mInstancesLock.unlock();
58 }
59 
60 void SearchTaskManager::registerInstance(const QString &id)
61 {
62  QMutexLocker locker(&mInstancesLock);
63 
64  qCDebug(AKONADISERVER_SEARCH_LOG) << "SearchManager::registerInstance(" << id << ")";
65 
66  AgentSearchInstance *instance = mInstances.value(id);
67  if (instance) {
68  return; // already registered
69  }
70 
71  instance = new AgentSearchInstance(id, *this);
72  if (!instance->init()) {
73  qCDebug(AKONADISERVER_SEARCH_LOG) << "Failed to initialize Search agent";
74  delete instance;
75  return;
76  }
77 
78  qCDebug(AKONADISERVER_SEARCH_LOG) << "Registering search instance " << id;
79  mInstances.insert(id, instance);
80 }
81 
82 void SearchTaskManager::unregisterInstance(const QString &id)
83 {
84  QMutexLocker locker(&mInstancesLock);
85 
87  if (it != mInstances.end()) {
88  qCDebug(AKONADISERVER_SEARCH_LOG) << "Unregistering search instance" << id;
89  it.value()->deleteLater();
90  mInstances.erase(it);
91  }
92 }
93 
94 void SearchTaskManager::addTask(SearchTask *task)
95 {
96  QueryBuilder qb(Collection::tableName());
97  qb.addJoin(QueryBuilder::InnerJoin, Resource::tableName(),
98  Collection::resourceIdFullColumnName(),
99  Resource::idFullColumnName());
100  qb.addColumn(Collection::idFullColumnName());
101  qb.addColumn(Resource::nameFullColumnName());
102 
103  Q_ASSERT(!task->collections.isEmpty());
104  QVariantList list;
105  list.reserve(task->collections.size());
106  for (qint64 collection : qAsConst(task->collections)) {
107  list << collection;
108  }
109  qb.addValueCondition(Collection::idFullColumnName(), Query::In, list);
110 
111  if (!qb.exec()) {
112  throw SearchException(qb.query().lastError().text());
113  }
114 
115  QSqlQuery query = qb.query();
116  if (!query.next()) {
117  return;
118  }
119 
120  mInstancesLock.lock();
121 
122  org::freedesktop::Akonadi::AgentManager agentManager(DBus::serviceName(DBus::Control), QStringLiteral("/AgentManager"),
124  do {
125  const QString resourceId = query.value(1).toString();
126  if (!mInstances.contains(resourceId)) {
127  qCDebug(AKONADISERVER_SEARCH_LOG) << "Resource" << resourceId << "does not implement Search interface, skipping";
128  } else if (!agentManager.agentInstanceOnline(resourceId)) {
129  qCDebug(AKONADISERVER_SEARCH_LOG) << "Agent" << resourceId << "is offline, skipping";
130  } else if (agentManager.agentInstanceStatus(resourceId) > 2) { // 2 == Broken, 3 == Not Configured
131  qCDebug(AKONADISERVER_SEARCH_LOG) << "Agent" << resourceId << "is broken or not configured";
132  } else {
133  const qint64 collectionId = query.value(0).toLongLong();
134  qCDebug(AKONADISERVER_SEARCH_LOG) << "Enqueued search query (" << resourceId << ", " << collectionId << ")";
135  task->queries << qMakePair(resourceId, collectionId);
136  }
137  } while (query.next());
138  mInstancesLock.unlock();
139 
140  QMutexLocker locker(&mLock);
141  mTasklist.append(task);
142  mWait.wakeAll();
143 }
144 
145 void SearchTaskManager::pushResults(const QByteArray &searchId, const QSet<qint64> &ids,
146  Connection *connection)
147 {
148  Q_UNUSED(searchId);
149 
150  const auto resourceName = connection->context().resource().name();
151  qCDebug(AKONADISERVER_SEARCH_LOG) << ids.count() << "results for search" << searchId << "pushed from" << resourceName;
152 
153  QMutexLocker locker(&mLock);
154  ResourceTask *task = mRunningTasks.take(resourceName);
155  if (!task) {
156  qCDebug(AKONADISERVER_SEARCH_LOG) << "No running task for" << resourceName << " - maybe it has timed out?";
157  return;
158  }
159 
160  if (task->parentTask->id != searchId) {
161  qCDebug(AKONADISERVER_SEARCH_LOG) << "Received results for different search - maybe the original task has timed out?";
162  qCDebug(AKONADISERVER_SEARCH_LOG) << "Search is" << searchId << ", but task is" << task->parentTask->id;
163  return;
164  }
165 
166  task->results = ids;
167  mPendingResults.append(task);
168 
169  mWait.wakeAll();
170 }
171 
172 bool SearchTaskManager::allResourceTasksCompleted(SearchTask *agentSearchTask) const
173 {
174  // Check for queries pending to be dispatched
175  if (!agentSearchTask->queries.isEmpty()) {
176  return false;
177  }
178 
179  // Check for running queries
182  for (; it != end; ++it) {
183  if (it.value()->parentTask == agentSearchTask) {
184  return false;
185  }
186  }
187 
188  return true;
189 }
190 
191 SearchTaskManager::TasksMap::Iterator SearchTaskManager::cancelRunningTask(TasksMap::Iterator &iter)
192 {
193  ResourceTask *task = iter.value();
194  SearchTask *parentTask = task->parentTask;
195  QMutexLocker locker(&parentTask->sharedLock);
196  //erase the task before allResourceTasksCompleted
197  SearchTaskManager::TasksMap::Iterator it = mRunningTasks.erase(iter);
198  // We're not clearing the results since we don't want to clear successful results from other resources
199  parentTask->complete = allResourceTasksCompleted(parentTask);
200  parentTask->notifier.wakeAll();
201  delete task;
202 
203  return it;
204 }
205 
206 void SearchTaskManager::searchLoop()
207 {
208  qint64 timeout = ULONG_MAX;
209 
210  QMutexLocker locker(&mLock);
211 
212  Q_FOREVER {
213  qCDebug(AKONADISERVER_SEARCH_LOG) << "Search loop is waiting, will wake again in" << timeout << "ms";
214 #if QT_VERSION < QT_VERSION_CHECK(5, 15, 0)
215  mWait.wait(&mLock, timeout);
216 #else
217  mWait.wait(&mLock, QDeadlineTimer(QDeadlineTimer::Forever));
218 #endif
219  if (mShouldStop) {
220  Q_FOREACH (SearchTask *task, mTasklist) {
221  QMutexLocker locker(&task->sharedLock);
222  task->queries.clear();
223  task->notifier.wakeAll();
224  }
225 
226  QMap<QString, ResourceTask *>::Iterator it = mRunningTasks.begin();
227  for (; it != mRunningTasks.end();) {
228  if (mTasklist.contains(it.value()->parentTask)) {
229  delete it.value();
230  it = mRunningTasks.erase(it);
231  continue;
232  }
233  it = cancelRunningTask(it);
234  }
235 
236  break;
237  }
238 
239  // First notify about available results
240  while (!mPendingResults.isEmpty()) {
241  ResourceTask *finishedTask = mPendingResults.first();
242  mPendingResults.remove(0);
243  qCDebug(AKONADISERVER_SEARCH_LOG) << "Pending results from" << finishedTask->resourceId << "for collection" << finishedTask->collectionId << "for search" << finishedTask->parentTask->id << "available!";
244  SearchTask *parentTask = finishedTask->parentTask;
245  QMutexLocker locker(&parentTask->sharedLock);
246  // We need to append, this agent search task is shared
247  parentTask->pendingResults += finishedTask->results;
248  parentTask->complete = allResourceTasksCompleted(parentTask);
249  parentTask->notifier.wakeAll();
250  delete finishedTask;
251  }
252 
253  // No check whether there are any tasks running longer than 1 minute and kill them
254  QMap<QString, ResourceTask *>::Iterator it = mRunningTasks.begin();
255  const qint64 now = QDateTime::currentMSecsSinceEpoch();
256  for (; it != mRunningTasks.end();) {
257  ResourceTask *task = it.value();
258  if (now - task->timestamp > 60 * 1000) {
259  // Remove the task - and signal to parent task that it has "finished" without results
260  qCDebug(AKONADISERVER_SEARCH_LOG) << "Resource task" << task->resourceId << "for search" << task->parentTask->id << "timed out!";
261  it = cancelRunningTask(it);
262  } else {
263  ++it;
264  }
265  }
266 
267  if (!mTasklist.isEmpty()) {
268  SearchTask *task = mTasklist.first();
269  qCDebug(AKONADISERVER_SEARCH_LOG) << "Search task" << task->id << "available!";
270  if (task->queries.isEmpty()) {
271  qCDebug(AKONADISERVER_SEARCH_LOG) << "nothing to do for task";
272  QMutexLocker locker(&task->sharedLock);
273  //After this the AgentSearchTask will be destroyed
274  task->complete = true;
275  task->notifier.wakeAll();
276  mTasklist.remove(0);
277  continue;
278  }
279 
280  for (auto it = task->queries.begin(); it != task->queries.end();) {
281  if (!mRunningTasks.contains(it->first)) {
282  const auto &[resource, colId] = *it;
283  qCDebug(AKONADISERVER_SEARCH_LOG) << "\t Sending query for collection" << colId << "to resource" << resource;
284  ResourceTask *rTask = new ResourceTask;
285  rTask->resourceId = resource;
286  rTask->collectionId = colId;
287  rTask->parentTask = task;
288  rTask->timestamp = QDateTime::currentMSecsSinceEpoch();
289  mRunningTasks.insert(resource, rTask);
290 
291  mInstancesLock.lock();
292  AgentSearchInstance *instance = mInstances.value(resource);
293  if (!instance) {
294  mInstancesLock.unlock();
295  // Resource disappeared in the meanwhile
296  continue;
297  }
298 
299  instance->search(task->id, task->query, colId);
300  mInstancesLock.unlock();
301 
302  task->sharedLock.lock();
303  it = task->queries.erase(it);
304  task->sharedLock.unlock();
305  } else {
306  ++it;
307  }
308  }
309  // Yay! We managed to dispatch all requests!
310  if (task->queries.isEmpty()) {
311  qCDebug(AKONADISERVER_SEARCH_LOG) << "All queries from task" << task->id << "dispatched!";
312  mTasklist.remove(0);
313  }
314 
315  timeout = 60 * 1000; // check whether all tasks have finished within a minute
316  } else {
317  if (mRunningTasks.isEmpty()) {
318  timeout = ULONG_MAX;
319  }
320  }
321  }
322 }
QMap::iterator erase(QMap::iterator pos)
std::optional< QSqlQuery > query(const QString &queryStatement)
Returns the cached (and prepared) query for queryStatement.
Definition: querycache.cpp:108
void reserve(int alloc)
NOTE: only supported for UPDATE and SELECT queries.
Definition: querybuilder.h:62
QDBusConnection sessionBus()
qint64 currentMSecsSinceEpoch()
QString & insert(int position, QChar ch)
QMap::iterator end()
int count() const const
KDEGAMES_EXPORT QAction * end(const QObject *recvr, const char *slot, QObject *parent)
QMap::iterator begin()
Helper integration between Akonadi and Qt.
typedef Iterator
T & first()
Helper class to construct arbitrary SQL queries.
Definition: querybuilder.h:45
qlonglong toLongLong(bool *ok, int base) const const
QMap::iterator find(const Key &key)
KIOFILEWIDGETS_EXPORT QStringList list(const QString &fileClass)
const T value(const Key &key, const T &defaultValue) const const
int remove(const Key &key)
A Connection represents one connection of a client to the server.
Definition: connection.h:53
This file is part of the KDE documentation.
Documentation copyright © 1996-2020 The KDE developers.
Generated on Fri Jun 5 2020 23:08:56 by doxygen 1.8.11 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.