7#include "itemcreatehandler.h" 
   10#include "connection.h" 
   11#include "handlerhelper.h" 
   12#include "itemfetchhelper.h" 
   13#include "preprocessormanager.h" 
   14#include "private/externalpartstorage_p.h" 
   15#include "storage/datastore.h" 
   16#include "storage/dbconfig.h" 
   17#include "storage/itemretrievalmanager.h" 
   18#include "storage/parthelper.h" 
   19#include "storage/partstreamer.h" 
   20#include "storage/parttypehelper.h" 
   21#include "storage/selectquerybuilder.h" 
   22#include "storage/transaction.h" 
   24#include "shared/akranges.h" 
   31using namespace Akonadi::Server;
 
   32using namespace AkRanges;
 
   34ItemCreateHandler::ItemCreateHandler(AkonadiServer &akonadi)
 
   39bool ItemCreateHandler::buildPimItem(
const Protocol::CreateItemCommand &cmd, PimItem &item, 
Collection &parentCol)
 
   41    parentCol = HandlerHelper::collectionFromScope(cmd.collection(), connection()->context());
 
   42    if (!parentCol.isValid()) {
 
   43        return failureResponse(QStringLiteral(
"Invalid parent collection"));
 
   45    if (parentCol.isVirtual()) {
 
   46        return failureResponse(QStringLiteral(
"Cannot append item into virtual collection"));
 
   49    MimeType 
mimeType = MimeType::retrieveByNameOrCreate(cmd.mimeType());
 
   51        return failureResponse(QStringLiteral(
"Unable to create mimetype '") % cmd.mimeType() % QStringLiteral(
"'."));
 
   55    item.setSize(cmd.itemSize());
 
   57    item.setCollectionId(parentCol.id());
 
   58    item.setDatetime(cmd.dateTime());
 
   59    if (cmd.remoteId().isEmpty()) {
 
   64        item.setRemoteId(cmd.remoteId());
 
   67    item.setRemoteRevision(cmd.remoteRevision());
 
   68    item.setGid(cmd.gid());
 
   75bool ItemCreateHandler::insertItem(
const Protocol::CreateItemCommand &cmd, PimItem &item, 
const Collection &parentCol)
 
   77    if (!item.datetime().isValid()) {
 
   82        return failureResponse(QStringLiteral(
"Failed to append item"));
 
   86    const QSet<QByteArray> flags = cmd.mergeModes() == Protocol::CreateItemCommand::None ? cmd.flags() : cmd.addedFlags();
 
   90        bool flagsChanged = 
false;
 
   91        if (!storageBackend()->appendItemsFlags({item}, flagList, &flagsChanged, 
false, parentCol, 
true)) {
 
   92            return failureResponse(
"Unable to append item flags.");
 
   96    const Scope tags = cmd.mergeModes() == Protocol::CreateItemCommand::None ? cmd.tags() : cmd.addedTags();
 
   97    if (!tags.isEmpty()) {
 
   98        const Tag::List tagList = HandlerHelper::tagsFromScope(tags, connection()->context());
 
   99        bool tagsChanged = 
false;
 
  100        if (!storageBackend()->appendItemsTags({item}, tagList, &tagsChanged, 
false, parentCol, 
true)) {
 
  101            return failureResponse(QStringLiteral(
"Unable to append item tags."));
 
  106    qint64 partSizes = 0;
 
  107    PartStreamer streamer(connection(), item);
 
  108    const auto parts = cmd.parts();
 
  109    for (
const QByteArray &partName : parts) {
 
  112            streamer.stream(
true, partName, partSize);
 
  113        } 
catch (
const PartStreamerException &e) {
 
  114            return failureResponse(e.what());
 
  116        partSizes += partSize;
 
  118    const Protocol::Attributes attrs = cmd.attributes();
 
  119    for (
auto iter = attrs.cbegin(), end = attrs.cend(); iter != end; ++iter) {
 
  121            streamer.streamAttribute(
true, iter.key(), iter.value());
 
  122        } 
catch (
const PartStreamerException &e) {
 
  123            return failureResponse(e.what());
 
  128    if (partSizes > item.size()) {
 
  129        item.setSize(partSizes);
 
  134    if (akonadi().preprocessorManager().isActive()) {
 
  135        Part hiddenAttribute;
 
  136        hiddenAttribute.setPimItemId(item.id());
 
  138        hiddenAttribute.setData(QByteArray());
 
  139        hiddenAttribute.setDatasize(0);
 
  144    const bool seen = flags.
contains(AKONADI_FLAG_SEEN) || flags.
contains(AKONADI_FLAG_IGNORED);
 
  145    notify(item, seen, item.collection());
 
  146    sendResponse(item, Protocol::CreateItemCommand::None);
 
  151bool ItemCreateHandler::mergeItem(
const Protocol::CreateItemCommand &cmd, PimItem &newItem, PimItem &currentItem, 
const Collection &parentCol)
 
  153    bool needsUpdate = 
false;
 
  154    bool ignoreFlagsChanges = 
false;
 
  155    QSet<QByteArray> changedParts;
 
  157    if (currentItem.atime() > newItem.atime()) {
 
  158        qCDebug(AKONADISERVER_LOG) << 
"Akoandi has newer atime of Item " << currentItem.id() << 
" than the resource (local atime =" << currentItem.atime()
 
  159                                   << 
", remote atime =" << newItem.atime() << 
"), ignoring flags changes.";
 
  164        ignoreFlagsChanges = 
true;
 
  167    if (!newItem.remoteId().isEmpty() && currentItem.remoteId() != newItem.remoteId()) {
 
  168        currentItem.setRemoteId(newItem.remoteId());
 
  169        changedParts.
insert(AKONADI_PARAM_REMOTEID);
 
  172    if (!newItem.remoteRevision().isEmpty() && currentItem.remoteRevision() != newItem.remoteRevision()) {
 
  173        currentItem.setRemoteRevision(newItem.remoteRevision());
 
  174        changedParts.
insert(AKONADI_PARAM_REMOTEREVISION);
 
  177    if (!newItem.gid().isEmpty() && currentItem.gid() != newItem.gid()) {
 
  178        currentItem.setGid(newItem.gid());
 
  179        changedParts.
insert(AKONADI_PARAM_GID);
 
  182    if (newItem.datetime().isValid() && newItem.datetime() != currentItem.datetime()) {
 
  183        currentItem.setDatetime(newItem.datetime());
 
  187    if (newItem.size() > 0 && newItem.size() != currentItem.size()) {
 
  188        currentItem.setSize(newItem.size());
 
  192    const Collection col = Collection::retrieveById(parentCol.id());
 
  193    if (cmd.flags().isEmpty() && !cmd.flagsOverwritten()) {
 
  194        bool flagsAdded = 
false;
 
  195        bool flagsRemoved = 
false;
 
  196        if (!cmd.addedFlags().isEmpty()) {
 
  198            storageBackend()->appendItemsFlags({currentItem}, addedFlags, &flagsAdded, 
true, col, 
true);
 
  200        if (!cmd.removedFlags().isEmpty()) {
 
  202            storageBackend()->removeItemsFlags({currentItem}, removedFlags, &flagsRemoved, col, 
true);
 
  204        if (flagsAdded || flagsRemoved) {
 
  205            changedParts.
insert(AKONADI_PARAM_FLAGS);
 
  208    } 
else if (!ignoreFlagsChanges) {
 
  209        bool flagsChanged = 
false;
 
  210        QSet<QByteArray> flagNames = cmd.flags();
 
  212        static QList<QByteArray> localFlagsToPreserve = {
"$ATTACHMENT", 
"$INVITATION", 
"$ENCRYPTED", 
"$SIGNED", 
"$WATCHED"};
 
  218        const Flag::List currentFlags = currentItem.flags();
 
  219        for (
const Flag &currentFlag : currentFlags) {
 
  220            const QByteArray currentFlagName = currentFlag.name().toLatin1();
 
  221            if (localFlagsToPreserve.
contains(currentFlagName)) {
 
  222                flagNames.
insert(currentFlagName);
 
  226        storageBackend()->setItemsFlags({currentItem}, &currentFlags, flags, &flagsChanged, col, 
true);
 
  228            changedParts.
insert(AKONADI_PARAM_FLAGS);
 
  233    if (cmd.tags().isEmpty()) {
 
  234        bool tagsAdded = 
false;
 
  235        bool tagsRemoved = 
false;
 
  236        if (!cmd.addedTags().isEmpty()) {
 
  237            const auto addedTags = HandlerHelper::tagsFromScope(cmd.addedTags(), connection()->context());
 
  238            storageBackend()->appendItemsTags({currentItem}, addedTags, &tagsAdded, 
true, col, 
true);
 
  240        if (!cmd.removedTags().isEmpty()) {
 
  241            const Tag::List removedTags = HandlerHelper::tagsFromScope(cmd.removedTags(), connection()->context());
 
  242            storageBackend()->removeItemsTags({currentItem}, removedTags, &tagsRemoved, 
true);
 
  245        if (tagsAdded || tagsRemoved) {
 
  246            changedParts.
insert(AKONADI_PARAM_TAGS);
 
  250        bool tagsChanged = 
false;
 
  251        const auto tags = HandlerHelper::tagsFromScope(cmd.tags(), connection()->context());
 
  252        storageBackend()->setItemsTags({currentItem}, tags, &tagsChanged, 
true);
 
  254            changedParts.
insert(AKONADI_PARAM_TAGS);
 
  259    const Part::List existingParts = Part::retrieveFiltered(Part::pimItemIdColumn(), currentItem.id());
 
  260    QMap<QByteArray, qint64> partsSizes;
 
  261    for (
const Part &part : existingParts) {
 
  265    PartStreamer streamer(connection(), currentItem);
 
  266    const auto partNames = cmd.parts();
 
  267    for (
const QByteArray &partName : partNames) {
 
  268        bool changed = 
false;
 
  271            streamer.stream(
true, partName, partSize, &changed);
 
  272        } 
catch (
const PartStreamerException &e) {
 
  273            return failureResponse(e.what());
 
  277            changedParts.
insert(partName);
 
  278            partsSizes.
insert(partName, partSize);
 
  283    const qint64 size = std::accumulate(partsSizes.
begin(), partsSizes.
end(), 0LL);
 
  284    if (size > currentItem.size()) {
 
  285        currentItem.setSize(size);
 
  290        currentItem.setRev(qMax(newItem.rev(), currentItem.rev()) + 1);
 
  293        currentItem.setDirty(!connection()->context().resource().
isValid());
 
  296        if (!currentItem.update()) {
 
  297            return failureResponse(
"Failed to store merged item");
 
  300        notify(currentItem, currentItem.collection(), changedParts);
 
  303    sendResponse(currentItem, cmd.mergeModes());
 
  308bool ItemCreateHandler::sendResponse(
const PimItem &item, Protocol::CreateItemCommand::MergeModes mergeModes)
 
  310    if (mergeModes & Protocol::CreateItemCommand::Silent || mergeModes & Protocol::CreateItemCommand::None) {
 
  311        Protocol::FetchItemsResponse resp;
 
  312        resp.setId(item.id());
 
  313        resp.setMTime(item.datetime());
 
  314        Handler::sendResponse(std::move(resp));
 
  318    Protocol::ItemFetchScope fetchScope;
 
  319    fetchScope.setAncestorDepth(Protocol::ItemFetchScope::ParentAncestor);
 
  320    fetchScope.setFetch(Protocol::ItemFetchScope::AllAttributes | Protocol::ItemFetchScope::FullPayload | Protocol::ItemFetchScope::CacheOnly
 
  321                        | Protocol::ItemFetchScope::Flags | Protocol::ItemFetchScope::GID | Protocol::ItemFetchScope::MTime | Protocol::ItemFetchScope::RemoteID
 
  322                        | Protocol::ItemFetchScope::RemoteRevision | Protocol::ItemFetchScope::Size | Protocol::ItemFetchScope::Tags);
 
  323    ItemFetchHelper fetchHelper(connection(), Scope{item.id()}, fetchScope, Protocol::TagFetchScope{}, akonadi());
 
  324    if (!fetchHelper.fetchItems()) {
 
  325        return failureResponse(
"Failed to retrieve item");
 
  331bool ItemCreateHandler::notify(
const PimItem &item, 
bool seen, 
const Collection &collection)
 
  333    storageBackend()->notificationCollector()->itemAdded(item, seen, collection);
 
  335    if (akonadi().preprocessorManager().isActive()) {
 
  337        akonadi().preprocessorManager().beginHandleItem(item, storageBackend());
 
  342bool ItemCreateHandler::notify(
const PimItem &item, 
const Collection &collection, 
const QSet<QByteArray> &changedParts)
 
  345        storageBackend()->notificationCollector()->itemChanged(item, changedParts, collection);
 
  350void ItemCreateHandler::recoverFromMultipleMergeCandidates(
const PimItem::List &items, 
const Collection &collection)
 
  358    int transactionDepth = 0;
 
  359    while (storageBackend()->inTransaction()) {
 
  361        storageBackend()->commitTransaction();
 
  363    const auto restoreTransaction = qScopeGuard([&]() {
 
  364        for (
int i = 0; i < transactionDepth; ++i) {
 
  365            storageBackend()->beginTransaction(QStringLiteral(
"RestoredTransactionAfterMMCRecovery"));
 
  369    Transaction transaction(storageBackend(), QStringLiteral(
"MMC Recovery Transaction"));
 
  375    if (items | Actions::any([](
const auto &item) {
 
  376            return item.dirty() || item.remoteId().isEmpty();
 
  378        qCWarning(AKONADISERVER_LOG) << 
"Automatic multiple merge candidates recovery failed: at least one of the candidates has uncommitted changes!";
 
  383    if (items | Actions::any([collection](
const auto &item) {
 
  384            return item.collectionId() != collection.id();
 
  386        qCWarning(AKONADISERVER_LOG) << 
"Automatic multiple merge candidates recovery failed: all candidates do not belong to the same collection.";
 
  390    storageBackend()->cleanupPimItems(items, DataStore::Silent);
 
  391    if (!transaction.commit()) {
 
  392        qCWarning(AKONADISERVER_LOG) << 
"Automatic multiple merge candidates recovery failed: failed to commit database transaction.";
 
  397    akonadi().itemRetrievalManager().triggerCollectionSync(collection.resource().name(), collection.id());
 
  399    qCInfo(AKONADISERVER_LOG) << 
"Automatic multiple merge candidates recovery successful: conflicting items" 
  400                              << (items | Views::transform([](
const auto &i) {
 
  403                                  | Actions::toQVector)
 
  404                              << 
"in collection" << collection.name() << 
"(ID:" << collection.id()
 
  405                              << 
") were removed and a new sync was scheduled in the resource" << collection.resource().name();
 
  410    const auto &cmd = Protocol::cmdCast<Protocol::CreateItemCommand>(m_command);
 
  416    Transaction transaction(storageBackend(), QStringLiteral(
"ItemCreateHandler"));
 
  417    ExternalPartStorageTransaction storageTrx;
 
  421    if (!buildPimItem(cmd, item, parentCol)) {
 
  425    if ((cmd.mergeModes() & ~Protocol::CreateItemCommand::Silent) == 0) {
 
  426        if (!insertItem(cmd, item, parentCol)) {
 
  429        if (!transaction.
commit()) {
 
  430            return failureResponse(QStringLiteral(
"Failed to commit transaction"));
 
  437        qb.
addValueCondition(PimItem::collectionIdColumn(), Query::Equals, parentCol.id());
 
  441        if (cmd.mergeModes() & Protocol::CreateItemCommand::GID) {
 
  442            mergeCondition.
addValueCondition(PimItem::gidColumn(), Query::Equals, item.gid());
 
  444        if (cmd.mergeModes() & Protocol::CreateItemCommand::RemoteID) {
 
  445            mergeCondition.
addValueCondition(PimItem::remoteIdColumn(), Query::Equals, item.remoteId());
 
  451        if (cmd.mergeModes() & Protocol::CreateItemCommand::GID && !item.remoteId().isEmpty()) {
 
  453            mergeCondition.
addValueCondition(PimItem::remoteIdColumn(), Query::Equals, item.remoteId());
 
  460            return failureResponse(
"Failed to query database for item");
 
  467            if (!insertItem(cmd, item, parentCol)) {
 
  470            if (!transaction.
commit()) {
 
  471                return failureResponse(
"Failed to commit transaction");
 
  475        } 
else if (result.
count() == 1) {
 
  478            PimItem existingItem = result.
at(0);
 
  480            if (!mergeItem(cmd, item, existingItem, parentCol)) {
 
  483            if (!transaction.
commit()) {
 
  484                return failureResponse(
"Failed to commit transaction");
 
  488            qCWarning(AKONADISERVER_LOG) << 
"Multiple merge candidates, will attempt to recover:";
 
  489            for (
const PimItem &item : result) {
 
  490                qCWarning(AKONADISERVER_LOG) << 
"\tID:" << item.id() << 
", RID:" << item.remoteId() << 
", GID:" << item.gid()
 
  491                                             << 
", Collection:" << item.collection().name() << 
"(" << item.collectionId() << 
")" 
  492                                             << 
", Resource:" << item.collection().resource().name() << 
"(" << item.collection().resourceId() << 
")";
 
  496            recoverFromMultipleMergeCandidates(result, parentCol);
 
  500            return failureResponse(QStringLiteral(
"Multiple merge candidates in collection '%1', aborting").arg(item.collection().name()));
 
  504    return successResponse<Protocol::CreateItemResponse>();
 
 
Represents a collection of PIM items.
 
static Flag::List resolveFlags(const QSet< QByteArray > &flagNames)
Converts a bytearray list of flag names into flag records.
 
The handler interface describes an entity capable of handling an Akonadi IMAP command.
 
bool parseStream() override
Parse and handle the IMAP message using the streaming parser.
 
void addValueCondition(const QString &column, Query::CompareOperator op, const QVariant &value, ConditionType type=WhereCondition)
Add a WHERE or HAVING condition which compares a column with a given value.
 
bool exec()
Executes the query, returns true on success.
 
void addCondition(const Query::Condition &condition, ConditionType type=WhereCondition)
Add a WHERE condition.
 
void setForUpdate(bool forUpdate=true)
Indicate to the database that it should acquire an exclusive lock on the rows already during the SELECT statement.
 
Represents a WHERE condition tree.
 
void addValueCondition(const QString &column, CompareOperator op, const QVariant &value)
Add a WHERE condition which compares a column with a given value.
 
void addCondition(const Condition &condition)
Add a WHERE condition.
 
Helper class for creating and executing database SELECT queries.
 
QList< T > result()
Returns the result of this SELECT query.
 
Helper class for DataStore transaction handling.
 
bool commit()
Commits the transaction.
 
bool insert(Part *part, qint64 *insertId=nullptr)
Adds a new part to the database and if necessary to the filesystem.
 
PartType fromFqName(const QString &fqName)
Retrieve (or create) PartType for the given fully qualified name.
 
QString fullName(const PartType &type)
Returns full part name.
 
Helper integration between Akonadi and Qt.
 
KCALUTILS_EXPORT QString mimeType()
 
bool isValid(QStringView ifopt)
 
QDateTime currentDateTimeUtc()
 
const_reference at(qsizetype i) const
 
bool contains(const AT &value) const
 
qsizetype count() const
 
bool isEmpty() const
 
iterator insert(const Key &key, const T &value)
 
bool contains(const QSet< T > &other) const
 
iterator insert(const T &value)
 
bool isEmpty() const
 
QByteArray toLatin1() const const