Akonadi

itemretrievalmanager.cpp
1/*
2 SPDX-FileCopyrightText: 2009 Volker Krause <vkrause@kde.org>
3
4 SPDX-License-Identifier: LGPL-2.0-or-later
5*/
6
7#include "itemretrievalmanager.h"
8#include "akonadiserver_debug.h"
9#include "itemretrievaljob.h"
10
11#include "resourceinterface.h"
12
13#include "private/dbus_p.h"
14
15#include <QDBusConnection>
16#include <QDBusConnectionInterface>
17#include <QScopedPointer>
18
19using namespace Akonadi;
20using namespace Akonadi::Server;
21
22Q_DECLARE_METATYPE(Akonadi::Server::ItemRetrievalResult)
23
24class ItemRetrievalJobFactory : public AbstractItemRetrievalJobFactory
25{
26 AbstractItemRetrievalJob *retrievalJob(ItemRetrievalRequest request, QObject *parent) override
27 {
28 return new ItemRetrievalJob(std::move(request), parent);
29 }
30};
31
33 : ItemRetrievalManager(std::make_unique<ItemRetrievalJobFactory>(), parent)
34{
35}
36
37ItemRetrievalManager::ItemRetrievalManager(std::unique_ptr<AbstractItemRetrievalJobFactory> factory, QObject *parent)
38 : AkThread(QStringLiteral("ItemRetrievalManager"), QThread::HighPriority, parent)
39 , mJobFactory(std::move(factory))
40{
41 qRegisterMetaType<ItemRetrievalResult>("Akonadi::Server::ItemRetrievalResult");
42 qDBusRegisterMetaType<QByteArrayList>();
43}
44
45ItemRetrievalManager::~ItemRetrievalManager()
46{
47 quitThread();
48}
49
50void ItemRetrievalManager::init()
51{
52 AkThread::init();
53
55 connect(conn.interface(), &QDBusConnectionInterface::serviceOwnerChanged, this, &ItemRetrievalManager::serviceOwnerChanged);
56 connect(this, &ItemRetrievalManager::requestAdded, this, &ItemRetrievalManager::processRequest, Qt::QueuedConnection);
57}
58
59// called within the retrieval thread
60void ItemRetrievalManager::serviceOwnerChanged(const QString &serviceName, const QString &oldOwner, const QString &newOwner)
61{
62 Q_UNUSED(newOwner)
63 if (oldOwner.isEmpty()) {
64 return;
65 }
66 const auto service = DBus::parseAgentServiceName(serviceName);
67 if (!service.has_value() || service->agentType != DBus::Resource) {
68 return;
69 }
70 qCDebug(AKONADISERVER_LOG) << "ItemRetrievalManager lost connection to resource" << serviceName << ", discarding cached interface";
71 mResourceInterfaces.erase(service->identifier);
72}
73
74// called within the retrieval thread
75org::freedesktop::Akonadi::Resource *ItemRetrievalManager::resourceInterface(const QString &id)
76{
77 if (id.isEmpty()) {
78 return nullptr;
79 }
80
81 auto ifaceIt = mResourceInterfaces.find(id);
82 if (ifaceIt != mResourceInterfaces.cend() && ifaceIt->second->isValid()) {
83 return ifaceIt->second.get();
84 }
85
86 auto iface =
87 std::make_unique<org::freedesktop::Akonadi::Resource>(DBus::agentServiceName(id, DBus::Resource), QStringLiteral("/"), QDBusConnection::sessionBus());
88 if (!iface->isValid()) {
89 qCCritical(AKONADISERVER_LOG,
90 "Cannot connect to agent instance with identifier '%s', error message: '%s'",
91 qUtf8Printable(id),
92 qUtf8Printable(iface ? iface->lastError().message() : QString()));
93 return nullptr;
94 }
95 // DBus calls can take some time to reply -- e.g. if a huge local mbox has to be parsed first.
96 iface->setTimeout(5 * 60 * 1000); // 5 minutes, rather than 25 seconds
97 std::tie(ifaceIt, std::ignore) = mResourceInterfaces.emplace(id, std::move(iface));
98 return ifaceIt->second.get();
99}
100
101// called from any thread
103{
104 QWriteLocker locker(&mLock);
105 qCDebug(AKONADISERVER_LOG) << "ItemRetrievalManager posting retrieval request for items" << req.ids << "to" << req.resourceId << ". There are"
106 << mPendingRequests.size() << "request queues and" << mPendingRequests[req.resourceId].size() << "items mine";
107 mPendingRequests[req.resourceId].emplace_back(std::move(req));
108 locker.unlock();
109
110 Q_EMIT requestAdded();
111}
112
113QList<AbstractItemRetrievalJob *> ItemRetrievalManager::scheduleJobsForIdleResourcesLocked()
114{
116 for (auto it = mPendingRequests.begin(); it != mPendingRequests.end();) {
117 if (it->second.empty()) {
118 it = mPendingRequests.erase(it);
119 continue;
120 }
121
122 if (!mCurrentJobs.contains(it->first) || mCurrentJobs.value(it->first) == nullptr) {
123 // TODO: check if there is another one for the same uid with more parts requested
124 auto req = std::move(it->second.front());
125 it->second.pop_front();
126 Q_ASSERT(req.resourceId == it->first);
127 auto job = mJobFactory->retrievalJob(std::move(req), this);
128 connect(job, &AbstractItemRetrievalJob::requestCompleted, this, &ItemRetrievalManager::retrievalJobFinished);
129 mCurrentJobs.insert(job->request().resourceId, job);
130 // delay job execution until after we unlocked the mutex, since the job can emit the finished signal immediately in some cases
131 newJobs.append(job);
132 qCDebug(AKONADISERVER_LOG) << "ItemRetrievalJob" << job << "started for request" << job->request().id;
133 }
134 ++it;
135 }
136
137 return newJobs;
138}
139
140// called within the retrieval thread
141void ItemRetrievalManager::processRequest()
142{
143 QWriteLocker locker(&mLock);
144 // look for idle resources
145 auto newJobs = scheduleJobsForIdleResourcesLocked();
146 // someone asked as to process requests although everything is done already, he might still be waiting
147 if (mPendingRequests.empty() && mCurrentJobs.isEmpty() && newJobs.isEmpty()) {
148 return;
149 }
150 locker.unlock();
151
152 // Start the jobs
153 for (auto job : newJobs) {
154 if (auto j = qobject_cast<ItemRetrievalJob *>(job)) {
155 j->setInterface(resourceInterface(j->request().resourceId));
156 }
157 job->start();
158 }
159}
160
161namespace
162{
163bool isSubsetOf(const QByteArrayList &superset, const QByteArrayList &subset)
164{
165 // For very small lists like these, this is faster than copy, sort and std::include
166 return std::all_of(subset.cbegin(), subset.cend(), [&superset](const auto &val) {
167 return superset.contains(val);
168 });
169}
170
171}
172
173void ItemRetrievalManager::retrievalJobFinished(AbstractItemRetrievalJob *job)
174{
175 const auto &request = job->request();
176 const auto &result = job->result();
177
178 if (result.errorMsg.has_value()) {
179 qCWarning(AKONADISERVER_LOG) << "ItemRetrievalJob for request" << request.id << "finished with error:" << *result.errorMsg;
180 } else {
181 qCInfo(AKONADISERVER_LOG) << "ItemRetrievalJob for request" << request.id << "finished";
182 }
183
184 QWriteLocker locker(&mLock);
185 Q_ASSERT(mCurrentJobs.contains(request.resourceId));
186 mCurrentJobs.remove(request.resourceId);
187 // Check if there are any pending requests that are satisfied by this retrieval job
188 auto &requests = mPendingRequests[request.resourceId];
189 for (auto it = requests.begin(); it != requests.end();) {
190 // TODO: also complete requests that are subset of the completed one
191 if (it->ids == request.ids && isSubsetOf(request.parts, it->parts)) {
192 qCDebug(AKONADISERVER_LOG) << "Someone else requested items " << request.ids << "as well, marking as processed.";
193 ItemRetrievalResult otherResult{std::move(*it)};
194 otherResult.errorMsg = result.errorMsg;
195 Q_EMIT requestFinished(otherResult);
196 it = requests.erase(it);
197 } else {
198 ++it;
199 }
200 }
201 locker.unlock();
202
203 Q_EMIT requestFinished(result);
204 Q_EMIT requestAdded(); // trigger processRequest() again, in case there is more in the queues
205}
206
207// Can be called from any thread
208void ItemRetrievalManager::triggerCollectionSync(const QString &resource, qint64 colId)
209{
210 QTimer::singleShot(0, this, [this, resource, colId]() {
211 if (auto interface = resourceInterface(resource)) {
212 interface->synchronizeCollection(colId);
213 }
214 });
215}
216
217void ItemRetrievalManager::triggerCollectionTreeSync(const QString &resource)
218{
219 QTimer::singleShot(0, this, [this, resource]() {
220 if (auto interface = resourceInterface(resource)) {
221 interface->synchronizeCollectionTree();
222 }
223 });
224}
225
226#include "moc_itemretrievalmanager.cpp"
Async D-Bus retrieval, no modification of the request (thus no need for locking)
Manages and processes item retrieval requests.
QReadWriteLock mLock
Protects mPendingRequests and every Request object posted to it.
QHash< QString, AbstractItemRetrievalJob * > mCurrentJobs
Currently running jobs, one per resource.
ItemRetrievalManager(QObject *parent=nullptr)
Use AkThread::create() to create and start a new ItemRetrievalManager thread.
virtual void requestItemDelivery(ItemRetrievalRequest request)
Added for convenience.
std::unordered_map< QString, std::list< ItemRetrievalRequest > > mPendingRequests
Pending requests queues, one per resource.
Details of a single item retrieval request.
Helper integration between Akonadi and Qt.
QDBusConnectionInterface * interface() const
QDBusConnection sessionBus()
void serviceOwnerChanged(const QString &name, const QString &oldOwner, const QString &newOwner)
bool contains(const Key &key) const
iterator insert(const Key &key, const T &value)
bool isEmpty() const
bool remove(const Key &key)
T value(const Key &key) const
void append(QList< T > &&value)
const_iterator cbegin() const
const_iterator cend() const
bool isEmpty() const
Q_EMIT
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
T qobject_cast(QObject *object)
bool isEmpty() const
qsizetype size() const
QueuedConnection
This file is part of the KDE documentation.
Documentation copyright © 1996-2025 The KDE developers.
Generated on Fri Jan 3 2025 11:58:20 by doxygen 1.12.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.