Akonadi

itemretrievalmanager.cpp
1 /*
2  SPDX-FileCopyrightText: 2009 Volker Krause <[email protected]>
3 
4  SPDX-License-Identifier: LGPL-2.0-or-later
5 */
6 
7 #include "itemretrievalmanager.h"
8 #include "akonadiserver_debug.h"
9 #include "itemretrievaljob.h"
10 
11 #include "resourceinterface.h"
12 
13 #include <private/dbus_p.h>
14 
15 #include <QDBusConnection>
16 #include <QDBusConnectionInterface>
17 #include <QScopedPointer>
18 
19 using namespace Akonadi;
20 using namespace Akonadi::Server;
21 
22 Q_DECLARE_METATYPE(Akonadi::Server::ItemRetrievalResult)
23 
24 class ItemRetrievalJobFactory : public AbstractItemRetrievalJobFactory
25 {
26  AbstractItemRetrievalJob *retrievalJob(ItemRetrievalRequest request, QObject *parent) override
27  {
28  return new ItemRetrievalJob(std::move(request), parent);
29  }
30 };
31 
33  : ItemRetrievalManager(std::make_unique<ItemRetrievalJobFactory>(), parent)
34 {
35 }
36 
37 ItemRetrievalManager::ItemRetrievalManager(std::unique_ptr<AbstractItemRetrievalJobFactory> factory, QObject *parent)
38  : AkThread(QStringLiteral("ItemRetrievalManager"), QThread::HighPriority, parent)
39  , mJobFactory(std::move(factory))
40 {
41  qRegisterMetaType<ItemRetrievalResult>("Akonadi::Server::ItemRetrievalResult");
42  qDBusRegisterMetaType<QByteArrayList>();
43 }
44 
45 ItemRetrievalManager::~ItemRetrievalManager()
46 {
47  quitThread();
48 }
49 
50 void ItemRetrievalManager::init()
51 {
52  AkThread::init();
53 
55  connect(conn.interface(), &QDBusConnectionInterface::serviceOwnerChanged, this, &ItemRetrievalManager::serviceOwnerChanged);
56  connect(this, &ItemRetrievalManager::requestAdded, this, &ItemRetrievalManager::processRequest, Qt::QueuedConnection);
57 }
58 
59 // called within the retrieval thread
60 void ItemRetrievalManager::serviceOwnerChanged(const QString &serviceName, const QString &oldOwner, const QString &newOwner)
61 {
62  Q_UNUSED(newOwner)
63  if (oldOwner.isEmpty()) {
64  return;
65  }
66  const auto service = DBus::parseAgentServiceName(serviceName);
67  if (!service.has_value() || service->agentType != DBus::Resource) {
68  return;
69  }
70  qCDebug(AKONADISERVER_LOG) << "ItemRetrievalManager lost connection to resource" << serviceName << ", discarding cached interface";
71  mResourceInterfaces.erase(service->identifier);
72 }
73 
74 // called within the retrieval thread
75 org::freedesktop::Akonadi::Resource *ItemRetrievalManager::resourceInterface(const QString &id)
76 {
77  if (id.isEmpty()) {
78  return nullptr;
79  }
80 
81  auto ifaceIt = mResourceInterfaces.find(id);
82  if (ifaceIt != mResourceInterfaces.cend() && ifaceIt->second->isValid()) {
83  return ifaceIt->second.get();
84  }
85 
86  auto iface =
87  std::make_unique<org::freedesktop::Akonadi::Resource>(DBus::agentServiceName(id, DBus::Resource), QStringLiteral("/"), QDBusConnection::sessionBus());
88  if (!iface->isValid()) {
89  qCCritical(AKONADISERVER_LOG,
90  "Cannot connect to agent instance with identifier '%s', error message: '%s'",
91  qUtf8Printable(id),
92  qUtf8Printable(iface ? iface->lastError().message() : QString()));
93  return nullptr;
94  }
95  // DBus calls can take some time to reply -- e.g. if a huge local mbox has to be parsed first.
96  iface->setTimeout(5 * 60 * 1000); // 5 minutes, rather than 25 seconds
97  std::tie(ifaceIt, std::ignore) = mResourceInterfaces.emplace(id, std::move(iface));
98  return ifaceIt->second.get();
99 }
100 
101 // called from any thread
103 {
104  QWriteLocker locker(&mLock);
105  qCDebug(AKONADISERVER_LOG) << "ItemRetrievalManager posting retrieval request for items" << req.ids << "to" << req.resourceId << ". There are"
106  << mPendingRequests.size() << "request queues and" << mPendingRequests[req.resourceId].size() << "items mine";
107  mPendingRequests[req.resourceId].emplace_back(std::move(req));
108  locker.unlock();
109 
110  Q_EMIT requestAdded();
111 }
112 
113 QVector<AbstractItemRetrievalJob *> ItemRetrievalManager::scheduleJobsForIdleResourcesLocked()
114 {
116  for (auto it = mPendingRequests.begin(); it != mPendingRequests.end();) {
117  if (it->second.empty()) {
118  it = mPendingRequests.erase(it);
119  continue;
120  }
121 
122  if (!mCurrentJobs.contains(it->first) || mCurrentJobs.value(it->first) == nullptr) {
123  // TODO: check if there is another one for the same uid with more parts requested
124  auto req = std::move(it->second.front());
125  it->second.pop_front();
126  Q_ASSERT(req.resourceId == it->first);
127  auto job = mJobFactory->retrievalJob(std::move(req), this);
128  connect(job, &AbstractItemRetrievalJob::requestCompleted, this, &ItemRetrievalManager::retrievalJobFinished);
129  mCurrentJobs.insert(job->request().resourceId, job);
130  // delay job execution until after we unlocked the mutex, since the job can emit the finished signal immediately in some cases
131  newJobs.append(job);
132  qCDebug(AKONADISERVER_LOG) << "ItemRetrievalJob" << job << "started for request" << job->request().id;
133  }
134  ++it;
135  }
136 
137  return newJobs;
138 }
139 
140 // called within the retrieval thread
141 void ItemRetrievalManager::processRequest()
142 {
143  QWriteLocker locker(&mLock);
144  // look for idle resources
145  auto newJobs = scheduleJobsForIdleResourcesLocked();
146  // someone asked as to process requests although everything is done already, he might still be waiting
147  if (mPendingRequests.empty() && mCurrentJobs.isEmpty() && newJobs.isEmpty()) {
148  return;
149  }
150  locker.unlock();
151 
152  // Start the jobs
153  for (auto job : newJobs) {
154  if (auto j = qobject_cast<ItemRetrievalJob *>(job)) {
155  j->setInterface(resourceInterface(j->request().resourceId));
156  }
157  job->start();
158  }
159 }
160 
161 namespace
162 {
163 bool isSubsetOf(const QByteArrayList &superset, const QByteArrayList &subset)
164 {
165  // For very small lists like these, this is faster than copy, sort and std::include
166  return std::all_of(subset.cbegin(), subset.cend(), [&superset](const auto &val) {
167  return superset.contains(val);
168  });
169 }
170 
171 }
172 
173 void ItemRetrievalManager::retrievalJobFinished(AbstractItemRetrievalJob *job)
174 {
175  const auto &request = job->request();
176  const auto &result = job->result();
177 
178  if (result.errorMsg.has_value()) {
179  qCWarning(AKONADISERVER_LOG) << "ItemRetrievalJob for request" << request.id << "finished with error:" << *result.errorMsg;
180  } else {
181  qCInfo(AKONADISERVER_LOG) << "ItemRetrievalJob for request" << request.id << "finished";
182  }
183 
184  QWriteLocker locker(&mLock);
185  Q_ASSERT(mCurrentJobs.contains(request.resourceId));
186  mCurrentJobs.remove(request.resourceId);
187  // Check if there are any pending requests that are satisfied by this retrieval job
188  auto &requests = mPendingRequests[request.resourceId];
189  for (auto it = requests.begin(); it != requests.end();) {
190  // TODO: also complete requests that are subset of the completed one
191  if (it->ids == request.ids && isSubsetOf(request.parts, it->parts)) {
192  qCDebug(AKONADISERVER_LOG) << "Someone else requested items " << request.ids << "as well, marking as processed.";
193  ItemRetrievalResult otherResult{std::move(*it)};
194  otherResult.errorMsg = result.errorMsg;
195  Q_EMIT requestFinished(otherResult);
196  it = requests.erase(it);
197  } else {
198  ++it;
199  }
200  }
201  locker.unlock();
202 
203  Q_EMIT requestFinished(result);
204  Q_EMIT requestAdded(); // trigger processRequest() again, in case there is more in the queues
205 }
206 
207 // Can be called from any thread
208 void ItemRetrievalManager::triggerCollectionSync(const QString &resource, qint64 colId)
209 {
210  QTimer::singleShot(0, this, [this, resource, colId]() {
211  if (auto interface = resourceInterface(resource)) {
212  interface->synchronizeCollection(colId);
213  }
214  });
215 }
216 
217 void ItemRetrievalManager::triggerCollectionTreeSync(const QString &resource)
218 {
219  QTimer::singleShot(0, this, [this, resource]() {
220  if (auto interface = resourceInterface(resource)) {
221  interface->synchronizeCollectionTree();
222  }
223  });
224 }
ItemRetrievalManager(QObject *parent=nullptr)
Use AkThread::create() to create and start a new ItemRetrievalManager thread.
const T value(const Key &key) const
bool isEmpty() const
Q_EMIT
void append(const T &value)
Async D-Bus retrieval, no modification of the request (thus no need for locking)
QMetaObject::Connection connect(const QObject *sender, const char *signal, const QObject *receiver, const char *method, Qt::ConnectionType type)
QHash::iterator insert(const Key &key, const T &value)
Manages and processes item retrieval requests.
QDBusConnection sessionBus()
bool isEmpty() const
virtual void requestItemDelivery(ItemRetrievalRequest request)
Added for convenience.
QueuedConnection
void serviceOwnerChanged(const QString &name, const QString &oldOwner, const QString &newOwner)
QList::const_iterator cend() const
QDBusConnectionInterface * interface() const
int remove(const Key &key)
Details of a single item retrieval request.
A glue between Qt and the standard library.
QReadWriteLock mLock
Protects mPendingRequests and every Request object posted to it.
bool isEmpty() const
QList::const_iterator cbegin() const
QHash< QString, AbstractItemRetrievalJob * > mCurrentJobs
Currently running jobs, one per resource.
std::unordered_map< QString, std::list< ItemRetrievalRequest > > mPendingRequests
Pending requests queues, one per resource.
bool contains(const Key &key) const
Helper integration between Akonadi and Qt.
This file is part of the KDE documentation.
Documentation copyright © 1996-2023 The KDE developers.
Generated on Sun Jun 4 2023 03:52:47 by doxygen 1.8.17 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.