Akonadi

itemretrievalmanager.cpp
1 /*
2  Copyright (c) 2009 Volker Krause <[email protected]>
3 
4  This library is free software; you can redistribute it and/or modify it
5  under the terms of the GNU Library General Public License as published by
6  the Free Software Foundation; either version 2 of the License, or (at your
7  option) any later version.
8 
9  This library is distributed in the hope that it will be useful, but WITHOUT
10  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
12  License for more details.
13 
14  You should have received a copy of the GNU Library General Public License
15  along with this library; see the file COPYING.LIB. If not, write to the
16  Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17  02110-1301, USA.
18 */
19 
20 #include "itemretrievalmanager.h"
21 #include "itemretrievalrequest.h"
22 #include "itemretrievaljob.h"
23 #include "akonadiserver_debug.h"
24 
25 #include "resourceinterface.h"
26 
27 #include <private/dbus_p.h>
28 
29 #include <QReadWriteLock>
30 #include <QScopedPointer>
31 #include <QWaitCondition>
32 #include <QDBusConnection>
33 #include <QDBusConnectionInterface>
34 
35 using namespace Akonadi;
36 using namespace Akonadi::Server;
37 
38 Q_DECLARE_METATYPE(Akonadi::Server::ItemRetrievalResult)
39 
40 class ItemRetrievalJobFactory : public AbstractItemRetrievalJobFactory
41 {
42  AbstractItemRetrievalJob *retrievalJob(ItemRetrievalRequest request, QObject *parent) override {
43  return new ItemRetrievalJob(std::move(request), parent);
44  }
45 };
46 
47 ItemRetrievalManager::ItemRetrievalManager(QObject *parent)
48  : ItemRetrievalManager(std::make_unique<ItemRetrievalJobFactory>(), parent)
49 {
50 }
51 
52 ItemRetrievalManager::ItemRetrievalManager(std::unique_ptr<AbstractItemRetrievalJobFactory> factory, QObject *parent)
53  : AkThread(QStringLiteral("ItemRetrievalManager"), QThread::HighPriority, parent)
54  , mJobFactory(std::move(factory))
55 {
56  qRegisterMetaType<ItemRetrievalResult>("Akonadi::Server::ItemRetrievalResult");
57  qDBusRegisterMetaType<QByteArrayList>();
58 }
59 
60 ItemRetrievalManager::~ItemRetrievalManager()
61 {
62  quitThread();
63 }
64 
65 void ItemRetrievalManager::init()
66 {
67  AkThread::init();
68 
71  this, &ItemRetrievalManager::serviceOwnerChanged);
72  connect(this, &ItemRetrievalManager::requestAdded,
73  this, &ItemRetrievalManager::processRequest,
74  Qt::QueuedConnection);
75 }
76 
77 // called within the retrieval thread
78 void ItemRetrievalManager::serviceOwnerChanged(const QString &serviceName, const QString &oldOwner, const QString &newOwner)
79 {
80  Q_UNUSED(newOwner);
81  if (oldOwner.isEmpty()) {
82  return;
83  }
84  const auto service = DBus::parseAgentServiceName(serviceName);
85  if (!service.has_value() || service->agentType != DBus::Resource) {
86  return;
87  }
88  qCDebug(AKONADISERVER_LOG) << "ItemRetrievalManager lost connection to resource" << serviceName << ", discarding cached interface";
89  mResourceInterfaces.erase(service->identifier);
90 }
91 
92 // called within the retrieval thread
93 org::freedesktop::Akonadi::Resource *ItemRetrievalManager::resourceInterface(const QString &id)
94 {
95  if (id.isEmpty()) {
96  return nullptr;
97  }
98 
99  auto ifaceIt = mResourceInterfaces.find(id);
100  if (ifaceIt != mResourceInterfaces.cend() && ifaceIt->second->isValid()) {
101  return ifaceIt->second.get();
102  }
103 
104  auto iface = std::make_unique<org::freedesktop::Akonadi::Resource>(
105  DBus::agentServiceName(id, DBus::Resource), QStringLiteral("/"), QDBusConnection::sessionBus());
106  if (!iface->isValid()) {
107  qCCritical(AKONADISERVER_LOG, "Cannot connect to agent instance with identifier '%s', error message: '%s'",
108  qUtf8Printable(id), qUtf8Printable(iface ? iface->lastError().message() : QString()));
109  return nullptr;
110  }
111  // DBus calls can take some time to reply -- e.g. if a huge local mbox has to be parsed first.
112  iface->setTimeout(5 * 60 * 1000); // 5 minutes, rather than 25 seconds
113  std::tie(ifaceIt, std::ignore) = mResourceInterfaces.emplace(id, std::move(iface));
114  return ifaceIt->second.get();
115 }
116 
117 // called from any thread
119 {
120  QWriteLocker locker(&mLock);
121  qCDebug(AKONADISERVER_LOG) << "ItemRetrievalManager posting retrieval request for items" << req.ids
122  << "to" <<req.resourceId << ". There are" << mPendingRequests.size() << "request queues and"
123  << mPendingRequests[req.resourceId].size() << "items mine";
124  mPendingRequests[req.resourceId].emplace_back(std::move(req));
125  locker.unlock();
126 
127  Q_EMIT requestAdded();
128 }
129 
130 QVector<AbstractItemRetrievalJob *> ItemRetrievalManager::scheduleJobsForIdleResourcesLocked()
131 {
133  for (auto it = mPendingRequests.begin(); it != mPendingRequests.end();) {
134  if (it->second.empty()) {
135  it = mPendingRequests.erase(it);
136  continue;
137  }
138 
139  if (!mCurrentJobs.contains(it->first) || mCurrentJobs.value(it->first) == nullptr) {
140  // TODO: check if there is another one for the same uid with more parts requested
141  auto req = std::move(it->second.front());
142  it->second.pop_front();
143  Q_ASSERT(req.resourceId == it->first);
144  auto job = mJobFactory->retrievalJob(std::move(req), this);
145  connect(job, &AbstractItemRetrievalJob::requestCompleted, this, &ItemRetrievalManager::retrievalJobFinished);
146  mCurrentJobs.insert(job->request().resourceId, job);
147  // delay job execution until after we unlocked the mutex, since the job can emit the finished signal immediately in some cases
148  newJobs.append(job);
149  qCDebug(AKONADISERVER_LOG) << "ItemRetrievalJob" << job << "started for request" << job->request().id;
150  }
151  ++it;
152  }
153 
154  return newJobs;
155 }
156 
157 // called within the retrieval thread
158 void ItemRetrievalManager::processRequest()
159 {
160  QWriteLocker locker(&mLock);
161  // look for idle resources
162  auto newJobs = scheduleJobsForIdleResourcesLocked();
163  // someone asked as to process requests although everything is done already, he might still be waiting
164  if (mPendingRequests.empty() && mCurrentJobs.isEmpty() && newJobs.isEmpty()) {
165  return;
166  }
167  locker.unlock();
168 
169  // Start the jobs
170  for (auto *job : newJobs) {
171  if (ItemRetrievalJob *j = qobject_cast<ItemRetrievalJob *>(job)) {
172  j->setInterface(resourceInterface(j->request().resourceId));
173  }
174  job->start();
175  }
176 }
177 
178 namespace {
179 
180 bool isSubsetOf(const QByteArrayList &superset, const QByteArrayList &subset)
181 {
182  // For very small lists like these, this is faster than copy, sort and std::include
183  return std::all_of(subset.cbegin(), subset.cend(),
184  [&superset](const auto &val) { return superset.contains(val); });
185 }
186 
187 }
188 
189 void ItemRetrievalManager::retrievalJobFinished(AbstractItemRetrievalJob *job)
190 {
191  const auto &request = job->request();
192  const auto &result = job->result();
193 
194  if (result.errorMsg.has_value()) {
195  qCWarning(AKONADISERVER_LOG) << "ItemRetrievalJob for request" << request.id << "finished with error:" << *result.errorMsg;
196  } else {
197  qCInfo(AKONADISERVER_LOG) << "ItemRetrievalJob for request" << request.id << "finished";
198  }
199 
200  QWriteLocker locker(&mLock);
201  Q_ASSERT(mCurrentJobs.contains(request.resourceId));
202  mCurrentJobs.remove(request.resourceId);
203  // Check if there are any pending requests that are satisfied by this retrieval job
204  auto &requests = mPendingRequests[request.resourceId];
205  for (auto it = requests.begin(); it != requests.end();) {
206  // TODO: also complete requests that are subset of the completed one
207  if (it->ids == request.ids && isSubsetOf(request.parts, it->parts)) {
208  qCDebug(AKONADISERVER_LOG) << "Someone else requested items " << request.ids << "as well, marking as processed.";
209  ItemRetrievalResult otherResult{std::move(*it)};
210  otherResult.errorMsg = result.errorMsg;
211  Q_EMIT requestFinished(otherResult);
212  it = requests.erase(it);
213  } else {
214  ++it;
215  }
216  }
217  locker.unlock();
218 
219  Q_EMIT requestFinished(result);
220  Q_EMIT requestAdded(); // trigger processRequest() again, in case there is more in the queues
221 }
222 
223 // Can be called from any thread
224 void ItemRetrievalManager::triggerCollectionSync(const QString &resource, qint64 colId)
225 {
226  QTimer::singleShot(0, this, [this, resource, colId]() {
227  if (auto *interface = resourceInterface(resource)) {
228  interface->synchronizeCollection(colId);
229  }
230  });
231 }
232 
233 void ItemRetrievalManager::triggerCollectionTreeSync(const QString &resource)
234 {
235  QTimer::singleShot(0, this, [this, resource]() {
236  if (auto *interface = resourceInterface(resource)) {
237  interface->synchronizeCollectionTree();
238  }
239  });
240 }
void append(const T &value)
QDBusConnectionInterface * interface() const const
int size() const const
QVector::iterator erase(QVector::iterator begin, QVector::iterator end)
QDBusConnection sessionBus()
A glue between Qt and the standard library.
Manages and processes item retrieval requests.
bool isEmpty() const const
virtual void requestItemDelivery(ItemRetrievalRequest request)
Added for convenience.
bool contains(const T &value) const const
Details of a single item retrieval request.
QList::const_iterator cend() const const
Definition: item.h:44
QList::const_iterator cbegin() const const
Helper integration between Akonadi and Qt.
void serviceOwnerChanged(const QString &name, const QString &oldOwner, const QString &newOwner)
Async D-Bus retrieval, no modification of the request (thus no need for locking)
KIOCORE_EXPORT CopyJob * move(const QUrl &src, const QUrl &dest, JobFlags flags=DefaultFlags)
QMetaObject::Connection connect(const QObject *sender, const char *signal, const QObject *receiver, const char *method, Qt::ConnectionType type)
QObject * parent() const const
Q_EMITQ_EMIT
This file is part of the KDE documentation.
Documentation copyright © 1996-2020 The KDE developers.
Generated on Fri Jun 5 2020 23:08:55 by doxygen 1.8.11 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.