Akonadi

itemcreatehandler.cpp
1 /***************************************************************************
2  * SPDX-FileCopyrightText: 2007 Robert Zwerus <[email protected]> *
3  * *
4  * SPDX-License-Identifier: LGPL-2.0-or-later *
5  ***************************************************************************/
6 
7 #include "itemcreatehandler.h"
8 
9 #include "akonadi.h"
10 #include "connection.h"
11 #include "handlerhelper.h"
12 #include "itemfetchhelper.h"
13 #include "preprocessormanager.h"
14 #include "storage/datastore.h"
15 #include "storage/dbconfig.h"
16 #include "storage/itemretrievalmanager.h"
17 #include "storage/parthelper.h"
18 #include "storage/partstreamer.h"
19 #include "storage/parttypehelper.h"
20 #include "storage/selectquerybuilder.h"
21 #include "storage/transaction.h"
22 #include <private/externalpartstorage_p.h>
23 
24 #include "shared/akranges.h"
25 #include "shared/akscopeguard.h"
26 
27 #include <numeric> //std::accumulate
28 
29 using namespace Akonadi;
30 using namespace Akonadi::Server;
31 using namespace AkRanges;
32 
33 ItemCreateHandler::ItemCreateHandler(AkonadiServer &akonadi)
34  : Handler(akonadi)
35 {
36 }
37 
38 bool ItemCreateHandler::buildPimItem(const Protocol::CreateItemCommand &cmd, PimItem &item, Collection &parentCol)
39 {
40  parentCol = HandlerHelper::collectionFromScope(cmd.collection(), connection()->context());
41  if (!parentCol.isValid()) {
42  return failureResponse(QStringLiteral("Invalid parent collection"));
43  }
44  if (parentCol.isVirtual()) {
45  return failureResponse(QStringLiteral("Cannot append item into virtual collection"));
46  }
47 
48  MimeType mimeType = MimeType::retrieveByNameOrCreate(cmd.mimeType());
49  if (!mimeType.isValid()) {
50  return failureResponse(QStringLiteral("Unable to create mimetype '") % cmd.mimeType() % QStringLiteral("'."));
51  }
52 
53  item.setRev(0);
54  item.setSize(cmd.itemSize());
55  item.setMimeTypeId(mimeType.id());
56  item.setCollectionId(parentCol.id());
57  item.setDatetime(cmd.dateTime());
58  if (cmd.remoteId().isEmpty()) {
59  // from application
60  item.setDirty(true);
61  } else {
62  // from resource
63  item.setRemoteId(cmd.remoteId());
64  item.setDirty(false);
65  }
66  item.setRemoteRevision(cmd.remoteRevision());
67  item.setGid(cmd.gid());
68  item.setAtime(QDateTime::currentDateTimeUtc());
69 
70  return true;
71 }
72 
73 bool ItemCreateHandler::insertItem(const Protocol::CreateItemCommand &cmd, PimItem &item, const Collection &parentCol)
74 {
75  if (!item.datetime().isValid()) {
76  item.setDatetime(QDateTime::currentDateTimeUtc());
77  }
78 
79  if (!item.insert()) {
80  return failureResponse(QStringLiteral("Failed to append item"));
81  }
82 
83  // set message flags
84  const QSet<QByteArray> flags = cmd.mergeModes() == Protocol::CreateItemCommand::None ? cmd.flags() : cmd.addedFlags();
85  if (!flags.isEmpty()) {
86  // This will hit an entry in cache inserted there in buildPimItem()
87  const Flag::List flagList = HandlerHelper::resolveFlags(flags);
88  bool flagsChanged = false;
89  if (!storageBackend()->appendItemsFlags({item}, flagList, &flagsChanged, false, parentCol, true)) {
90  return failureResponse("Unable to append item flags.");
91  }
92  }
93 
94  const Scope tags = cmd.mergeModes() == Protocol::CreateItemCommand::None ? cmd.tags() : cmd.addedTags();
95  if (!tags.isEmpty()) {
96  const Tag::List tagList = HandlerHelper::tagsFromScope(tags, connection()->context());
97  bool tagsChanged = false;
98  if (!storageBackend()->appendItemsTags({item}, tagList, &tagsChanged, false, parentCol, true)) {
99  return failureResponse(QStringLiteral("Unable to append item tags."));
100  }
101  }
102 
103  // Handle individual parts
104  qint64 partSizes = 0;
105  PartStreamer streamer(connection(), item);
106  const auto parts = cmd.parts();
107  for (const QByteArray &partName : parts) {
108  qint64 partSize = 0;
109  try {
110  streamer.stream(true, partName, partSize);
111  } catch (const PartStreamerException &e) {
112  return failureResponse(e.what());
113  }
114  partSizes += partSize;
115  }
116  const Protocol::Attributes attrs = cmd.attributes();
117  for (auto iter = attrs.cbegin(), end = attrs.cend(); iter != end; ++iter) {
118  try {
119  streamer.streamAttribute(true, iter.key(), iter.value());
120  } catch (const PartStreamerException &e) {
121  return failureResponse(e.what());
122  }
123  }
124 
125  // TODO: Try to avoid this addition query
126  if (partSizes > item.size()) {
127  item.setSize(partSizes);
128  item.update();
129  }
130 
131  // Preprocessing
132  if (akonadi().preprocessorManager().isActive()) {
133  Part hiddenAttribute;
134  hiddenAttribute.setPimItemId(item.id());
135  hiddenAttribute.setPartType(PartTypeHelper::fromFqName(QStringLiteral(AKONADI_ATTRIBUTE_HIDDEN)));
136  hiddenAttribute.setData(QByteArray());
137  hiddenAttribute.setDatasize(0);
138  // TODO: Handle errors? Technically, this is not a critical issue as no data are lost
139  PartHelper::insert(&hiddenAttribute);
140  }
141 
142  const bool seen = flags.contains(AKONADI_FLAG_SEEN) || flags.contains(AKONADI_FLAG_IGNORED);
143  notify(item, seen, item.collection());
144  sendResponse(item, Protocol::CreateItemCommand::None);
145 
146  return true;
147 }
148 
149 bool ItemCreateHandler::mergeItem(const Protocol::CreateItemCommand &cmd, PimItem &newItem, PimItem &currentItem, const Collection &parentCol)
150 {
151  bool needsUpdate = false;
152  QSet<QByteArray> changedParts;
153 
154  if (!newItem.remoteId().isEmpty() && currentItem.remoteId() != newItem.remoteId()) {
155  currentItem.setRemoteId(newItem.remoteId());
156  changedParts.insert(AKONADI_PARAM_REMOTEID);
157  needsUpdate = true;
158  }
159  if (!newItem.remoteRevision().isEmpty() && currentItem.remoteRevision() != newItem.remoteRevision()) {
160  currentItem.setRemoteRevision(newItem.remoteRevision());
161  changedParts.insert(AKONADI_PARAM_REMOTEREVISION);
162  needsUpdate = true;
163  }
164  if (!newItem.gid().isEmpty() && currentItem.gid() != newItem.gid()) {
165  currentItem.setGid(newItem.gid());
166  changedParts.insert(AKONADI_PARAM_GID);
167  needsUpdate = true;
168  }
169  if (newItem.datetime().isValid() && newItem.datetime() != currentItem.datetime()) {
170  currentItem.setDatetime(newItem.datetime());
171  needsUpdate = true;
172  }
173 
174  if (newItem.size() > 0 && newItem.size() != currentItem.size()) {
175  currentItem.setSize(newItem.size());
176  needsUpdate = true;
177  }
178 
179  const Collection col = Collection::retrieveById(parentCol.id());
180  if (cmd.flags().isEmpty() && !cmd.flagsOverwritten()) {
181  bool flagsAdded = false;
182  bool flagsRemoved = false;
183  if (!cmd.addedFlags().isEmpty()) {
184  const auto addedFlags = HandlerHelper::resolveFlags(cmd.addedFlags());
185  storageBackend()->appendItemsFlags({currentItem}, addedFlags, &flagsAdded, true, col, true);
186  }
187  if (!cmd.removedFlags().isEmpty()) {
188  const auto removedFlags = HandlerHelper::resolveFlags(cmd.removedFlags());
189  storageBackend()->removeItemsFlags({currentItem}, removedFlags, &flagsRemoved, col, true);
190  }
191  if (flagsAdded || flagsRemoved) {
192  changedParts.insert(AKONADI_PARAM_FLAGS);
193  needsUpdate = true;
194  }
195  } else {
196  bool flagsChanged = false;
197  QSet<QByteArray> flagNames = cmd.flags();
198 
199  static QVector<QByteArray> localFlagsToPreserve = {"$ATTACHMENT", "$INVITATION", "$ENCRYPTED", "$SIGNED", "$WATCHED"};
200 
201  // Make sure we don't overwrite some local-only flags that can't come
202  // through from Resource during ItemSync, like $ATTACHMENT, because the
203  // resource is not aware of them (they are usually assigned by client
204  // upon inspecting the payload)
205  const Flag::List currentFlags = currentItem.flags();
206  for (const Flag &currentFlag : currentFlags) {
207  const QByteArray currentFlagName = currentFlag.name().toLatin1();
208  if (localFlagsToPreserve.contains(currentFlagName)) {
209  flagNames.insert(currentFlagName);
210  }
211  }
212  const auto flags = HandlerHelper::resolveFlags(flagNames);
213  storageBackend()->setItemsFlags({currentItem}, &currentFlags, flags, &flagsChanged, col, true);
214  if (flagsChanged) {
215  changedParts.insert(AKONADI_PARAM_FLAGS);
216  needsUpdate = true;
217  }
218  }
219 
220  if (cmd.tags().isEmpty()) {
221  bool tagsAdded = false;
222  bool tagsRemoved = false;
223  if (!cmd.addedTags().isEmpty()) {
224  const auto addedTags = HandlerHelper::tagsFromScope(cmd.addedTags(), connection()->context());
225  storageBackend()->appendItemsTags({currentItem}, addedTags, &tagsAdded, true, col, true);
226  }
227  if (!cmd.removedTags().isEmpty()) {
228  const Tag::List removedTags = HandlerHelper::tagsFromScope(cmd.removedTags(), connection()->context());
229  storageBackend()->removeItemsTags({currentItem}, removedTags, &tagsRemoved, true);
230  }
231 
232  if (tagsAdded || tagsRemoved) {
233  changedParts.insert(AKONADI_PARAM_TAGS);
234  needsUpdate = true;
235  }
236  } else {
237  bool tagsChanged = false;
238  const auto tags = HandlerHelper::tagsFromScope(cmd.tags(), connection()->context());
239  storageBackend()->setItemsTags({currentItem}, tags, &tagsChanged, true);
240  if (tagsChanged) {
241  changedParts.insert(AKONADI_PARAM_TAGS);
242  needsUpdate = true;
243  }
244  }
245 
246  const Part::List existingParts = Part::retrieveFiltered(Part::pimItemIdColumn(), currentItem.id());
247  QMap<QByteArray, qint64> partsSizes;
248  for (const Part &part : existingParts) {
249  partsSizes.insert(PartTypeHelper::fullName(part.partType()).toLatin1(), part.datasize());
250  }
251 
252  PartStreamer streamer(connection(), currentItem);
253  const auto partNames = cmd.parts();
254  for (const QByteArray &partName : partNames) {
255  bool changed = false;
256  qint64 partSize = 0;
257  try {
258  streamer.stream(true, partName, partSize, &changed);
259  } catch (const PartStreamerException &e) {
260  return failureResponse(e.what());
261  }
262 
263  if (changed) {
264  changedParts.insert(partName);
265  partsSizes.insert(partName, partSize);
266  needsUpdate = true;
267  }
268  }
269 
270  const qint64 size = std::accumulate(partsSizes.begin(), partsSizes.end(), 0LL);
271  if (size > currentItem.size()) {
272  currentItem.setSize(size);
273  needsUpdate = true;
274  }
275 
276  if (needsUpdate) {
277  currentItem.setRev(qMax(newItem.rev(), currentItem.rev()) + 1);
278  currentItem.setAtime(QDateTime::currentDateTimeUtc());
279  // Only mark dirty when merged from application
280  currentItem.setDirty(!connection()->context().resource().isValid());
281 
282  // Store all changes
283  if (!currentItem.update()) {
284  return failureResponse("Failed to store merged item");
285  }
286 
287  notify(currentItem, currentItem.collection(), changedParts);
288  }
289 
290  sendResponse(currentItem, cmd.mergeModes());
291 
292  return true;
293 }
294 
295 bool ItemCreateHandler::sendResponse(const PimItem &item, Protocol::CreateItemCommand::MergeModes mergeModes)
296 {
297  if (mergeModes & Protocol::CreateItemCommand::Silent || mergeModes & Protocol::CreateItemCommand::None) {
298  Protocol::FetchItemsResponse resp;
299  resp.setId(item.id());
300  resp.setMTime(item.datetime());
301  Handler::sendResponse(std::move(resp));
302  return true;
303  }
304 
305  Protocol::ItemFetchScope fetchScope;
306  fetchScope.setAncestorDepth(Protocol::ItemFetchScope::ParentAncestor);
307  fetchScope.setFetch(Protocol::ItemFetchScope::AllAttributes | Protocol::ItemFetchScope::FullPayload | Protocol::ItemFetchScope::CacheOnly
308  | Protocol::ItemFetchScope::Flags | Protocol::ItemFetchScope::GID | Protocol::ItemFetchScope::MTime | Protocol::ItemFetchScope::RemoteID
309  | Protocol::ItemFetchScope::RemoteRevision | Protocol::ItemFetchScope::Size | Protocol::ItemFetchScope::Tags);
310  ImapSet set;
311  set.add(QVector<qint64>() << item.id());
312  Scope scope;
313  scope.setUidSet(set);
314 
315  ItemFetchHelper fetchHelper(connection(), scope, fetchScope, Protocol::TagFetchScope{}, akonadi());
316  if (!fetchHelper.fetchItems()) {
317  return failureResponse("Failed to retrieve item");
318  }
319 
320  return true;
321 }
322 
323 bool ItemCreateHandler::notify(const PimItem &item, bool seen, const Collection &collection)
324 {
325  storageBackend()->notificationCollector()->itemAdded(item, seen, collection);
326 
327  if (akonadi().preprocessorManager().isActive()) {
328  // enqueue the item for preprocessing
329  akonadi().preprocessorManager().beginHandleItem(item, storageBackend());
330  }
331  return true;
332 }
333 
334 bool ItemCreateHandler::notify(const PimItem &item, const Collection &collection, const QSet<QByteArray> &changedParts)
335 {
336  if (!changedParts.isEmpty()) {
337  storageBackend()->notificationCollector()->itemChanged(item, changedParts, collection);
338  }
339  return true;
340 }
341 
342 void ItemCreateHandler::recoverFromMultipleMergeCandidates(const PimItem::List &items, const Collection &collection)
343 {
344  // HACK HACK HACK: When this happens within ItemSync, we are running inside a client-side
345  // transaction, so just calling commit here won't have any effect, since this handler will
346  // ultimately fail and the client will rollback the transaction. To circumvent this, we
347  // will forcibly commit the transaction, do our changes here within a new transaction and
348  // then we open a new transaction so that the client won't notice.
349 
350  int transactionDepth = 0;
351  while (storageBackend()->inTransaction()) {
352  ++transactionDepth;
353  storageBackend()->commitTransaction();
354  }
355  const AkScopeGuard restoreTransaction([&]() {
356  for (int i = 0; i < transactionDepth; ++i) {
357  storageBackend()->beginTransaction(QStringLiteral("RestoredTransactionAfterMMCRecovery"));
358  }
359  });
360 
361  Transaction transaction(storageBackend(), QStringLiteral("MMC Recovery Transaction"));
362 
363  // If any of the conflicting items is dirty or does not have a remote ID, we don't want to remove
364  // them as it would cause data loss. There's a chance next changeReplay will fix this, so
365  // next time the ItemSync hits this multiple merge candidates, all changes will be committed
366  // and this check will succeed
367  if (items | Actions::any([](const auto &item) {
368  return item.dirty() || item.remoteId().isEmpty();
369  })) {
370  qCWarning(AKONADISERVER_LOG) << "Automatic multiple merge candidates recovery failed: at least one of the candidates has uncommitted changes!";
371  return;
372  }
373 
374  // This cannot happen with ItemSync, but in theory could happen during individual GID merge.
375  if (items | Actions::any([collection](const auto &item) {
376  return item.collectionId() != collection.id();
377  })) {
378  qCWarning(AKONADISERVER_LOG) << "Automatic multiple merge candidates recovery failed: all candidates do not belong to the same collection.";
379  return;
380  }
381 
382  storageBackend()->cleanupPimItems(items, DataStore::Silent);
383  if (!transaction.commit()) {
384  qCWarning(AKONADISERVER_LOG) << "Automatic multiple merge candidates recovery failed: failed to commit database transaction.";
385  return;
386  }
387 
388  // Schedule a new sync of the collection, one that will succeed
389  akonadi().itemRetrievalManager().triggerCollectionSync(collection.resource().name(), collection.id());
390 
391  qCInfo(AKONADISERVER_LOG) << "Automatic multiple merge candidates recovery successful: conflicting items"
392  << (items | Views::transform([](const auto &i) {
393  return i.id();
394  })
395  | Actions::toQVector)
396  << "in collection" << collection.name() << "(ID:" << collection.id()
397  << ") were removed and a new sync was scheduled in the resource" << collection.resource().name();
398 }
399 
400 bool ItemCreateHandler::parseStream()
401 {
402  const auto &cmd = Protocol::cmdCast<Protocol::CreateItemCommand>(m_command);
403 
404  // FIXME: The streaming/reading of all item parts can hold the transaction for
405  // unnecessary long time -> should we wrap the PimItem into one transaction
406  // and try to insert Parts independently? In case we fail to insert a part,
407  // it's not a problem as it can be re-fetched at any time, except for attributes.
408  Transaction transaction(storageBackend(), QStringLiteral("ItemCreateHandler"));
409  ExternalPartStorageTransaction storageTrx;
410 
411  PimItem item;
412  Collection parentCol;
413  if (!buildPimItem(cmd, item, parentCol)) {
414  return false;
415  }
416 
417  if (cmd.mergeModes() == Protocol::CreateItemCommand::None) {
418  if (!insertItem(cmd, item, parentCol)) {
419  return false;
420  }
421  if (!transaction.commit()) {
422  return failureResponse(QStringLiteral("Failed to commit transaction"));
423  }
424  storageTrx.commit();
425  } else {
426  // Merging is always restricted to the same collection
428  qb.setForUpdate();
429  qb.addValueCondition(PimItem::collectionIdColumn(), Query::Equals, parentCol.id());
430  Query::Condition rootCondition(Query::Or);
431 
432  Query::Condition mergeCondition(Query::And);
433  if (cmd.mergeModes() & Protocol::CreateItemCommand::GID) {
434  mergeCondition.addValueCondition(PimItem::gidColumn(), Query::Equals, item.gid());
435  }
436  if (cmd.mergeModes() & Protocol::CreateItemCommand::RemoteID) {
437  mergeCondition.addValueCondition(PimItem::remoteIdColumn(), Query::Equals, item.remoteId());
438  }
439  rootCondition.addCondition(mergeCondition);
440 
441  // If an Item with matching RID but empty GID exists during GID merge,
442  // merge into this item instead of creating a new one
443  if (cmd.mergeModes() & Protocol::CreateItemCommand::GID && !item.remoteId().isEmpty()) {
444  mergeCondition = Query::Condition(Query::And);
445  mergeCondition.addValueCondition(PimItem::remoteIdColumn(), Query::Equals, item.remoteId());
446  mergeCondition.addValueCondition(PimItem::gidColumn(), Query::Equals, QLatin1String(""));
447  rootCondition.addCondition(mergeCondition);
448  }
449  qb.addCondition(rootCondition);
450 
451  if (!qb.exec()) {
452  return failureResponse("Failed to query database for item");
453  }
454 
455  const QVector<PimItem> result = qb.result();
456  if (result.isEmpty()) {
457  // No item with such GID/RID exists, so call ItemCreateHandler::insert() and behave
458  // like if this was a new item
459  if (!insertItem(cmd, item, parentCol)) {
460  return false;
461  }
462  if (!transaction.commit()) {
463  return failureResponse("Failed to commit transaction");
464  }
465  storageTrx.commit();
466 
467  } else if (result.count() == 1) {
468  // Item with matching GID/RID combination exists, so merge this item into it
469  // and send itemChanged()
470  PimItem existingItem = result.at(0);
471 
472  if (!mergeItem(cmd, item, existingItem, parentCol)) {
473  return false;
474  }
475  if (!transaction.commit()) {
476  return failureResponse("Failed to commit transaction");
477  }
478  storageTrx.commit();
479  } else {
480  qCWarning(AKONADISERVER_LOG) << "Multiple merge candidates, will attempt to recover:";
481  for (const PimItem &item : result) {
482  qCWarning(AKONADISERVER_LOG) << "\tID:" << item.id() << ", RID:" << item.remoteId() << ", GID:" << item.gid()
483  << ", Collection:" << item.collection().name() << "(" << item.collectionId() << ")"
484  << ", Resource:" << item.collection().resource().name() << "(" << item.collection().resourceId() << ")";
485  }
486 
487  transaction.commit(); // commit the current transaction, before we attempt MMC recovery
488  recoverFromMultipleMergeCandidates(result, parentCol);
489 
490  // Even if the recovery was successful, indicate error to force the client to abort the
491  // sync, since we've interfered with the overall state.
492  return failureResponse(QStringLiteral("Multiple merge candidates in collection '%1', aborting").arg(item.collection().name()));
493  }
494  }
495 
496  return successResponse<Protocol::CreateItemResponse>();
497 }
bool isEmpty() const const
Helper class for DataStore transaction handling.
Definition: transaction.h:22
void setForUpdate(bool forUpdate=true)
Indicate to the database to acquire an exclusive lock on the rows already during SELECT statement.
QMap::iterator begin()
Represents a collection of PIM items.
Definition: collection.h:61
QDateTime currentDateTimeUtc()
KCALUTILS_EXPORT QString mimeType()
QMap::iterator insert(const Key &key, const T &value)
QMap::iterator end()
const T & at(int i) const const
QVector< T > result()
Returns the result of this SELECT query.
void addValueCondition(const QString &column, CompareOperator op, const QVariant &value)
Add a WHERE condition which compares a column with a given value.
Definition: query.cpp:12
bool contains(const T &value) const const
void addValueCondition(const QString &column, Query::CompareOperator op, const QVariant &value, ConditionType type=WhereCondition)
Add a WHERE or HAVING condition which compares a column with a given value.
void addCondition(const Condition &condition)
Add a WHERE condition.
Definition: query.cpp:54
bool contains(const T &value) const const
bool exec()
Executes the query, returns true on success.
Helper class for creating and executing database SELECT queries.
The handler interfaces describes an entity capable of handling an AkonadiIMAP command.
Definition: handler.h:39
bool isValid(QStringView ifopt)
QSet::iterator insert(const T &value)
int count(const T &value) const const
Represents a WHERE condition tree.
Definition: query.h:61
void addCondition(const Query::Condition &condition, ConditionType type=WhereCondition)
Add a WHERE condition.
bool commit()
Commits the transaction.
Definition: transaction.cpp:29
bool isEmpty() const const
Helper integration between Akonadi and Qt.
This file is part of the KDE documentation.
Documentation copyright © 1996-2022 The KDE developers.
Generated on Thu Jun 30 2022 03:51:46 by doxygen 1.8.17 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.