7#include "itemcreatehandler.h"
10#include "connection.h"
11#include "handlerhelper.h"
12#include "itemfetchhelper.h"
13#include "preprocessormanager.h"
14#include "private/externalpartstorage_p.h"
15#include "storage/datastore.h"
16#include "storage/dbconfig.h"
17#include "storage/itemretrievalmanager.h"
18#include "storage/parthelper.h"
19#include "storage/partstreamer.h"
20#include "storage/parttypehelper.h"
21#include "storage/selectquerybuilder.h"
22#include "storage/transaction.h"
24#include "shared/akranges.h"
31using namespace Akonadi::Server;
32using namespace AkRanges;
// Constructor: receives the AkonadiServer instance this handler works with.
// NOTE(review): the member-initializer list and body are not part of this
// extract, so what is done with `akonadi` cannot be confirmed from here.
34ItemCreateHandler::ItemCreateHandler(AkonadiServer &akonadi)
// Populates `item` from the CreateItemCommand and resolves the destination
// collection into `parentCol` (an out-parameter).
//
// Returns the result of failureResponse() when the parent collection is
// invalid or virtual, and on the MIME type error path.
//
// NOTE(review): this is an extract of the original file; source lines whose
// numbers are skipped below (e.g. 44, 50, 56, 60-63) are not shown.
39bool ItemCreateHandler::buildPimItem(
const Protocol::CreateItemCommand &cmd, PimItem &item,
Collection &parentCol)
// Resolve the target collection from the command's scope, using the
// connection's context.
41 parentCol = HandlerHelper::collectionFromScope(cmd.collection(), connection()->context());
42 if (!parentCol.isValid()) {
43 return failureResponse(QStringLiteral(
"Invalid parent collection"));
// Virtual collections are rejected as a destination.
45 if (parentCol.isVirtual()) {
46 return failureResponse(QStringLiteral(
"Cannot append item into virtual collection"));
// Look the MIME type up by name via MimeType::retrieveByNameOrCreate().
49 MimeType
mimeType = MimeType::retrieveByNameOrCreate(cmd.mimeType());
// NOTE(review): the condition guarding this failure (line 50) is not shown;
// presumably a validity check on `mimeType` - confirm.
51 return failureResponse(QStringLiteral(
"Unable to create mimetype '") % cmd.mimeType() % QStringLiteral(
"'."));
// Copy the plain properties from the command onto the item.
55 item.setSize(cmd.itemSize());
57 item.setCollectionId(parentCol.id());
58 item.setDatetime(cmd.dateTime());
// Branch on whether the command carries a remote ID. The body of the
// empty-ID branch (lines 60-63) is not shown; the remote ID is only copied
// when present.
59 if (cmd.remoteId().isEmpty()) {
64 item.setRemoteId(cmd.remoteId());
67 item.setRemoteRevision(cmd.remoteRevision());
68 item.setGid(cmd.gid());
// Stores a new item: applies the flags and tags requested by the command,
// streams its payload parts and attributes, then emits the "item added"
// notification and sends the response.
//
// Every visible error path returns through failureResponse().
//
// NOTE(review): this is an extract; source lines whose numbers are skipped
// below (e.g. 78-81, 87-89, 110-111, 137, 140-143) are not shown.
75bool ItemCreateHandler::insertItem(
const Protocol::CreateItemCommand &cmd, PimItem &item,
const Collection &parentCol)
// Special handling for items that arrive without a valid datetime; the body
// of this branch is not shown in the extract.
77 if (!item.datetime().isValid()) {
82 return failureResponse(QStringLiteral(
"Failed to append item"));
// With no merge mode the command's complete flag set applies; otherwise only
// the flags listed as added.
86 const QSet<QByteArray> flags = cmd.mergeModes() == Protocol::CreateItemCommand::None ? cmd.flags() : cmd.addedFlags();
90 bool flagsChanged =
false;
// NOTE(review): `flagList` is declared on a line not shown; presumably the
// Flag records resolved from `flags` - confirm.
91 if (!storageBackend()->appendItemsFlags({item}, flagList, &flagsChanged,
false, parentCol,
true)) {
92 return failureResponse(
"Unable to append item flags.");
// Same selection rule for tags: the full tag scope without merge mode, the
// added tags otherwise.
96 const Scope tags = cmd.mergeModes() == Protocol::CreateItemCommand::None ? cmd.tags() : cmd.addedTags();
97 if (!tags.isEmpty()) {
98 const Tag::List tagList = HandlerHelper::tagsFromScope(tags, connection()->context());
99 bool tagsChanged =
false;
100 if (!storageBackend()->appendItemsTags({item}, tagList, &tagsChanged,
false, parentCol,
true)) {
101 return failureResponse(QStringLiteral(
"Unable to append item tags."));
// Stream every payload part named by the command. `partSize` (declared on a
// line not shown) is added to the running total after each stream() call.
// A PartStreamerException is turned into a failure response carrying its
// what() text.
106 qint64 partSizes = 0;
107 PartStreamer streamer(connection(), item);
108 const auto parts = cmd.parts();
109 for (
const QByteArray &partName : parts) {
112 streamer.stream(
true, partName, partSize);
113 }
catch (
const PartStreamerException &e) {
114 return failureResponse(e.what());
116 partSizes += partSize;
// Stream the attributes (key/value pairs) with the same error handling.
118 const Protocol::Attributes attrs = cmd.attributes();
119 for (
auto iter = attrs.cbegin(), end = attrs.cend(); iter != end; ++iter) {
121 streamer.streamAttribute(
true, iter.key(), iter.value());
122 }
catch (
const PartStreamerException &e) {
123 return failureResponse(e.what());
// The item size is raised to the streamed total when that total is larger
// than the size recorded so far; it is never lowered here.
128 if (partSizes > item.size()) {
129 item.setSize(partSizes);
// When the preprocessor manager is active, an empty (zero-length) Part bound
// to this item is prepared. Its part type and the call that stores it are on
// lines not shown in this extract.
134 if (akonadi().preprocessorManager().isActive()) {
135 Part hiddenAttribute;
136 hiddenAttribute.setPimItemId(item.id());
138 hiddenAttribute.setData(QByteArray());
139 hiddenAttribute.setDatasize(0);
// The item is reported as "seen" when it carries the SEEN or the IGNORED flag.
144 const bool seen = flags.
contains(AKONADI_FLAG_SEEN) || flags.
contains(AKONADI_FLAG_IGNORED);
145 notify(item, seen, item.collection());
// A plain insert always answers with merge mode None.
146 sendResponse(item, Protocol::CreateItemCommand::None);
// Merges the incoming `newItem` (built from the command) into the already
// stored `currentItem`, collecting the names of everything that changed in
// `changedParts`, then stores the item, emits the "item changed"
// notification and sends the response.
//
// Parameters:
//   cmd         - the create command being handled (flags, tags, parts, merge modes)
//   newItem     - item built from the command; only read here
//   currentItem - the stored merge candidate; modified and updated in place
//   parentCol   - collection the merge happens in
//
// The visible error paths (part streaming, final update) return through
// failureResponse().
//
// NOTE(review): this is an extract; source lines whose numbers are skipped
// below (e.g. 160-163, 197, 201, 227, 249, 262-264, 269-270) are not shown.
// NOTE(review): unlike insertItem(), the return values of the
// append/remove/set flag and tag calls are not checked in the lines shown.
151bool ItemCreateHandler::mergeItem(
const Protocol::CreateItemCommand &cmd, PimItem &newItem, PimItem &currentItem,
const Collection &parentCol)
153 bool needsUpdate =
false;
154 bool ignoreFlagsChanges =
false;
155 QSet<QByteArray> changedParts;
// If the stored item has a newer atime than the incoming one, an absolute
// flag list from the command is not applied (see the `else if` below).
157 if (currentItem.atime() > newItem.atime()) {
158 qCDebug(AKONADISERVER_LOG) <<
"Akonadi has newer atime of Item " << currentItem.id() <<
" than the resource (local atime =" << currentItem.atime()
159 <<
", remote atime =" << newItem.atime() <<
"), ignoring flags changes.";
164 ignoreFlagsChanges =
true;
// Remote ID, remote revision and GID are taken over only when the incoming
// value is non-empty and differs from the stored one; each change is
// recorded in changedParts.
167 if (!newItem.remoteId().isEmpty() && currentItem.remoteId() != newItem.remoteId()) {
168 currentItem.setRemoteId(newItem.remoteId());
169 changedParts.
insert(AKONADI_PARAM_REMOTEID);
172 if (!newItem.remoteRevision().isEmpty() && currentItem.remoteRevision() != newItem.remoteRevision()) {
173 currentItem.setRemoteRevision(newItem.remoteRevision());
174 changedParts.
insert(AKONADI_PARAM_REMOTEREVISION);
177 if (!newItem.gid().isEmpty() && currentItem.gid() != newItem.gid()) {
178 currentItem.setGid(newItem.gid());
179 changedParts.
insert(AKONADI_PARAM_GID);
// Datetime is taken over when valid and different; size when positive and
// different.
182 if (newItem.datetime().isValid() && newItem.datetime() != currentItem.datetime()) {
183 currentItem.setDatetime(newItem.datetime());
187 if (newItem.size() > 0 && newItem.size() != currentItem.size()) {
188 currentItem.setSize(newItem.size());
192 const Collection col = Collection::retrieveById(parentCol.id());
// Flags, incremental mode: no absolute flag list and flags not marked as
// overwritten, so apply the added/removed sets separately.
// (`addedFlags` / `removedFlags` are declared on lines not shown.)
193 if (cmd.flags().isEmpty() && !cmd.flagsOverwritten()) {
194 bool flagsAdded =
false;
195 bool flagsRemoved =
false;
196 if (!cmd.addedFlags().isEmpty()) {
198 storageBackend()->appendItemsFlags({currentItem}, addedFlags, &flagsAdded,
true, col,
true);
200 if (!cmd.removedFlags().isEmpty()) {
202 storageBackend()->removeItemsFlags({currentItem}, removedFlags, &flagsRemoved, col,
true);
204 if (flagsAdded || flagsRemoved) {
205 changedParts.
insert(AKONADI_PARAM_FLAGS);
// Flags, absolute mode: skipped entirely when ignoreFlagsChanges was set
// above.
208 }
else if (!ignoreFlagsChanges) {
209 bool flagsChanged =
false;
210 QSet<QByteArray> flagNames = cmd.flags();
// Any of these flags that the stored item already carries is re-added to the
// requested set, so the overwrite below does not drop it.
212 static QList<QByteArray> localFlagsToPreserve = {
"$ATTACHMENT",
"$INVITATION",
"$ENCRYPTED",
"$SIGNED",
"$WATCHED"};
218 const Flag::List currentFlags = currentItem.flags();
219 for (
const Flag &currentFlag : currentFlags) {
220 const QByteArray currentFlagName = currentFlag.name().toLatin1();
221 if (localFlagsToPreserve.
contains(currentFlagName)) {
222 flagNames.
insert(currentFlagName);
// NOTE(review): `flags` is declared on a line not shown; presumably the Flag
// records resolved from `flagNames` - confirm.
226 storageBackend()->setItemsFlags({currentItem}, &currentFlags, flags, &flagsChanged, col,
true);
228 changedParts.
insert(AKONADI_PARAM_FLAGS);
// Tags follow the same two modes: incremental (added/removed scopes) when the
// command has no absolute tag scope, otherwise an absolute set.
233 if (cmd.tags().isEmpty()) {
234 bool tagsAdded =
false;
235 bool tagsRemoved =
false;
236 if (!cmd.addedTags().isEmpty()) {
237 const auto addedTags = HandlerHelper::tagsFromScope(cmd.addedTags(), connection()->context());
238 storageBackend()->appendItemsTags({currentItem}, addedTags, &tagsAdded,
true, col,
true);
240 if (!cmd.removedTags().isEmpty()) {
241 const Tag::List removedTags = HandlerHelper::tagsFromScope(cmd.removedTags(), connection()->context());
242 storageBackend()->removeItemsTags({currentItem}, removedTags, &tagsRemoved,
true);
245 if (tagsAdded || tagsRemoved) {
246 changedParts.
insert(AKONADI_PARAM_TAGS);
250 bool tagsChanged =
false;
251 const auto tags = HandlerHelper::tagsFromScope(cmd.tags(), connection()->context());
252 storageBackend()->setItemsTags({currentItem}, tags, &tagsChanged,
true);
254 changedParts.
insert(AKONADI_PARAM_TAGS);
// Parts: start from the parts already stored for this item (the loop body
// that fills partsSizes from them is not shown), then stream the parts named
// by the command on top.
259 const Part::List existingParts = Part::retrieveFiltered(Part::pimItemIdColumn(), currentItem.id());
260 QMap<QByteArray, qint64> partsSizes;
261 for (
const Part &part : existingParts) {
265 PartStreamer streamer(connection(), currentItem);
266 const auto partNames = cmd.parts();
267 for (
const QByteArray &partName : partNames) {
268 bool changed =
false;
// `partSize` is declared on a line not shown. A PartStreamerException is
// turned into a failure response carrying its what() text.
271 streamer.stream(
true, partName, partSize, &changed);
272 }
catch (
const PartStreamerException &e) {
273 return failureResponse(e.what());
277 changedParts.
insert(partName);
278 partsSizes.
insert(partName, partSize);
// Sum of all recorded part sizes; the item size is only ever raised to it,
// never lowered.
283 const qint64 size = std::accumulate(partsSizes.
begin(), partsSizes.
end(), 0LL);
284 if (size > currentItem.size()) {
285 currentItem.setSize(size);
// New revision is one past the larger of the two revisions.
290 currentItem.setRev(qMax(newItem.rev(), currentItem.rev()) + 1);
// The item is marked dirty when the connection's context has no valid
// resource.
293 currentItem.setDirty(!connection()->context().resource().
isValid());
296 if (!currentItem.update()) {
297 return failureResponse(
"Failed to store merged item");
300 notify(currentItem, currentItem.collection(), changedParts);
303 sendResponse(currentItem, cmd.mergeModes());
// Sends the reply for a created or merged item.
//
// Short form: a FetchItemsResponse carrying only the item's id and mtime.
// Long form: a fetch of the item through ItemFetchHelper with the scope
// assembled below; a failed fetch returns through failureResponse().
//
// NOTE(review): if Protocol::CreateItemCommand::None is the zero value of the
// flags enum, `mergeModes & ...::None` can never be true and only Silent
// selects the short form - confirm against the enum definition.
// NOTE(review): this is an extract; source lines whose numbers are skipped
// below (e.g. 315-317, 326-329) are not shown.
308bool ItemCreateHandler::sendResponse(
const PimItem &item, Protocol::CreateItemCommand::MergeModes mergeModes)
310 if (mergeModes & Protocol::CreateItemCommand::Silent || mergeModes & Protocol::CreateItemCommand::None) {
311 Protocol::FetchItemsResponse resp;
312 resp.setId(item.id());
313 resp.setMTime(item.datetime());
314 Handler::sendResponse(std::move(resp));
// Long form: request the parent ancestor plus the attribute, payload, flag,
// GID, mtime, remote id/revision, size and tag fields; CacheOnly is part of
// the requested set as well.
318 Protocol::ItemFetchScope fetchScope;
319 fetchScope.setAncestorDepth(Protocol::ItemFetchScope::ParentAncestor);
320 fetchScope.setFetch(Protocol::ItemFetchScope::AllAttributes | Protocol::ItemFetchScope::FullPayload | Protocol::ItemFetchScope::CacheOnly
321 | Protocol::ItemFetchScope::Flags | Protocol::ItemFetchScope::GID | Protocol::ItemFetchScope::MTime | Protocol::ItemFetchScope::RemoteID
322 | Protocol::ItemFetchScope::RemoteRevision | Protocol::ItemFetchScope::Size | Protocol::ItemFetchScope::Tags);
// Fetch exactly this item, with a default-constructed tag fetch scope.
323 ItemFetchHelper fetchHelper(connection(), Scope{item.id()}, fetchScope, Protocol::TagFetchScope{}, akonadi());
324 if (!fetchHelper.fetchItems()) {
325 return failureResponse(
"Failed to retrieve item");
// Notification for a newly added item: reports it to the storage backend's
// notification collector (itemAdded) together with its `seen` state and
// collection.
// When the preprocessor manager is active, the item is additionally handed
// to it via beginHandleItem().
// NOTE(review): the return statement is on a line not shown in this extract.
331bool ItemCreateHandler::notify(
const PimItem &item,
bool seen,
const Collection &collection)
333 storageBackend()->notificationCollector()->itemAdded(item, seen, collection);
335 if (akonadi().preprocessorManager().isActive()) {
337 akonadi().preprocessorManager().beginHandleItem(item, storageBackend());
// Notification for a merged (changed) item: reports the item, the set of
// changed part names and its collection to the storage backend's
// notification collector (itemChanged).
// NOTE(review): lines 343-344 and 346+ are not shown in this extract, so any
// guard around the call and the return value cannot be confirmed from here.
342bool ItemCreateHandler::notify(
const PimItem &item,
const Collection &collection,
const QSet<QByteArray> &changedParts)
345 storageBackend()->notificationCollector()->itemChanged(item, changedParts, collection);
// Attempts automatic recovery when several stored items match one merge
// request: the candidates are removed through cleanupPimItems() and a sync of
// the collection is triggered.
//
// Recovery is abandoned, with a warning logged, when a candidate is dirty or
// has no remote ID, when a candidate belongs to a different collection, or
// when the recovery transaction fails to commit.
//
// NOTE(review): this is an extract; source lines whose numbers are skipped
// below (e.g. 360, 377, 379-382, 385, 387-389, 393-396, 401-402) are not
// shown.
350void ItemCreateHandler::recoverFromMultipleMergeCandidates(
const PimItem::List &items,
const Collection &collection)
// Commit every transaction currently open on the backend.
// NOTE(review): the line that updates transactionDepth inside the loop is not
// shown; presumably it counts the committed levels - confirm.
358 int transactionDepth = 0;
359 while (storageBackend()->inTransaction()) {
361 storageBackend()->commitTransaction();
// Scope guard that begins `transactionDepth` transactions again.
363 const auto restoreTransaction = qScopeGuard([&]() {
364 for (
int i = 0; i < transactionDepth; ++i) {
365 storageBackend()->beginTransaction(QStringLiteral(
"RestoredTransactionAfterMMCRecovery"));
// The recovery itself runs in its own transaction.
369 Transaction transaction(storageBackend(), QStringLiteral(
"MMC Recovery Transaction"));
// Abort condition 1: any candidate that is dirty or lacks a remote ID.
375 if (items | Actions::any([](
const auto &item) {
376 return item.dirty() || item.remoteId().isEmpty();
378 qCWarning(AKONADISERVER_LOG) <<
"Automatic multiple merge candidates recovery failed: at least one of the candidates has uncommitted changes!";
// Abort condition 2: any candidate stored in a collection other than
// `collection`.
383 if (items | Actions::any([collection](
const auto &item) {
384 return item.collectionId() != collection.id();
386 qCWarning(AKONADISERVER_LOG) <<
"Automatic multiple merge candidates recovery failed: all candidates do not belong to the same collection.";
// Remove the candidates (DataStore::Silent is passed) and commit.
390 storageBackend()->cleanupPimItems(items, DataStore::Silent);
391 if (!transaction.commit()) {
392 qCWarning(AKONADISERVER_LOG) <<
"Automatic multiple merge candidates recovery failed: failed to commit database transaction.";
// Ask the item retrieval manager to sync the collection in its resource.
397 akonadi().itemRetrievalManager().triggerCollectionSync(collection.resource().name(), collection.id());
// Log the outcome; the transform lambda body (what is printed per item) is
// not shown in this extract.
399 qCInfo(AKONADISERVER_LOG) <<
"Automatic multiple merge candidates recovery successful: conflicting items"
400 << (items | Views::transform([](
const auto &i) {
403 | Actions::toQVector)
404 <<
"in collection" << collection.name() <<
"(ID:" << collection.id()
405 <<
") were removed and a new sync was scheduled in the resource" << collection.resource().name();
// Body of the command entry point.
// NOTE(review): the signature line (408) is absent from this extract;
// presumably this is ItemCreateHandler::parseStream() - confirm.
//
// Flow visible here: build the item from the command; when no merge mode
// other than Silent is set, insert it directly; otherwise query for merge
// candidates in the parent collection and insert (no candidate), merge
// (exactly one) or attempt recovery and fail (several). Each successful path
// commits the "ItemCreateHandler" transaction.
//
// NOTE(review): source lines whose numbers are skipped below (e.g. 418-420,
// 431-436, 446-450, 454-459, 461-466, 485-487) are not shown, including the
// declarations of `item`, `parentCol`, `qb`, `mergeCondition` and `result`.
410 const auto &cmd = Protocol::cmdCast<Protocol::CreateItemCommand>(m_command);
416 Transaction transaction(storageBackend(), QStringLiteral(
"ItemCreateHandler"));
417 ExternalPartStorageTransaction storageTrx;
421 if (!buildPimItem(cmd, item, parentCol)) {
// Plain insert: merge modes with the Silent bit masked out are empty.
425 if ((cmd.mergeModes() & ~Protocol::CreateItemCommand::Silent) == 0) {
426 if (!insertItem(cmd, item, parentCol)) {
429 if (!transaction.
commit()) {
430 return failureResponse(QStringLiteral(
"Failed to commit transaction"));
// Merge path: candidates must live in the parent collection ...
437 qb.
addValueCondition(PimItem::collectionIdColumn(), Query::Equals, parentCol.id());
// ... and match on GID and/or remote ID, depending on the merge modes.
441 if (cmd.mergeModes() & Protocol::CreateItemCommand::GID) {
442 mergeCondition.
addValueCondition(PimItem::gidColumn(), Query::Equals, item.gid());
444 if (cmd.mergeModes() & Protocol::CreateItemCommand::RemoteID) {
445 mergeCondition.
addValueCondition(PimItem::remoteIdColumn(), Query::Equals, item.remoteId());
// GID merge with a known remote ID adds a remote ID condition as well (the
// surrounding lines that say how it is combined are not shown).
451 if (cmd.mergeModes() & Protocol::CreateItemCommand::GID && !item.remoteId().isEmpty()) {
453 mergeCondition.
addValueCondition(PimItem::remoteIdColumn(), Query::Equals, item.remoteId());
460 return failureResponse(
"Failed to query database for item");
// NOTE(review): the condition opening this branch is not shown; given the
// `else if (result.count() == 1)` that follows, presumably "no candidate".
467 if (!insertItem(cmd, item, parentCol)) {
470 if (!transaction.
commit()) {
471 return failureResponse(
"Failed to commit transaction");
// Exactly one candidate: merge the new item into it.
475 }
else if (result.
count() == 1) {
478 PimItem existingItem = result.
at(0);
480 if (!mergeItem(cmd, item, existingItem, parentCol)) {
483 if (!transaction.
commit()) {
484 return failureResponse(
"Failed to commit transaction");
// Several candidates: log each one, try the automatic recovery, then fail
// this request.
488 qCWarning(AKONADISERVER_LOG) <<
"Multiple merge candidates, will attempt to recover:";
// NOTE(review): the loop variable `item` shadows the outer `item` that was
// built above and is used again in the failure message after the loop.
489 for (
const PimItem &item : result) {
490 qCWarning(AKONADISERVER_LOG) <<
"\tID:" << item.id() <<
", RID:" << item.remoteId() <<
", GID:" << item.gid()
491 <<
", Collection:" << item.collection().name() <<
"(" << item.collectionId() <<
")"
492 <<
", Resource:" << item.collection().resource().name() <<
"(" << item.collection().resourceId() <<
")";
496 recoverFromMultipleMergeCandidates(result, parentCol);
500 return failureResponse(QStringLiteral(
"Multiple merge candidates in collection '%1', aborting").arg(item.collection().name()));
504 return successResponse<Protocol::CreateItemResponse>();
Represents a collection of PIM items.
static Flag::List resolveFlags(const QSet< QByteArray > &flagNames)
Converts a bytearray list of flag names into flag records.
The handler interface describes an entity capable of handling an AkonadiIMAP command.
bool parseStream() override
Parse and handle the IMAP message using the streaming parser.
void addValueCondition(const QString &column, Query::CompareOperator op, const QVariant &value, ConditionType type=WhereCondition)
Add a WHERE or HAVING condition which compares a column with a given value.
bool exec()
Executes the query, returns true on success.
void addCondition(const Query::Condition &condition, ConditionType type=WhereCondition)
Add a WHERE condition.
void setForUpdate(bool forUpdate=true)
Indicate to the database to acquire an exclusive lock on the rows already during SELECT statement.
Represents a WHERE condition tree.
void addValueCondition(const QString &column, CompareOperator op, const QVariant &value)
Add a WHERE condition which compares a column with a given value.
void addCondition(const Condition &condition)
Add a WHERE condition.
Helper class for creating and executing database SELECT queries.
QList< T > result()
Returns the result of this SELECT query.
Helper class for DataStore transaction handling.
bool commit()
Commits the transaction.
bool insert(Part *part, qint64 *insertId=nullptr)
Adds a new part to the database and if necessary to the filesystem.
PartType fromFqName(const QString &fqName)
Retrieve (or create) PartType for the given fully qualified name.
QString fullName(const PartType &type)
Returns full part name.
Helper integration between Akonadi and Qt.
KCALUTILS_EXPORT QString mimeType()
bool isValid(QStringView ifopt)
QDateTime currentDateTimeUtc()
const_reference at(qsizetype i) const
bool contains(const AT &value) const
qsizetype count() const
bool isEmpty() const
iterator insert(const Key &key, const T &value)
bool contains(const QSet< T > &other) const
iterator insert(const T &value)
bool isEmpty() const
QByteArray toLatin1() const