Akonadi

itemcreatehandler.cpp
1 /***************************************************************************
2  * Copyright (C) 2007 by Robert Zwerus <[email protected]> *
3  * *
4  * This program is free software; you can redistribute it and/or modify *
5  * it under the terms of the GNU Library General Public License as *
6  * published by the Free Software Foundation; either version 2 of the *
7  * License, or (at your option) any later version. *
8  * *
9  * This program is distributed in the hope that it will be useful, *
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
12  * GNU General Public License for more details. *
13  * *
14  * You should have received a copy of the GNU Library General Public *
15  * License along with this program; if not, write to the *
16  * Free Software Foundation, Inc., *
17  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
18  ***************************************************************************/
19 
20 #include "itemcreatehandler.h"
21 
22 #include "itemfetchhelper.h"
23 #include "akonadi.h"
24 #include "connection.h"
25 #include "preprocessormanager.h"
26 #include "handlerhelper.h"
27 #include "storage/datastore.h"
28 #include "storage/transaction.h"
29 #include "storage/parttypehelper.h"
30 #include "storage/dbconfig.h"
31 #include "storage/partstreamer.h"
32 #include "storage/parthelper.h"
33 #include "storage/selectquerybuilder.h"
34 #include "storage/itemretrievalmanager.h"
35 #include <private/externalpartstorage_p.h>
36 
37 #include "shared/akranges.h"
38 #include "shared/akscopeguard.h"
39 
40 #include <numeric> //std::accumulate
41 
42 using namespace Akonadi;
43 using namespace Akonadi::Server;
44 using namespace AkRanges;
45 
46 ItemCreateHandler::ItemCreateHandler(AkonadiServer &akonadi)
47  : Handler(akonadi)
48 {}
49 
50 bool ItemCreateHandler::buildPimItem(const Protocol::CreateItemCommand &cmd, PimItem &item,
51  Collection &parentCol)
52 {
53  parentCol = HandlerHelper::collectionFromScope(cmd.collection(), connection()->context());
54  if (!parentCol.isValid()) {
55  return failureResponse(QStringLiteral("Invalid parent collection"));
56  }
57  if (parentCol.isVirtual()) {
58  return failureResponse(QStringLiteral("Cannot append item into virtual collection"));
59  }
60 
61  MimeType mimeType = MimeType::retrieveByNameOrCreate(cmd.mimeType());
62  if (!mimeType.isValid()) {
63  return failureResponse(QStringLiteral("Unable to create mimetype '") % cmd.mimeType() % QStringLiteral("'."));
64  }
65 
66  item.setRev(0);
67  item.setSize(cmd.itemSize());
68  item.setMimeTypeId(mimeType.id());
69  item.setCollectionId(parentCol.id());
70  item.setDatetime(cmd.dateTime());
71  if (cmd.remoteId().isEmpty()) {
72  // from application
73  item.setDirty(true);
74  } else {
75  // from resource
76  item.setRemoteId(cmd.remoteId());
77  item.setDirty(false);
78  }
79  item.setRemoteRevision(cmd.remoteRevision());
80  item.setGid(cmd.gid());
81  item.setAtime(QDateTime::currentDateTimeUtc());
82 
83  return true;
84 }
85 
86 bool ItemCreateHandler::insertItem(const Protocol::CreateItemCommand &cmd, PimItem &item,
87  const Collection &parentCol)
88 {
89  if (!item.datetime().isValid()) {
90  item.setDatetime(QDateTime::currentDateTimeUtc());
91  }
92 
93  if (!item.insert()) {
94  return failureResponse(QStringLiteral("Failed to append item"));
95  }
96 
97  // set message flags
98  const QSet<QByteArray> flags = cmd.mergeModes() == Protocol::CreateItemCommand::None ? cmd.flags() : cmd.addedFlags();
99  if (!flags.isEmpty()) {
100  // This will hit an entry in cache inserted there in buildPimItem()
101  const Flag::List flagList = HandlerHelper::resolveFlags(flags);
102  bool flagsChanged = false;
103  if (!storageBackend()->appendItemsFlags({item}, flagList, &flagsChanged, false, parentCol, true)) {
104  return failureResponse("Unable to append item flags.");
105  }
106  }
107 
108  const Scope tags = cmd.mergeModes() == Protocol::CreateItemCommand::None ? cmd.tags() : cmd.addedTags();
109  if (!tags.isEmpty()) {
110  const Tag::List tagList = HandlerHelper::tagsFromScope(tags, connection()->context());
111  bool tagsChanged = false;
112  if (!storageBackend()->appendItemsTags({item}, tagList, &tagsChanged, false, parentCol, true)) {
113  return failureResponse(QStringLiteral("Unable to append item tags."));
114  }
115  }
116 
117  // Handle individual parts
118  qint64 partSizes = 0;
119  PartStreamer streamer(connection(), item);
120  Q_FOREACH (const QByteArray &partName, cmd.parts()) {
121  qint64 partSize = 0;
122  try {
123  streamer.stream(true, partName, partSize);
124  } catch (const PartStreamerException &e) {
125  return failureResponse(e.what());
126  }
127  partSizes += partSize;
128  }
129  const Protocol::Attributes attrs = cmd.attributes();
130  for (auto iter = attrs.cbegin(), end = attrs.cend(); iter != end; ++iter) {
131  try {
132  streamer.streamAttribute(true, iter.key(), iter.value());
133  } catch (const PartStreamerException &e) {
134  return failureResponse(e.what());
135  }
136  }
137 
138  // TODO: Try to avoid this addition query
139  if (partSizes > item.size()) {
140  item.setSize(partSizes);
141  item.update();
142  }
143 
144  // Preprocessing
145  if (akonadi().preprocessorManager().isActive()) {
146  Part hiddenAttribute;
147  hiddenAttribute.setPimItemId(item.id());
148  hiddenAttribute.setPartType(PartTypeHelper::fromFqName(QStringLiteral(AKONADI_ATTRIBUTE_HIDDEN)));
149  hiddenAttribute.setData(QByteArray());
150  hiddenAttribute.setDatasize(0);
151  // TODO: Handle errors? Technically, this is not a critical issue as no data are lost
152  PartHelper::insert(&hiddenAttribute);
153  }
154 
155  const bool seen = flags.contains(AKONADI_FLAG_SEEN) || flags.contains(AKONADI_FLAG_IGNORED);
156  notify(item, seen, item.collection());
157  sendResponse(item, Protocol::CreateItemCommand::None);
158 
159  return true;
160 }
161 
162 bool ItemCreateHandler::mergeItem(const Protocol::CreateItemCommand &cmd,
163  PimItem &newItem, PimItem &currentItem,
164  const Collection &parentCol)
165 {
166  bool needsUpdate = false;
167  QSet<QByteArray> changedParts;
168 
169  if (!newItem.remoteId().isEmpty() && currentItem.remoteId() != newItem.remoteId()) {
170  currentItem.setRemoteId(newItem.remoteId());
171  changedParts.insert(AKONADI_PARAM_REMOTEID);
172  needsUpdate = true;
173  }
174  if (!newItem.remoteRevision().isEmpty() && currentItem.remoteRevision() != newItem.remoteRevision()) {
175  currentItem.setRemoteRevision(newItem.remoteRevision());
176  changedParts.insert(AKONADI_PARAM_REMOTEREVISION);
177  needsUpdate = true;
178  }
179  if (!newItem.gid().isEmpty() && currentItem.gid() != newItem.gid()) {
180  currentItem.setGid(newItem.gid());
181  changedParts.insert(AKONADI_PARAM_GID);
182  needsUpdate = true;
183  }
184  if (newItem.datetime().isValid() && newItem.datetime() != currentItem.datetime()) {
185  currentItem.setDatetime(newItem.datetime());
186  needsUpdate = true;
187  }
188 
189  if (newItem.size() > 0 && newItem.size() != currentItem.size()) {
190  currentItem.setSize(newItem.size());
191  needsUpdate = true;
192  }
193 
194  const Collection col = Collection::retrieveById(parentCol.id());
195  if (cmd.flags().isEmpty() && !cmd.flagsOverwritten()) {
196  bool flagsAdded = false, flagsRemoved = false;
197  if (!cmd.addedFlags().isEmpty()) {
198  const auto addedFlags = HandlerHelper::resolveFlags(cmd.addedFlags());
199  storageBackend()->appendItemsFlags({currentItem}, addedFlags, &flagsAdded, true, col, true);
200  }
201  if (!cmd.removedFlags().isEmpty()) {
202  const auto removedFlags = HandlerHelper::resolveFlags(cmd.removedFlags());
203  storageBackend()->removeItemsFlags({currentItem}, removedFlags, &flagsRemoved, col, true);
204  }
205  if (flagsAdded || flagsRemoved) {
206  changedParts.insert(AKONADI_PARAM_FLAGS);
207  needsUpdate = true;
208  }
209  } else {
210  bool flagsChanged = false;
211  QSet<QByteArray> flagNames = cmd.flags();
212 
213  static QVector<QByteArray> localFlagsToPreserve = {
214  "$ATTACHMENT",
215  "$INVITATION",
216  "$ENCRYPTED",
217  "$SIGNED",
218  "$WATCHED" };
219 
220  // Make sure we don't overwrite some local-only flags that can't come
221  // through from Resource during ItemSync, like $ATTACHMENT, because the
222  // resource is not aware of them (they are usually assigned by client
223  // upon inspecting the payload)
224  Q_FOREACH (const Flag &currentFlag, currentItem.flags()) {
225  const QByteArray currentFlagName = currentFlag.name().toLatin1();
226  if (localFlagsToPreserve.contains(currentFlagName)) {
227  flagNames.insert(currentFlagName);
228  }
229  }
230  const auto flags = HandlerHelper::resolveFlags(flagNames);
231  storageBackend()->setItemsFlags({currentItem}, flags, &flagsChanged, col, true);
232  if (flagsChanged) {
233  changedParts.insert(AKONADI_PARAM_FLAGS);
234  needsUpdate = true;
235  }
236  }
237 
238  if (cmd.tags().isEmpty()) {
239  bool tagsAdded = false, tagsRemoved = false;
240  if (!cmd.addedTags().isEmpty()) {
241  const auto addedTags = HandlerHelper::tagsFromScope(cmd.addedTags(), connection()->context());
242  storageBackend()->appendItemsTags({currentItem}, addedTags, &tagsAdded, true, col, true);
243  }
244  if (!cmd.removedTags().isEmpty()) {
245  const Tag::List removedTags = HandlerHelper::tagsFromScope(cmd.removedTags(), connection()->context());
246  storageBackend()->removeItemsTags({currentItem}, removedTags, &tagsRemoved, true);
247  }
248 
249  if (tagsAdded || tagsRemoved) {
250  changedParts.insert(AKONADI_PARAM_TAGS);
251  needsUpdate = true;
252  }
253  } else {
254  bool tagsChanged = false;
255  const auto tags = HandlerHelper::tagsFromScope(cmd.tags(), connection()->context());
256  storageBackend()->setItemsTags({currentItem}, tags, &tagsChanged, true);
257  if (tagsChanged) {
258  changedParts.insert(AKONADI_PARAM_TAGS);
259  needsUpdate = true;
260  }
261  }
262 
263  const Part::List existingParts = Part::retrieveFiltered(Part::pimItemIdColumn(), currentItem.id());
264  QMap<QByteArray, qint64> partsSizes;
265  for (const Part &part : existingParts) {
266  partsSizes.insert(PartTypeHelper::fullName(part.partType()).toLatin1(), part.datasize());
267  }
268 
269  PartStreamer streamer(connection(), currentItem);
270  Q_FOREACH (const QByteArray &partName, cmd.parts()) {
271  bool changed = false;
272  qint64 partSize = 0;
273  try {
274  streamer.stream(true, partName, partSize, &changed);
275  } catch (const PartStreamerException &e) {
276  return failureResponse(e.what());
277  }
278 
279  if (changed) {
280  changedParts.insert(partName);
281  partsSizes.insert(partName, partSize);
282  needsUpdate = true;
283  }
284  }
285 
286  const qint64 size = std::accumulate(partsSizes.begin(), partsSizes.end(), 0);
287  if (size > currentItem.size()) {
288  currentItem.setSize(size);
289  needsUpdate = true;
290  }
291 
292  if (needsUpdate) {
293  currentItem.setRev(qMax(newItem.rev(), currentItem.rev()) + 1);
294  currentItem.setAtime(QDateTime::currentDateTimeUtc());
295  // Only mark dirty when merged from application
296  currentItem.setDirty(!connection()->context().resource().isValid());
297 
298  // Store all changes
299  if (!currentItem.update()) {
300  return failureResponse("Failed to store merged item");
301  }
302 
303  notify(currentItem, currentItem.collection(), changedParts);
304  }
305 
306  sendResponse(currentItem, cmd.mergeModes());
307 
308  return true;
309 }
310 
311 bool ItemCreateHandler::sendResponse(const PimItem &item, Protocol::CreateItemCommand::MergeModes mergeModes)
312 {
313  if (mergeModes & Protocol::CreateItemCommand::Silent || mergeModes & Protocol::CreateItemCommand::None) {
314  Protocol::FetchItemsResponse resp;
315  resp.setId(item.id());
316  resp.setMTime(item.datetime());
317  Handler::sendResponse(std::move(resp));
318  return true;
319  }
320 
321  Protocol::ItemFetchScope fetchScope;
322  fetchScope.setAncestorDepth(Protocol::ItemFetchScope::ParentAncestor);
323  fetchScope.setFetch(Protocol::ItemFetchScope::AllAttributes |
324  Protocol::ItemFetchScope::FullPayload |
325  Protocol::ItemFetchScope::CacheOnly |
326  Protocol::ItemFetchScope::Flags |
327  Protocol::ItemFetchScope::GID |
328  Protocol::ItemFetchScope::MTime |
329  Protocol::ItemFetchScope::RemoteID |
330  Protocol::ItemFetchScope::RemoteRevision |
331  Protocol::ItemFetchScope::Size |
332  Protocol::ItemFetchScope::Tags);
333  ImapSet set;
334  set.add(QVector<qint64>() << item.id());
335  Scope scope;
336  scope.setUidSet(set);
337 
338  ItemFetchHelper fetchHelper(connection(), scope, fetchScope, Protocol::TagFetchScope{}, akonadi());
339  if (!fetchHelper.fetchItems()) {
340  return failureResponse("Failed to retrieve item");
341  }
342 
343  return true;
344 }
345 
346 bool ItemCreateHandler::notify(const PimItem &item, bool seen, const Collection &collection)
347 {
348  storageBackend()->notificationCollector()->itemAdded(item, seen, collection);
349 
350  if (akonadi().preprocessorManager().isActive()) {
351  // enqueue the item for preprocessing
352  akonadi().preprocessorManager().beginHandleItem(item, storageBackend());
353  }
354  return true;
355 }
356 
357 bool ItemCreateHandler::notify(const PimItem &item, const Collection &collection,
358  const QSet<QByteArray> &changedParts)
359 {
360  if (!changedParts.isEmpty()) {
361  storageBackend()->notificationCollector()->itemChanged(item, changedParts, collection);
362  }
363  return true;
364 }
365 
366 void ItemCreateHandler::recoverFromMultipleMergeCandidates(const PimItem::List &items, const Collection &collection)
367 {
368  // HACK HACK HACK: When this happens within ItemSync, we are running inside a client-side
369  // transaction, so just calling commit here won't have any effect, since this handler will
370  // ultimately fail and the client will rollback the transaction. To circumvent this, we
371  // will forcibly commit the transaction, do our changes here within a new transaction and
372  // then we open a new transaction so that the client won't notice.
373 
374  int transactionDepth = 0;
375  while (storageBackend()->inTransaction()) {
376  ++transactionDepth;
377  storageBackend()->commitTransaction();
378  }
379  const AkScopeGuard restoreTransaction([&]() {
380  for (int i = 0; i < transactionDepth; ++i) {
381  storageBackend()->beginTransaction(QStringLiteral("RestoredTransactionAfterMMCRecovery"));
382  }
383  });
384 
385  Transaction transaction(storageBackend(), QStringLiteral("MMC Recovery Transaction"));
386 
387  // If any of the conflicting items is dirty or does not have a remote ID, we don't want to remove
388  // them as it would cause data loss. There's a chance next changeReplay will fix this, so
389  // next time the ItemSync hits this multiple merge candidates, all changes will be committed
390  // and this check will succeed
391  if (items | Actions::any([](const auto &item) { return item.dirty() || item.remoteId().isEmpty(); })) {
392  qCWarning(AKONADISERVER_LOG) << "Automatic multiple merge candidates recovery failed: at least one of the candidates has uncommitted changes!";
393  return;
394  }
395 
396  // This cannot happen with ItemSync, but in theory could happen during individual GID merge.
397  if (items | Actions::any([collection](const auto &item) { return item.collectionId() != collection.id(); })) {
398  qCWarning(AKONADISERVER_LOG) << "Automatic multiple merge candidates recovery failed: all candidates do not belong to the same collection.";
399  return;
400  }
401 
402  storageBackend()->cleanupPimItems(items, DataStore::Silent);
403  if (!transaction.commit()) {
404  qCWarning(AKONADISERVER_LOG) << "Automatic multiple merge candidates recovery failed: failed to commit database transaction.";
405  return;
406  }
407 
408  // Schedule a new sync of the collection, one that will succeed
409  akonadi().itemRetrievalManager().triggerCollectionSync(collection.resource().name(), collection.id());
410 
411  qCInfo(AKONADISERVER_LOG) << "Automatic multiple merge candidates recovery successful: conflicting items" << (items | Views::transform([](const auto &i) { return i.id(); }) | Actions::toQVector)
412  << "in collection" << collection.name() << "(ID:" << collection.id() << ") were removed and a new sync was scheduled in the resource"
413  << collection.resource().name();
414 }
415 
416 bool ItemCreateHandler::parseStream()
417 {
418  const auto &cmd = Protocol::cmdCast<Protocol::CreateItemCommand>(m_command);
419 
420  // FIXME: The streaming/reading of all item parts can hold the transaction for
421  // unnecessary long time -> should we wrap the PimItem into one transaction
422  // and try to insert Parts independently? In case we fail to insert a part,
423  // it's not a problem as it can be re-fetched at any time, except for attributes.
424  Transaction transaction(storageBackend(), QStringLiteral("ItemCreateHandler"));
425  ExternalPartStorageTransaction storageTrx;
426 
427  PimItem item;
428  Collection parentCol;
429  if (!buildPimItem(cmd, item, parentCol)) {
430  return false;
431  }
432 
433  if (cmd.mergeModes() == Protocol::CreateItemCommand::None) {
434  if (!insertItem(cmd, item, parentCol)) {
435  return false;
436  }
437  if (!transaction.commit()) {
438  return failureResponse(QStringLiteral("Failed to commit transaction"));
439  }
440  storageTrx.commit();
441  } else {
442  // Merging is always restricted to the same collection
444  qb.setForUpdate();
445  qb.addValueCondition(PimItem::collectionIdColumn(), Query::Equals, parentCol.id());
446  Query::Condition rootCondition(Query::Or);
447 
448  Query::Condition mergeCondition(Query::And);
449  if (cmd.mergeModes() & Protocol::CreateItemCommand::GID) {
450  mergeCondition.addValueCondition(PimItem::gidColumn(), Query::Equals, item.gid());
451  }
452  if (cmd.mergeModes() & Protocol::CreateItemCommand::RemoteID) {
453  mergeCondition.addValueCondition(PimItem::remoteIdColumn(), Query::Equals, item.remoteId());
454  }
455  rootCondition.addCondition(mergeCondition);
456 
457  // If an Item with matching RID but empty GID exists during GID merge,
458  // merge into this item instead of creating a new one
459  if (cmd.mergeModes() & Protocol::CreateItemCommand::GID && !item.remoteId().isEmpty()) {
460  mergeCondition = Query::Condition(Query::And);
461  mergeCondition.addValueCondition(PimItem::remoteIdColumn(), Query::Equals, item.remoteId());
462  mergeCondition.addValueCondition(PimItem::gidColumn(), Query::Equals, QLatin1String(""));
463  rootCondition.addCondition(mergeCondition);
464  }
465  qb.addCondition(rootCondition);
466 
467  if (!qb.exec()) {
468  return failureResponse("Failed to query database for item");
469  }
470 
471  const QVector<PimItem> result = qb.result();
472  if (result.isEmpty()) {
473  // No item with such GID/RID exists, so call ItemCreateHandler::insert() and behave
474  // like if this was a new item
475  if (!insertItem(cmd, item, parentCol)) {
476  return false;
477  }
478  if (!transaction.commit()) {
479  return failureResponse("Failed to commit transaction");
480  }
481  storageTrx.commit();
482 
483  } else if (result.count() == 1) {
484  // Item with matching GID/RID combination exists, so merge this item into it
485  // and send itemChanged()
486  PimItem existingItem = result.at(0);
487 
488  if (!mergeItem(cmd, item, existingItem, parentCol)) {
489  return false;
490  }
491  if (!transaction.commit()) {
492  return failureResponse("Failed to commit transaction");
493  }
494  storageTrx.commit();
495  } else {
496  qCWarning(AKONADISERVER_LOG) << "Multiple merge candidates, will attempt to recover:";
497  for (const PimItem &item : result) {
498  qCWarning(AKONADISERVER_LOG) << "\tID:" << item.id() << ", RID:" << item.remoteId()
499  << ", GID:" << item.gid()
500  << ", Collection:" << item.collection().name() << "(" << item.collectionId() << ")"
501  << ", Resource:" << item.collection().resource().name() << "(" << item.collection().resourceId() << ")";
502  }
503 
504  transaction.commit(); // commit the current transaction, before we attempt MMC recovery
505  recoverFromMultipleMergeCandidates(result, parentCol);
506 
507  // Even if the recovery was successful, indicate error to force the client to abort the
508  // sync, since we've interfered with the overall state.
509  return failureResponse(QStringLiteral("Multiple merge candidates in collection '%1', aborting").arg(item.collection().name()));
510  }
511  }
512 
513  return successResponse<Protocol::CreateItemResponse>();
514 }
void addValueCondition(const QString &column, Query::CompareOperator op, const QVariant &value, ConditionType type=WhereCondition)
Add a WHERE or HAVING condition which compares a column with a given value.
void addCondition(const Condition &condition)
Add a WHERE condition.
Definition: query.cpp:67
bool isValid() const
Returns whether the collection is valid.
Definition: collection.cpp:137
QString name() const
Returns the i18n&#39;ed name of the collection.
Definition: collection.cpp:225
void setForUpdate(bool forUpdate=true)
Indicate to the database to acquire an exclusive lock on the rows already during SELECT statement...
The handler interfaces describes an entity capable of handling an AkonadiIMAP command.
Definition: handler.h:48
Represents a collection of PIM items.
Definition: collection.h:76
QSet::iterator insert(const T &value)
bool contains(const T &value) const const
Helper class for DataStore transaction handling.
Definition: transaction.h:37
QVector< T > result()
Returns the result of this SELECT query.
KDEGAMES_EXPORT QAction * end(const QObject *recvr, const char *slot, QObject *parent)
bool contains(const T &value) const const
const T & at(int i) const const
Helper class for creating and executing database SELECT queries.
Id id() const
Returns the unique identifier of the collection.
Definition: collection.cpp:112
bool isEmpty() const const
Helper integration between Akonadi and Qt.
int count(const T &value) const const
bool isEmpty() const const
QMap::iterator insert(const Key &key, const T &value)
Represents a WHERE condition tree.
Definition: query.h:77
QString resource() const
Returns the identifier of the resource owning the collection.
Definition: collection.cpp:318
void result(KJob *job)
QString mimeType(Type)
QDateTime currentDateTimeUtc()
void addCondition(const Query::Condition &condition, ConditionType type=WhereCondition)
Add a WHERE condition.
bool isVirtual() const
Returns whether the collection is virtual, for example a search collection.
Definition: collection.cpp:364
bool exec()
Executes the query, returns true on success.
void addValueCondition(const QString &column, CompareOperator op, const QVariant &value)
Add a WHERE condition which compares a column with a given value.
Definition: query.cpp:25
This file is part of the KDE documentation.
Documentation copyright © 1996-2020 The KDE developers.
Generated on Fri Jun 5 2020 23:08:55 by doxygen 1.8.11 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.