7 #include "itemcreatehandler.h"
10 #include "connection.h"
11 #include "handlerhelper.h"
12 #include "itemfetchhelper.h"
13 #include "preprocessormanager.h"
14 #include "storage/datastore.h"
15 #include "storage/dbconfig.h"
16 #include "storage/itemretrievalmanager.h"
17 #include "storage/parthelper.h"
18 #include "storage/partstreamer.h"
19 #include "storage/parttypehelper.h"
20 #include "storage/selectquerybuilder.h"
21 #include "storage/transaction.h"
22 #include <private/externalpartstorage_p.h>
24 #include "shared/akranges.h"
25 #include "shared/akscopeguard.h"
30 using namespace Akonadi::Server;
31 using namespace AkRanges;
33 ItemCreateHandler::ItemCreateHandler(AkonadiServer &akonadi)
38 bool ItemCreateHandler::buildPimItem(
const Protocol::CreateItemCommand &cmd, PimItem &item,
Collection &parentCol)
40 parentCol = HandlerHelper::collectionFromScope(cmd.collection(), connection()->context());
41 if (!parentCol.isValid()) {
42 return failureResponse(QStringLiteral(
"Invalid parent collection"));
44 if (parentCol.isVirtual()) {
45 return failureResponse(QStringLiteral(
"Cannot append item into virtual collection"));
48 MimeType
mimeType = MimeType::retrieveByNameOrCreate(cmd.mimeType());
50 return failureResponse(QStringLiteral(
"Unable to create mimetype '") % cmd.mimeType() % QStringLiteral(
"'."));
54 item.setSize(cmd.itemSize());
56 item.setCollectionId(parentCol.id());
57 item.setDatetime(cmd.dateTime());
58 if (cmd.remoteId().isEmpty()) {
63 item.setRemoteId(cmd.remoteId());
66 item.setRemoteRevision(cmd.remoteRevision());
67 item.setGid(cmd.gid());
73 bool ItemCreateHandler::insertItem(
const Protocol::CreateItemCommand &cmd, PimItem &item,
const Collection &parentCol)
75 if (!item.datetime().isValid()) {
80 return failureResponse(QStringLiteral(
"Failed to append item"));
84 const QSet<QByteArray> flags = cmd.mergeModes() == Protocol::CreateItemCommand::None ? cmd.flags() : cmd.addedFlags();
87 const Flag::List flagList = HandlerHelper::resolveFlags(flags);
88 bool flagsChanged =
false;
89 if (!storageBackend()->appendItemsFlags({item}, flagList, &flagsChanged,
false, parentCol,
true)) {
90 return failureResponse(
"Unable to append item flags.");
94 const Scope tags = cmd.mergeModes() == Protocol::CreateItemCommand::None ? cmd.tags() : cmd.addedTags();
95 if (!tags.isEmpty()) {
96 const Tag::List tagList = HandlerHelper::tagsFromScope(tags, connection()->context());
97 bool tagsChanged =
false;
98 if (!storageBackend()->appendItemsTags({item}, tagList, &tagsChanged,
false, parentCol,
true)) {
99 return failureResponse(QStringLiteral(
"Unable to append item tags."));
104 qint64 partSizes = 0;
105 PartStreamer streamer(connection(), item);
106 const auto parts = cmd.parts();
110 streamer.stream(
true, partName, partSize);
111 }
catch (
const PartStreamerException &e) {
112 return failureResponse(e.what());
114 partSizes += partSize;
116 const Protocol::Attributes attrs = cmd.attributes();
117 for (
auto iter = attrs.cbegin(), end = attrs.cend(); iter != end; ++iter) {
119 streamer.streamAttribute(
true, iter.key(), iter.value());
120 }
catch (
const PartStreamerException &e) {
121 return failureResponse(e.what());
126 if (partSizes > item.size()) {
127 item.setSize(partSizes);
132 if (akonadi().preprocessorManager().isActive()) {
133 Part hiddenAttribute;
134 hiddenAttribute.setPimItemId(item.id());
135 hiddenAttribute.setPartType(PartTypeHelper::fromFqName(QStringLiteral(AKONADI_ATTRIBUTE_HIDDEN)));
137 hiddenAttribute.setDatasize(0);
139 PartHelper::insert(&hiddenAttribute);
142 const bool seen = flags.
contains(AKONADI_FLAG_SEEN) || flags.
contains(AKONADI_FLAG_IGNORED);
143 notify(item, seen, item.collection());
144 sendResponse(item, Protocol::CreateItemCommand::None);
149 bool ItemCreateHandler::mergeItem(
const Protocol::CreateItemCommand &cmd, PimItem &newItem, PimItem ¤tItem,
const Collection &parentCol)
151 bool needsUpdate =
false;
154 if (!newItem.remoteId().isEmpty() && currentItem.remoteId() != newItem.remoteId()) {
155 currentItem.setRemoteId(newItem.remoteId());
156 changedParts.
insert(AKONADI_PARAM_REMOTEID);
159 if (!newItem.remoteRevision().isEmpty() && currentItem.remoteRevision() != newItem.remoteRevision()) {
160 currentItem.setRemoteRevision(newItem.remoteRevision());
161 changedParts.
insert(AKONADI_PARAM_REMOTEREVISION);
164 if (!newItem.gid().isEmpty() && currentItem.gid() != newItem.gid()) {
165 currentItem.setGid(newItem.gid());
166 changedParts.
insert(AKONADI_PARAM_GID);
169 if (newItem.datetime().isValid() && newItem.datetime() != currentItem.datetime()) {
170 currentItem.setDatetime(newItem.datetime());
174 if (newItem.size() > 0 && newItem.size() != currentItem.size()) {
175 currentItem.setSize(newItem.size());
179 const Collection col = Collection::retrieveById(parentCol.id());
180 if (cmd.flags().isEmpty() && !cmd.flagsOverwritten()) {
181 bool flagsAdded =
false;
182 bool flagsRemoved =
false;
183 if (!cmd.addedFlags().isEmpty()) {
184 const auto addedFlags = HandlerHelper::resolveFlags(cmd.addedFlags());
185 storageBackend()->appendItemsFlags({currentItem}, addedFlags, &flagsAdded,
true, col,
true);
187 if (!cmd.removedFlags().isEmpty()) {
188 const auto removedFlags = HandlerHelper::resolveFlags(cmd.removedFlags());
189 storageBackend()->removeItemsFlags({currentItem}, removedFlags, &flagsRemoved, col,
true);
191 if (flagsAdded || flagsRemoved) {
192 changedParts.
insert(AKONADI_PARAM_FLAGS);
196 bool flagsChanged =
false;
199 static QList<QByteArray> localFlagsToPreserve = {
"$ATTACHMENT",
"$INVITATION",
"$ENCRYPTED",
"$SIGNED",
"$WATCHED"};
205 const Flag::List currentFlags = currentItem.flags();
206 for (
const Flag ¤tFlag : currentFlags) {
207 const QByteArray currentFlagName = currentFlag.name().toLatin1();
208 if (localFlagsToPreserve.
contains(currentFlagName)) {
209 flagNames.
insert(currentFlagName);
212 const auto flags = HandlerHelper::resolveFlags(flagNames);
213 storageBackend()->setItemsFlags({currentItem}, ¤tFlags, flags, &flagsChanged, col,
true);
215 changedParts.
insert(AKONADI_PARAM_FLAGS);
220 if (cmd.tags().isEmpty()) {
221 bool tagsAdded =
false;
222 bool tagsRemoved =
false;
223 if (!cmd.addedTags().isEmpty()) {
224 const auto addedTags = HandlerHelper::tagsFromScope(cmd.addedTags(), connection()->context());
225 storageBackend()->appendItemsTags({currentItem}, addedTags, &tagsAdded,
true, col,
true);
227 if (!cmd.removedTags().isEmpty()) {
228 const Tag::List removedTags = HandlerHelper::tagsFromScope(cmd.removedTags(), connection()->context());
229 storageBackend()->removeItemsTags({currentItem}, removedTags, &tagsRemoved,
true);
232 if (tagsAdded || tagsRemoved) {
233 changedParts.
insert(AKONADI_PARAM_TAGS);
237 bool tagsChanged =
false;
238 const auto tags = HandlerHelper::tagsFromScope(cmd.tags(), connection()->context());
239 storageBackend()->setItemsTags({currentItem}, tags, &tagsChanged,
true);
241 changedParts.
insert(AKONADI_PARAM_TAGS);
246 const Part::List existingParts = Part::retrieveFiltered(Part::pimItemIdColumn(), currentItem.id());
248 for (
const Part &part : existingParts) {
249 partsSizes.
insert(PartTypeHelper::fullName(part.partType()).toLatin1(), part.datasize());
252 PartStreamer streamer(connection(), currentItem);
253 const auto partNames = cmd.parts();
254 for (
const QByteArray &partName : partNames) {
255 bool changed =
false;
258 streamer.stream(
true, partName, partSize, &changed);
259 }
catch (
const PartStreamerException &e) {
260 return failureResponse(e.what());
264 changedParts.
insert(partName);
265 partsSizes.
insert(partName, partSize);
270 const qint64 size = std::accumulate(partsSizes.
begin(), partsSizes.
end(), 0LL);
271 if (size > currentItem.size()) {
272 currentItem.setSize(size);
277 currentItem.setRev(qMax(newItem.rev(), currentItem.rev()) + 1);
280 currentItem.setDirty(!connection()->context().resource().
isValid());
283 if (!currentItem.update()) {
284 return failureResponse(
"Failed to store merged item");
287 notify(currentItem, currentItem.collection(), changedParts);
290 sendResponse(currentItem, cmd.mergeModes());
295 bool ItemCreateHandler::sendResponse(
const PimItem &item, Protocol::CreateItemCommand::MergeModes mergeModes)
297 if (mergeModes & Protocol::CreateItemCommand::Silent || mergeModes & Protocol::CreateItemCommand::None) {
298 Protocol::FetchItemsResponse resp;
299 resp.setId(item.id());
300 resp.setMTime(item.datetime());
301 Handler::sendResponse(std::move(resp));
305 Protocol::ItemFetchScope fetchScope;
306 fetchScope.setAncestorDepth(Protocol::ItemFetchScope::ParentAncestor);
307 fetchScope.setFetch(Protocol::ItemFetchScope::AllAttributes | Protocol::ItemFetchScope::FullPayload | Protocol::ItemFetchScope::CacheOnly
308 | Protocol::ItemFetchScope::Flags | Protocol::ItemFetchScope::GID | Protocol::ItemFetchScope::MTime | Protocol::ItemFetchScope::RemoteID
309 | Protocol::ItemFetchScope::RemoteRevision | Protocol::ItemFetchScope::Size | Protocol::ItemFetchScope::Tags);
313 scope.setUidSet(set);
315 ItemFetchHelper fetchHelper(connection(), scope, fetchScope, Protocol::TagFetchScope{}, akonadi());
316 if (!fetchHelper.fetchItems()) {
317 return failureResponse(
"Failed to retrieve item");
323 bool ItemCreateHandler::notify(
const PimItem &item,
bool seen,
const Collection &collection)
325 storageBackend()->notificationCollector()->itemAdded(item, seen, collection);
327 if (akonadi().preprocessorManager().isActive()) {
329 akonadi().preprocessorManager().beginHandleItem(item, storageBackend());
337 storageBackend()->notificationCollector()->itemChanged(item, changedParts, collection);
342 void ItemCreateHandler::recoverFromMultipleMergeCandidates(
const PimItem::List &items,
const Collection &collection)
350 int transactionDepth = 0;
351 while (storageBackend()->inTransaction()) {
353 storageBackend()->commitTransaction();
355 const AkScopeGuard restoreTransaction([&]() {
356 for (
int i = 0; i < transactionDepth; ++i) {
357 storageBackend()->beginTransaction(QStringLiteral(
"RestoredTransactionAfterMMCRecovery"));
361 Transaction transaction(storageBackend(), QStringLiteral(
"MMC Recovery Transaction"));
367 if (items | Actions::any([](
const auto &item) {
368 return item.dirty() || item.remoteId().isEmpty();
370 qCWarning(AKONADISERVER_LOG) <<
"Automatic multiple merge candidates recovery failed: at least one of the candidates has uncommitted changes!";
375 if (items | Actions::any([collection](
const auto &item) {
376 return item.collectionId() != collection.id();
378 qCWarning(AKONADISERVER_LOG) <<
"Automatic multiple merge candidates recovery failed: all candidates do not belong to the same collection.";
382 storageBackend()->cleanupPimItems(items, DataStore::Silent);
383 if (!transaction.commit()) {
384 qCWarning(AKONADISERVER_LOG) <<
"Automatic multiple merge candidates recovery failed: failed to commit database transaction.";
389 akonadi().itemRetrievalManager().triggerCollectionSync(collection.resource().name(), collection.id());
391 qCInfo(AKONADISERVER_LOG) <<
"Automatic multiple merge candidates recovery successful: conflicting items"
392 << (items | Views::transform([](
const auto &i) {
395 | Actions::toQVector)
396 <<
"in collection" << collection.name() <<
"(ID:" << collection.id()
397 <<
") were removed and a new sync was scheduled in the resource" << collection.resource().name();
400 bool ItemCreateHandler::parseStream()
402 const auto &cmd = Protocol::cmdCast<Protocol::CreateItemCommand>(m_command);
408 Transaction transaction(storageBackend(), QStringLiteral(
"ItemCreateHandler"));
409 ExternalPartStorageTransaction storageTrx;
413 if (!buildPimItem(cmd, item, parentCol)) {
417 if (cmd.mergeModes() == Protocol::CreateItemCommand::None) {
418 if (!insertItem(cmd, item, parentCol)) {
421 if (!transaction.
commit()) {
422 return failureResponse(QStringLiteral(
"Failed to commit transaction"));
429 qb.
addValueCondition(PimItem::collectionIdColumn(), Query::Equals, parentCol.id());
433 if (cmd.mergeModes() & Protocol::CreateItemCommand::GID) {
434 mergeCondition.
addValueCondition(PimItem::gidColumn(), Query::Equals, item.gid());
436 if (cmd.mergeModes() & Protocol::CreateItemCommand::RemoteID) {
437 mergeCondition.
addValueCondition(PimItem::remoteIdColumn(), Query::Equals, item.remoteId());
443 if (cmd.mergeModes() & Protocol::CreateItemCommand::GID && !item.remoteId().isEmpty()) {
445 mergeCondition.
addValueCondition(PimItem::remoteIdColumn(), Query::Equals, item.remoteId());
452 return failureResponse(
"Failed to query database for item");
459 if (!insertItem(cmd, item, parentCol)) {
462 if (!transaction.
commit()) {
463 return failureResponse(
"Failed to commit transaction");
467 }
else if (result.
count() == 1) {
470 PimItem existingItem = result.
at(0);
472 if (!mergeItem(cmd, item, existingItem, parentCol)) {
475 if (!transaction.
commit()) {
476 return failureResponse(
"Failed to commit transaction");
480 qCWarning(AKONADISERVER_LOG) <<
"Multiple merge candidates, will attempt to recover:";
481 for (
const PimItem &item : result) {
482 qCWarning(AKONADISERVER_LOG) <<
"\tID:" << item.id() <<
", RID:" << item.remoteId() <<
", GID:" << item.gid()
483 <<
", Collection:" << item.collection().name() <<
"(" << item.collectionId() <<
")"
484 <<
", Resource:" << item.collection().resource().name() <<
"(" << item.collection().resourceId() <<
")";
488 recoverFromMultipleMergeCandidates(result, parentCol);
492 return failureResponse(QStringLiteral(
"Multiple merge candidates in collection '%1', aborting").arg(item.collection().name()));
496 return successResponse<Protocol::CreateItemResponse>();