7#include "searchtaskmanager.h" 
    8#include "agentsearchinstance.h" 
    9#include "akonadiserver_search_debug.h" 
   10#include "connection.h" 
   12#include "storage/selectquerybuilder.h" 
   14#include "private/dbus_p.h" 
   16#include <QDBusConnection> 
   17#include <QDeadlineTimer> 
   22using namespace Akonadi::Server;
 
   24SearchTaskManager::SearchTaskManager()
 
   25    : AkThread(QStringLiteral(
"SearchTaskManager"))
 
   31SearchTaskManager::~SearchTaskManager()
 
   33    QMutexLocker locker(&mLock);
 
   40    mInstancesLock.lock();
 
   41    qDeleteAll(mInstances);
 
   42    mInstancesLock.unlock();
 
   45void SearchTaskManager::registerInstance(
const QString &
id)
 
   47    QMutexLocker locker(&mInstancesLock);
 
   49    qCDebug(AKONADISERVER_SEARCH_LOG) << 
"SearchManager::registerInstance(" << 
id << 
")";
 
   51    AgentSearchInstance *instance = mInstances.value(
id);
 
   56    instance = 
new AgentSearchInstance(
id, *
this);
 
   57    if (!instance->init()) {
 
   58        qCDebug(AKONADISERVER_SEARCH_LOG) << 
"Failed to initialize Search agent";
 
   63    qCDebug(AKONADISERVER_SEARCH_LOG) << 
"Registering search instance " << id;
 
   64    mInstances.
insert(
id, instance);
 
   67void SearchTaskManager::unregisterInstance(
const QString &
id)
 
   69    QMutexLocker locker(&mInstancesLock);
 
   72    if (it != mInstances.end()) {
 
   73        qCDebug(AKONADISERVER_SEARCH_LOG) << 
"Unregistering search instance" << id;
 
   74        it.
value()->deleteLater();
 
   79void SearchTaskManager::addTask(SearchTask *task)
 
   81    QueryBuilder qb(Collection::tableName());
 
   82    qb.addJoin(
QueryBuilder::InnerJoin, Resource::tableName(), Collection::resourceIdFullColumnName(), Resource::idFullColumnName());
 
   83    qb.addColumn(Collection::idFullColumnName());
 
   84    qb.addColumn(Resource::nameFullColumnName());
 
   86    Q_ASSERT(!
task->collections.isEmpty());
 
   89    for (qint64 collection : std::as_const(
task->collections)) {
 
   92    qb.addValueCondition(Collection::idFullColumnName(), Query::In, list);
 
   95        throw SearchException(qb.query().lastError().text());
 
   98    auto &
query = qb.query();
 
  103    mInstancesLock.lock();
 
  105    org::freedesktop::Akonadi::AgentManager agentManager(DBus::serviceName(DBus::Control), QStringLiteral(
"/AgentManager"), 
QDBusConnection::sessionBus());
 
  107        const QString resourceId = 
query.
value(1).toString();
 
  108        if (!mInstances.contains(resourceId)) {
 
  109            qCDebug(AKONADISERVER_SEARCH_LOG) << 
"Resource" << resourceId << 
"does not implement Search interface, skipping";
 
  110        } 
else if (!agentManager.agentInstanceOnline(resourceId)) {
 
  111            qCDebug(AKONADISERVER_SEARCH_LOG) << 
"Agent" << resourceId << 
"is offline, skipping";
 
  112        } 
else if (agentManager.agentInstanceStatus(resourceId) > 2) { 
 
  113            qCDebug(AKONADISERVER_SEARCH_LOG) << 
"Agent" << resourceId << 
"is broken or not configured";
 
  115            const qint64 collectionId = 
query.
value(0).toLongLong();
 
  116            qCDebug(AKONADISERVER_SEARCH_LOG) << 
"Enqueued search query (" << resourceId << 
", " << collectionId << 
")";
 
  117            task->queries << qMakePair(resourceId, collectionId);
 
  119    } 
while (
query.next());
 
  120    mInstancesLock.unlock();
 
  122    QMutexLocker locker(&mLock);
 
  123    mTasklist.append(task);
 
  127void SearchTaskManager::pushResults(
const QByteArray &searchId, 
const QSet<qint64> &ids, 
Connection *connection)
 
  131    const auto resourceName = connection->context().resource().
name();
 
  132    qCDebug(AKONADISERVER_SEARCH_LOG) << ids.
count() << 
"results for search" << searchId << 
"pushed from" << resourceName;
 
  134    QMutexLocker locker(&mLock);
 
  135    ResourceTask *
task = mRunningTasks.take(resourceName);
 
  137        qCDebug(AKONADISERVER_SEARCH_LOG) << 
"No running task for" << resourceName << 
" - maybe it has timed out?";
 
  141    if (
task->parentTask->id != searchId) {
 
  142        qCDebug(AKONADISERVER_SEARCH_LOG) << 
"Received results for different search - maybe the original task has timed out?";
 
  143        qCDebug(AKONADISERVER_SEARCH_LOG) << 
"Search is" << searchId << 
", but task is" << 
task->parentTask->id;
 
  148    mPendingResults.append(task);
 
  153bool SearchTaskManager::allResourceTasksCompleted(SearchTask *agentSearchTask)
 const 
  156    if (!agentSearchTask->queries.
isEmpty()) {
 
  161    QMap<QString, ResourceTask *>::const_iterator it = mRunningTasks.
begin();
 
  162    QMap<QString, ResourceTask *>::const_iterator 
end = mRunningTasks.end();
 
  163    for (; it != 
end; ++it) {
 
  164        if (it.value()->parentTask == agentSearchTask) {
 
  174    ResourceTask *
task = iter.value();
 
  175    SearchTask *parentTask = 
task->parentTask;
 
  176    QMutexLocker locker(&parentTask->sharedLock);
 
  180    parentTask->complete = allResourceTasksCompleted(parentTask);
 
  181    parentTask->notifier.
wakeAll();
 
  187void SearchTaskManager::searchLoop()
 
  189    qint64 timeout = ULONG_MAX;
 
  191    QMutexLocker locker(&mLock);
 
  194        qCDebug(AKONADISERVER_SEARCH_LOG) << 
"Search loop is waiting, will wake again in" << timeout << 
"ms";
 
  197            for (SearchTask *task : std::as_const(mTasklist)) {
 
  198                QMutexLocker locker(&
task->sharedLock);
 
  199                task->queries.clear();
 
  200                task->notifier.wakeAll();
 
  204            for (; it != mRunningTasks.
end();) {
 
  205                if (mTasklist.contains(it.
value()->parentTask)) {
 
  207                    it = mRunningTasks.
erase(it);
 
  210                it = cancelRunningTask(it);
 
  217        while (!mPendingResults.isEmpty()) {
 
  218            ResourceTask *finishedTask = mPendingResults.first();
 
  219            mPendingResults.remove(0);
 
  220            qCDebug(AKONADISERVER_SEARCH_LOG) << 
"Pending results from" << finishedTask->resourceId << 
"for collection" << finishedTask->collectionId
 
  221                                              << 
"for search" << finishedTask->parentTask->id << 
"available!";
 
  222            SearchTask *parentTask = finishedTask->parentTask;
 
  223            QMutexLocker locker(&parentTask->sharedLock);
 
  225            parentTask->pendingResults += finishedTask->results;
 
  226            parentTask->complete = allResourceTasksCompleted(parentTask);
 
  227            parentTask->notifier.
wakeAll();
 
  234        for (; it != mRunningTasks.
end();) {
 
  236            if (now - 
task->timestamp > 60 * 1000) {
 
  238                qCDebug(AKONADISERVER_SEARCH_LOG) << 
"Resource task" << 
task->resourceId << 
"for search" << 
task->parentTask->id << 
"timed out!";
 
  239                it = cancelRunningTask(it);
 
  245        if (!mTasklist.isEmpty()) {
 
  246            SearchTask *
task = mTasklist.first();
 
  247            qCDebug(AKONADISERVER_SEARCH_LOG) << 
"Search task" << 
task->id << 
"available!";
 
  248            if (
task->queries.isEmpty()) {
 
  249                qCDebug(AKONADISERVER_SEARCH_LOG) << 
"nothing to do for task";
 
  250                QMutexLocker locker(&
task->sharedLock);
 
  252                task->complete = 
true;
 
  253                task->notifier.wakeAll();
 
  258            for (
auto it = 
task->queries.begin(); it != 
task->queries.end();) {
 
  259                if (!mRunningTasks.contains(it->
first)) {
 
  260                    const auto &[resource, colId] = *it;
 
  261                    qCDebug(AKONADISERVER_SEARCH_LOG) << 
"\t Sending query for collection" << colId << 
"to resource" << resource;
 
  262                    auto rTask = 
new ResourceTask;
 
  263                    rTask->resourceId = resource;
 
  264                    rTask->collectionId = colId;
 
  265                    rTask->parentTask = 
task;
 
  267                    mRunningTasks.insert(resource, rTask);
 
  269                    mInstancesLock.lock();
 
  270                    AgentSearchInstance *instance = mInstances.value(resource);
 
  272                        mInstancesLock.unlock();
 
  277                    instance->search(
task->id, 
task->query, colId);
 
  278                    mInstancesLock.unlock();
 
  280                    task->sharedLock.lock();
 
  281                    it = 
task->queries.erase(it);
 
  282                    task->sharedLock.unlock();
 
  288            if (
task->queries.isEmpty()) {
 
  289                qCDebug(AKONADISERVER_SEARCH_LOG) << 
"All queries from task" << 
task->id << 
"dispatched!";
 
  295            if (mRunningTasks.isEmpty()) {
 
  302#include "moc_searchtaskmanager.cpp" 
An Connection represents one connection of a client to the server.
 
@ InnerJoin
NOTE: only supported for UPDATE and SELECT queries.
 
Helper integration between Akonadi and Qt.
 
KSERVICE_EXPORT KService::List query(FilterFunc filterFunc)
 
KIOCORE_EXPORT QStringList list(const QString &fileClass)
 
const QList< QKeySequence > & end()
 
qint64 currentMSecsSinceEpoch()
 
QDBusConnection sessionBus()
 
bool isEmpty() const const
 
void reserve(qsizetype size)
 
T value(qsizetype i) const const
 
iterator erase(const_iterator first, const_iterator last)
 
T value(const Key &key, const T &defaultValue) const const
 
qsizetype count() const const
 
QString & insert(qsizetype position, QChar ch)
 
QTaskBuilder< Task > task(Task &&task)