8 #include "itemfetchhelper.h"
11 #include "connection.h"
13 #include "handlerhelper.h"
14 #include "shared/akranges.h"
15 #include "storage/itemqueryhelper.h"
16 #include "storage/itemretrievalmanager.h"
17 #include "storage/itemretrievalrequest.h"
18 #include "storage/parthelper.h"
19 #include "storage/parttypehelper.h"
20 #include "storage/selectquerybuilder.h"
21 #include "storage/transaction.h"
23 #include "agentmanagerinterface.h"
24 #include "akonadiserver_debug.h"
25 #include "intervalcheck.h"
26 #include "relationfetchhandler.h"
27 #include "tagfetchhelper.h"
30 #include <private/dbus_p.h>
34 #include <QStringList>
37 #include <QElapsedTimer>
40 using namespace Akonadi::Server;
41 using namespace AkRanges;
43 #define ENABLE_FETCH_PROFILING 0
44 #if ENABLE_FETCH_PROFILING
45 #define BEGIN_TIMER(name) \
46 QElapsedTimer name##Timer; \
49 #define END_TIMER(name) const double name##Elapsed = name##Timer.nsecsElapsed() / 1000000.0;
50 #define PROF_INC(name) ++name;
52 #define BEGIN_TIMER(name)
53 #define END_TIMER(name)
54 #define PROF_INC(name)
57 ItemFetchHelper::ItemFetchHelper(
Connection *connection,
59 const Protocol::ItemFetchScope &itemFetchScope,
60 const Protocol::TagFetchScope &tagFetchScope,
61 AkonadiServer &akonadi,
62 const Protocol::FetchLimit &itemsLimit)
63 : ItemFetchHelper(connection, connection->context(), scope, itemFetchScope, tagFetchScope, akonadi, itemsLimit)
67 ItemFetchHelper::ItemFetchHelper(
Connection *connection,
68 const CommandContext &context,
70 const Protocol::ItemFetchScope &itemFetchScope,
71 const Protocol::TagFetchScope &tagFetchScope,
72 AkonadiServer &akonadi,
73 const Protocol::FetchLimit &itemsLimit)
74 : mConnection(connection)
77 , mItemFetchScope(itemFetchScope)
78 , mTagFetchScope(tagFetchScope)
80 , mItemsLimit(itemsLimit)
81 , mItemQuery(PimItem::tableName())
84 std::fill(mItemQueryColumnMap, mItemQueryColumnMap + ItemQueryColumnCount, -1);
87 void ItemFetchHelper::disableATimeUpdates()
89 mUpdateATimeEnabled =
false;
92 enum PartQueryColumns {
94 PartQueryTypeIdColumn,
96 PartQueryStorageColumn,
97 PartQueryVersionColumn,
98 PartQueryDataSizeColumn
105 if (mItemsLimit.limit() > 0) {
106 partQuery =
QueryBuilder(mItemQuery.query(), mPimItemQueryAlias);
109 if (!partList.
isEmpty() || allPayload || allAttrs) {
110 partQuery.addJoin(
QueryBuilder::InnerJoin, Part::tableName(), partQuery.getTableWithColumn(PimItem::idColumn()), Part::pimItemIdFullColumnName());
111 partQuery.addColumn(partQuery.getTableWithColumn(PimItem::idColumn()));
112 partQuery.addColumn(Part::partTypeIdFullColumnName());
113 partQuery.addColumn(Part::dataFullColumnName());
114 partQuery.addColumn(Part::storageFullColumnName());
115 partQuery.addColumn(Part::versionFullColumnName());
116 partQuery.addColumn(Part::datasizeFullColumnName());
118 partQuery.addSortColumn(partQuery.getTableWithColumn(PimItem::idColumn()), Query::Descending);
120 if (!partList.
isEmpty() || allPayload || allAttrs) {
122 for (
const QByteArray &b : std::as_const(partList)) {
123 if (b.startsWith(
"PLD") || b.startsWith(
"ATR")) {
127 if (allPayload || allAttrs) {
128 partQuery.addJoin(
QueryBuilder::InnerJoin, PartType::tableName(), Part::partTypeIdFullColumnName(), PartType::idFullColumnName());
130 cond.addValueCondition(PartType::nsFullColumnName(), Query::Equals, QStringLiteral(
"PLD"));
133 cond.addValueCondition(PartType::nsFullColumnName(), Query::Equals, QStringLiteral(
"ATR"));
137 partQuery.addCondition(cond);
142 if (!partQuery.exec()) {
143 throw HandlerException(
"Unable to list item parts");
145 partQuery.query().next();
148 return partQuery.query();
151 QSqlQuery ItemFetchHelper::buildItemQuery()
154 #define ADD_COLUMN(colName, colId) \
156 mItemQuery.addColumn(colName); \
157 mItemQueryColumnMap[colId] = column++; \
159 ADD_COLUMN(PimItem::idFullColumnName(), ItemQueryPimItemIdColumn);
160 if (mItemFetchScope.fetchRemoteId()) {
161 ADD_COLUMN(PimItem::remoteIdFullColumnName(), ItemQueryPimItemRidColumn)
163 ADD_COLUMN(PimItem::mimeTypeIdFullColumnName(), ItemQueryMimeTypeIdColumn)
164 ADD_COLUMN(PimItem::revFullColumnName(), ItemQueryRevColumn)
165 if (mItemFetchScope.fetchRemoteRevision()) {
166 ADD_COLUMN(PimItem::remoteRevisionFullColumnName(), ItemQueryRemoteRevisionColumn)
168 if (mItemFetchScope.fetchSize()) {
169 ADD_COLUMN(PimItem::sizeFullColumnName(), ItemQuerySizeColumn)
171 if (mItemFetchScope.fetchMTime()) {
172 ADD_COLUMN(PimItem::datetimeFullColumnName(), ItemQueryDatetimeColumn)
174 ADD_COLUMN(PimItem::collectionIdFullColumnName(), ItemQueryCollectionIdColumn)
175 if (mItemFetchScope.fetchGID()) {
176 ADD_COLUMN(PimItem::gidFullColumnName(), ItemQueryPimItemGidColumn)
180 mItemQuery.addSortColumn(PimItem::idFullColumnName(),
static_cast<Query::SortOrder>(mItemsLimit.sortOrder()));
181 if (mItemsLimit.limit() > 0) {
182 mItemQuery.setLimit(mItemsLimit.limit(), mItemsLimit.limitOffset());
187 if (mItemFetchScope.changedSince().isValid()) {
188 mItemQuery.addValueCondition(PimItem::datetimeFullColumnName(), Query::GreaterOrEqual, mItemFetchScope.changedSince().toUTC());
191 if (!mItemQuery.exec()) {
192 throw HandlerException(
"Unable to list items");
195 mItemQuery.query().next();
197 return mItemQuery.query();
200 enum FlagQueryColumns {
201 FlagQueryPimItemIdColumn,
202 FlagQueryFlagIdColumn,
205 QSqlQuery ItemFetchHelper::buildFlagQuery()
208 if (mItemsLimit.limit() > 0) {
209 flagQuery =
QueryBuilder(mItemQuery.query(), mPimItemQueryAlias);
212 flagQuery.addJoin(
QueryBuilder::InnerJoin, PimItemFlagRelation::tableName(), flagQuery.getTableWithColumn(PimItem::idColumn()), PimItemFlagRelation::leftFullColumnName());
214 flagQuery.addColumn(flagQuery.getTableWithColumn(PimItem::idColumn()));
215 flagQuery.addColumn(PimItemFlagRelation::rightFullColumnName());
218 flagQuery.addSortColumn(flagQuery.getTableWithColumn(PimItem::idColumn()), Query::Descending);
220 if (!flagQuery.exec()) {
221 throw HandlerException(
"Unable to retrieve item flags");
224 flagQuery.query().next();
226 return flagQuery.query();
229 enum TagQueryColumns {
230 TagQueryItemIdColumn,
234 QSqlQuery ItemFetchHelper::buildTagQuery()
237 if (mItemsLimit.limit() > 0) {
238 tagQuery =
QueryBuilder(mItemQuery.query(), mPimItemQueryAlias);
241 tagQuery.addJoin(
QueryBuilder::InnerJoin, PimItemTagRelation::tableName(), tagQuery.getTableWithColumn(PimItem::idColumn()), PimItemTagRelation::leftFullColumnName());
242 tagQuery.addJoin(
QueryBuilder::InnerJoin, Tag::tableName(), Tag::idFullColumnName(), PimItemTagRelation::rightFullColumnName());
243 tagQuery.addColumn(tagQuery.getTableWithColumn(PimItem::idColumn()));
244 tagQuery.addColumn(Tag::idFullColumnName());
247 tagQuery.addSortColumn(tagQuery.getTableWithColumn(PimItem::idColumn()), Query::Descending);
249 if (!tagQuery.exec()) {
250 throw HandlerException(
"Unable to retrieve item tags");
253 tagQuery.query().next();
255 return tagQuery.query();
258 enum VRefQueryColumns {
259 VRefQueryCollectionIdColumn,
260 VRefQueryItemIdColumn,
263 QSqlQuery ItemFetchHelper::buildVRefQuery()
266 if(mItemsLimit.limit() > 0) {
267 vRefQuery =
QueryBuilder(mItemQuery.query(), mPimItemQueryAlias);
271 CollectionPimItemRelation::tableName(),
272 CollectionPimItemRelation::rightFullColumnName(),
273 vRefQuery.getTableWithColumn(PimItem::idColumn()));
274 vRefQuery.addColumn(CollectionPimItemRelation::leftFullColumnName());
275 vRefQuery.addColumn(CollectionPimItemRelation::rightFullColumnName());
277 vRefQuery.addSortColumn(vRefQuery.getTableWithColumn(PimItem::idColumn()), Query::Descending);
279 if (!vRefQuery.exec()) {
280 throw HandlerException(
"Unable to retrieve virtual references");
283 vRefQuery.query().next();
285 return vRefQuery.query();
288 bool ItemFetchHelper::isScopeLocal(
const Scope &scope)
291 if (!mConnection->sessionId().startsWith(
"akonadi_indexing_agent")) {
296 QueryBuilder qb(PimItem::tableName(), QueryBuilder::Select);
297 qb.setDistinct(
true);
298 qb.addColumn(Resource::nameFullColumnName());
299 qb.addJoin(
QueryBuilder::LeftJoin, Collection::tableName(), PimItem::collectionIdFullColumnName(), Collection::idFullColumnName());
300 qb.addJoin(
QueryBuilder::LeftJoin, Resource::tableName(), Collection::resourceIdFullColumnName(), Resource::idFullColumnName());
302 if (mContext.resource().isValid()) {
303 qb.addValueCondition(Resource::nameFullColumnName(), Query::NotEquals, mContext.resource().name());
307 throw HandlerException(
"Failed to query database");
324 org::freedesktop::Akonadi::AgentManager manager(DBus::serviceName(DBus::Control), QStringLiteral(
"/AgentManager"),
QDBusConnection::sessionBus());
325 const QString typeIdentifier = manager.agentInstanceType(resourceName);
327 return properties.value(QStringLiteral(
"HasLocalStorage"),
false).toBool();
330 DataStore *ItemFetchHelper::storageBackend()
const
333 if (
auto store = mConnection->storageBackend()) {
341 bool ItemFetchHelper::fetchItems(std::function<
void(Protocol::FetchItemsResponse &&)> &&itemCallback)
352 BEGIN_TIMER(itemRetriever)
353 BEGIN_TIMER(scopeLocal)
354 #if ENABLE_FETCH_PROFILING
355 double scopeLocalElapsed = 0;
357 if (!mItemFetchScope.cacheOnly() || isScopeLocal(mScope)) {
358 #if ENABLE_FETCH_PROFILING
359 scopeLocalElapsed = scopeLocalTimer.elapsed();
363 triggerOnDemandFetch();
367 ItemRetriever retriever(mAkonadi.itemRetrievalManager(), mConnection, mContext);
368 retriever.setScope(mScope);
369 retriever.setRetrieveParts(mItemFetchScope.requestedPayloads());
370 retriever.setRetrieveFullPayload(mItemFetchScope.fullPayload());
371 retriever.setChangedSince(mItemFetchScope.changedSince());
372 if (!retriever.exec() && !mItemFetchScope.ignoreErrors()) {
373 if (mContext.resource().isValid()) {
374 throw HandlerException(QStringLiteral(
"Unable to fetch item from backend (collection %1, resource %2) : %3")
375 .arg(mContext.collectionId())
376 .arg(mContext.resource().id())
379 throw HandlerException(QStringLiteral(
"Unable to fetch item from backend (collection %1) : %2")
380 .arg(mContext.collectionId())
385 END_TIMER(itemRetriever)
394 if (mItemFetchScope.ignoreErrors()) {
397 switch (mScope.scope()) {
400 case Scope::HierarchicalRid:
402 throw HandlerException(
"Item query returned empty result set");
410 QSqlQuery partQuery(storageBackend()->database());
411 if (!mItemFetchScope.requestedParts().isEmpty() || mItemFetchScope.fullPayload() || mItemFetchScope.allAttributes()) {
412 partQuery = buildPartQuery(mItemFetchScope.requestedParts(), mItemFetchScope.fullPayload(), mItemFetchScope.allAttributes());
418 QSqlQuery flagQuery(storageBackend()->database());
419 if (mItemFetchScope.fetchFlags()) {
420 flagQuery = buildFlagQuery();
426 QSqlQuery tagQuery(storageBackend()->database());
427 if (mItemFetchScope.fetchTags()) {
428 tagQuery = buildTagQuery();
433 QSqlQuery vRefQuery(storageBackend()->database());
434 if (mItemFetchScope.fetchVirtualReferences()) {
435 vRefQuery = buildVRefQuery();
439 #if ENABLE_FETCH_PROFILING
447 BEGIN_TIMER(processing)
454 const qint64 pimItemId = extractQueryResult(itemQuery, ItemQueryPimItemIdColumn).toLongLong();
455 const int pimItemRev = extractQueryResult(itemQuery, ItemQueryRevColumn).toInt();
457 Protocol::FetchItemsResponse response;
458 response.setId(pimItemId);
459 response.setRevision(pimItemRev);
460 const qint64 mimeTypeId = extractQueryResult(itemQuery, ItemQueryMimeTypeIdColumn).toLongLong();
461 auto mtIter = mimeTypeIdNameCache.
find(mimeTypeId);
462 if (mtIter == mimeTypeIdNameCache.
end()) {
463 mtIter = mimeTypeIdNameCache.
insert(mimeTypeId, MimeType::retrieveById(mimeTypeId).
name());
465 response.setMimeType(mtIter.value());
466 if (mItemFetchScope.fetchRemoteId()) {
467 response.setRemoteId(extractQueryResult(itemQuery, ItemQueryPimItemRidColumn).
toString());
469 response.setParentId(extractQueryResult(itemQuery, ItemQueryCollectionIdColumn).toLongLong());
471 if (mItemFetchScope.fetchSize()) {
472 response.setSize(extractQueryResult(itemQuery, ItemQuerySizeColumn).toLongLong());
474 if (mItemFetchScope.fetchMTime()) {
475 response.setMTime(Utils::variantToDateTime(extractQueryResult(itemQuery, ItemQueryDatetimeColumn)));
477 if (mItemFetchScope.fetchRemoteRevision()) {
478 response.setRemoteRevision(extractQueryResult(itemQuery, ItemQueryRemoteRevisionColumn).
toString());
480 if (mItemFetchScope.fetchGID()) {
481 response.setGid(extractQueryResult(itemQuery, ItemQueryPimItemGidColumn).
toString());
484 if (mItemFetchScope.fetchFlags()) {
486 while (flagQuery.isValid()) {
487 const qint64
id = flagQuery.
value(FlagQueryPimItemIdColumn).toLongLong();
488 if (
id > pimItemId) {
491 }
else if (
id < pimItemId) {
494 const qint64 flagId = flagQuery.value(FlagQueryFlagIdColumn).toLongLong();
495 auto flagNameIter = flagIdNameCache.
find(flagId);
496 if (flagNameIter == flagIdNameCache.
end()) {
497 flagNameIter = flagIdNameCache.
insert(flagId, Flag::retrieveById(flagId).
name().toUtf8());
499 flags << flagNameIter.
value();
502 response.setFlags(flags);
505 if (mItemFetchScope.fetchTags()) {
508 while (tagQuery.isValid()) {
510 const qint64
id = tagQuery.value(TagQueryItemIdColumn).toLongLong();
511 if (
id > pimItemId) {
514 }
else if (
id < pimItemId) {
517 tagIds << tagQuery.
value(TagQueryTagIdColumn).toLongLong();
521 if (mTagFetchScope.fetchIdOnly()) {
522 tags = tagIds | Views::transform([](
const auto tagId) {
523 Protocol::FetchTagsResponse resp;
527 | Actions::toQVector;
529 tags = tagIds | Views::transform([
this](
const auto tagId) {
530 return HandlerHelper::fetchTagsResponse(Tag::retrieveById(tagId), mTagFetchScope, mConnection);
532 | Actions::toQVector;
534 response.setTags(tags);
537 if (mItemFetchScope.fetchVirtualReferences()) {
539 while (vRefQuery.isValid()) {
541 const qint64
id = vRefQuery.value(VRefQueryItemIdColumn).toLongLong();
542 if (
id > pimItemId) {
545 }
else if (
id < pimItemId) {
548 vRefs << vRefQuery.
value(VRefQueryCollectionIdColumn).toLongLong();
551 response.setVirtualReferences(vRefs);
554 if (mItemFetchScope.fetchRelations()) {
558 condition.
addValueCondition(Relation::leftIdFullColumnName(), Query::Equals, pimItemId);
559 condition.
addValueCondition(Relation::rightIdFullColumnName(), Query::Equals, pimItemId);
562 << Relation::remoteIdColumn());
564 throw HandlerException(
"Unable to list item relations");
567 const auto result = qb.
result();
568 relations.
reserve(result.size());
569 for (
const Relation &rel : result) {
570 relations.
push_back(HandlerHelper::fetchRelationsResponse(rel));
573 response.setRelations(relations);
576 if (mItemFetchScope.ancestorDepth() != Protocol::ItemFetchScope::NoAncestor) {
577 response.setAncestors(ancestorsForItem(response.parentId()));
580 bool skipItem =
false;
584 while (partQuery.isValid()) {
586 const qint64
id = partQuery.value(PartQueryPimIdColumn).toLongLong();
587 if (
id > pimItemId) {
590 }
else if (
id < pimItemId) {
594 const qint64 partTypeId = partQuery.value(PartQueryTypeIdColumn).toLongLong();
595 auto ptIter = partTypeIdNameCache.
find(partTypeId);
596 if (ptIter == partTypeIdNameCache.
end()) {
599 Protocol::PartMetaData metaPart;
600 Protocol::StreamPayloadResponse partData;
601 partData.setPayloadName(ptIter.value());
602 metaPart.setName(ptIter.value());
603 metaPart.setVersion(partQuery.value(PartQueryVersionColumn).toInt());
604 metaPart.setSize(partQuery.value(PartQueryDataSizeColumn).toLongLong());
606 const QByteArray data = Utils::variantToByteArray(partQuery.value(PartQueryDataColumn));
607 if (mItemFetchScope.checkCachedPayloadPartsOnly()) {
609 cachedParts << ptIter.
value();
613 if (mItemFetchScope.ignoreErrors() && data.
isEmpty()) {
616 qCDebug(AKONADISERVER_LOG) <<
"item" <<
id <<
"has an empty payload part in parttable for part" << metaPart.name();
620 metaPart.setStorageType(
static_cast<Protocol::PartMetaData::StorageType
>(partQuery.value(PartQueryStorageColumn).toInt()));
624 partData.setData(data);
626 partData.setMetaData(metaPart);
628 if (mItemFetchScope.requestedParts().contains(ptIter.value()) || mItemFetchScope.fullPayload() || mItemFetchScope.allAttributes()) {
635 response.setParts(parts);
642 if (mItemFetchScope.checkCachedPayloadPartsOnly()) {
643 response.setCachedParts(cachedParts);
647 itemCallback(std::move(response));
649 mConnection->sendResponse(std::move(response));
659 END_TIMER(processing)
663 if (mUpdateATimeEnabled && (needsAccessTimeUpdate(mItemFetchScope.requestedParts()) || mItemFetchScope.fullPayload())) {
664 updateItemAccessTime();
669 #if ENABLE_FETCH_PROFILING
670 qCDebug(AKONADISERVER_LOG) <<
"ItemFetchHelper execution stats:";
671 qCDebug(AKONADISERVER_LOG) <<
"\tItems query:" << itemsElapsed <<
"ms," << itemsCount <<
" items in total";
672 qCDebug(AKONADISERVER_LOG) <<
"\tFlags query:" << flagsElapsed <<
"ms, " << flagsCount <<
" flags in total";
673 qCDebug(AKONADISERVER_LOG) <<
"\tParts query:" << partsElapsed <<
"ms, " << partsCount <<
" parts in total";
674 qCDebug(AKONADISERVER_LOG) <<
"\tTags query: " << tagsElapsed <<
"ms, " << tagsCount <<
" tags in total";
675 qCDebug(AKONADISERVER_LOG) <<
"\tVRefs query:" << vRefsElapsed <<
"ms, " << vRefsCount <<
" vRefs in total";
676 qCDebug(AKONADISERVER_LOG) <<
"\t------------";
677 qCDebug(AKONADISERVER_LOG) <<
"\tItem retriever:" << itemRetrieverElapsed <<
"ms (scope local:" << scopeLocalElapsed <<
"ms)";
678 qCDebug(AKONADISERVER_LOG) <<
"\tTotal query:" << (itemsElapsed + flagsElapsed + partsElapsed + tagsElapsed + vRefsElapsed) <<
"ms";
679 qCDebug(AKONADISERVER_LOG) <<
"\tTotal processing: " << processingElapsed <<
"ms";
680 qCDebug(AKONADISERVER_LOG) <<
"\tATime update:" << aTimeElapsed <<
"ms";
681 qCDebug(AKONADISERVER_LOG) <<
"\t============";
682 qCDebug(AKONADISERVER_LOG) <<
"\tTotal FETCH:" << fetchElapsed <<
"ms";
683 qCDebug(AKONADISERVER_LOG);
684 qCDebug(AKONADISERVER_LOG);
696 return parts.
contains(AKONADI_PARAM_PLD_RFC822);
699 void ItemFetchHelper::updateItemAccessTime()
701 Transaction transaction(storageBackend(), QStringLiteral(
"update atime"));
702 QueryBuilder qb(PimItem::tableName(), QueryBuilder::Update);
707 qCWarning(AKONADISERVER_LOG) <<
"Unable to update item access time";
709 transaction.commit();
713 void ItemFetchHelper::triggerOnDemandFetch()
715 if (mContext.collectionId() <= 0 || mItemFetchScope.cacheOnly()) {
719 Collection collection = mContext.collection();
722 if (mConnection->sessionId() == collection.resource().name().
toLatin1()) {
726 storageBackend()->activeCachePolicy(collection);
727 if (!collection.cachePolicySyncOnDemand()) {
731 mConnection->akonadi().intervalChecker().requestCollectionSync(collection);
736 if (mItemFetchScope.ancestorDepth() == Protocol::ItemFetchScope::NoAncestor || parentColId == 0) {
739 const auto it = mAncestorCache.constFind(parentColId);
740 if (it != mAncestorCache.cend()) {
745 Collection col = Collection::retrieveById(parentColId);
746 const int depthNum = mItemFetchScope.ancestorDepth() == Protocol::ItemFetchScope::ParentAncestor ? 1 : INT_MAX;
747 for (
int i = 0; i < depthNum; ++i) {
748 if (!col.isValid()) {
749 Protocol::Ancestor ancestor;
751 ancestors << ancestor;
754 Protocol::Ancestor ancestor;
755 ancestor.setId(col.id());
756 ancestor.setRemoteId(col.
remoteId());
757 ancestors << ancestor;
760 mAncestorCache.insert(parentColId, ancestors);
764 QVariant ItemFetchHelper::extractQueryResult(
const QSqlQuery &query, ItemFetchHelper::ItemQueryColumns column)
const
766 const int colId = mItemQueryColumnMap[column];
767 Q_ASSERT(colId >= 0);