7#include "notificationsubscriber.h" 
    8#include "aggregatedfetchscope.h" 
    9#include "akonadiserver_debug.h" 
   10#include "notificationmanager.h" 
   13#include <QLocalSocket> 
   16#include "private/datastream_p_p.h" 
   17#include "private/protocol_exception_p.h" 
   18#include "shared/akranges.h" 
   21using namespace Akonadi::Server;
 
   22using namespace AkRanges;
 
   30    , mAllMonitored(false)
 
   32    , mNotificationDebugging(false)
 
   35        mManager->itemFetchScope()->addSubscriber();
 
   36        mManager->collectionFetchScope()->addSubscriber();
 
   37        mManager->tagFetchScope()->addSubscriber();
 
   41NotificationSubscriber::NotificationSubscriber(
NotificationManager *manager, quintptr socketDescriptor)
 
   42    : NotificationSubscriber(manager)
 
   47    mSocket->setSocketDescriptor(socketDescriptor);
 
   49    const SchemaVersion schema = SchemaVersion::retrieveAll().at(0);
 
   51    auto hello = Protocol::HelloResponsePtr::create();
 
   52    hello->setServerName(QStringLiteral(
"Akonadi"));
 
   53    hello->setMessage(QStringLiteral(
"Not really IMAP server"));
 
   54    hello->setProtocolVersion(Protocol::version());
 
   55    hello->setGeneration(schema.generation());
 
   56    writeCommand(0, hello);
 
   59NotificationSubscriber::~NotificationSubscriber()
 
   61    QMutexLocker locker(&mLock);
 
   63    if (mNotificationDebugging) {
 
   64        Q_EMIT notificationDebuggingChanged(
false);
 
   68void NotificationSubscriber::handleIncomingData()
 
   70    while (mSocket->bytesAvailable() > 
static_cast<int>(
sizeof(qint64))) {
 
   71        Protocol::DataStream stream(mSocket);
 
   77        Protocol::CommandPtr cmd;
 
   79            cmd = Protocol::deserialize(mSocket);
 
   80        } 
catch (
const Akonadi::ProtocolException &e) {
 
   81            qCWarning(AKONADISERVER_LOG) << 
"ProtocolException while reading from notification bus for" << mSubscriber << 
":" << e.what();
 
   82            disconnectSubscriber();
 
   84        } 
catch (
const std::exception &e) {
 
   85            qCWarning(AKONADISERVER_LOG) << 
"Unknown exception while reading from notification bus for" << mSubscriber << 
":" << e.what();
 
   86            disconnectSubscriber();
 
   89        if (cmd->type() == Protocol::Command::Invalid) {
 
   90            qCWarning(AKONADISERVER_LOG) << 
"Invalid command while reading from notification bus for " << mSubscriber << 
", resetting connection";
 
   91            disconnectSubscriber();
 
   95        switch (cmd->type()) {
 
   96        case Protocol::Command::CreateSubscription:
 
   97            registerSubscriber(Protocol::cmdCast<Protocol::CreateSubscriptionCommand>(cmd));
 
   98            writeCommand(tag, Protocol::CreateSubscriptionResponsePtr::create());
 
  100        case Protocol::Command::ModifySubscription:
 
  101            if (mSubscriber.isEmpty()) {
 
  102                qCWarning(AKONADISERVER_LOG) << 
"Notification subscriber received ModifySubscription command before RegisterSubscriber";
 
  103                disconnectSubscriber();
 
  106            modifySubscription(Protocol::cmdCast<Protocol::ModifySubscriptionCommand>(cmd));
 
  107            writeCommand(tag, Protocol::ModifySubscriptionResponsePtr::create());
 
  109        case Protocol::Command::Logout:
 
  110            disconnectSubscriber();
 
  113            qCWarning(AKONADISERVER_LOG) << 
"Notification subscriber for" << mSubscriber << 
"received an invalid command" << cmd->type();
 
  114            disconnectSubscriber();
 
  120void NotificationSubscriber::socketDisconnected()
 
  122    qCInfo(AKONADISERVER_LOG) << 
"Subscriber" << mSubscriber << 
"disconnected";
 
  123    disconnectSubscriber();
 
  126void NotificationSubscriber::disconnectSubscriber()
 
  128    QMutexLocker locker(&mLock);
 
  130    auto changeNtf = Protocol::SubscriptionChangeNotificationPtr::create();
 
  131    changeNtf->setSubscriber(mSubscriber);
 
  132    changeNtf->setSessionId(mSession);
 
  133    changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Remove);
 
  134    mManager->slotNotify({changeNtf});
 
  142    auto cfs = mManager->collectionFetchScope();
 
  143    cfs->apply(mCollectionFetchScope, Protocol::CollectionFetchScope());
 
  144    cfs->removeSubscriber();
 
  146    auto tfs = mManager->tagFetchScope();
 
  147    tfs->apply(mTagFetchScope, Protocol::TagFetchScope());
 
  148    tfs->removeSubscriber();
 
  150    auto ifs = mManager->itemFetchScope();
 
  151    ifs->apply(mItemFetchScope, Protocol::ItemFetchScope());
 
  152    ifs->removeSubscriber();
 
  154    mManager->forgetSubscriber(
this);
 
  158void NotificationSubscriber::registerSubscriber(
const Protocol::CreateSubscriptionCommand &command)
 
  160    QMutexLocker locker(&mLock);
 
  162    qCInfo(AKONADISERVER_LOG) << 
"Subscriber" << 
this << 
"identified as" << command.subscriberName();
 
  163    mSubscriber = command.subscriberName();
 
  164    mSession = command.session();
 
  166    auto changeNtf = Protocol::SubscriptionChangeNotificationPtr::create();
 
  167    changeNtf->setSubscriber(mSubscriber);
 
  168    changeNtf->setSessionId(mSession);
 
  169    changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Add);
 
  170    mManager->slotNotify({changeNtf});
 
  173static QStringList canonicalMimeTypes(
const QStringList &mimes)
 
  175    static QMimeDatabase sMimeDatabase;
 
  178    auto canonicalMime = [](
const QString &mime) {
 
  181    std::transform(mimes.
begin(), mimes.
end(), std::back_inserter(ret), canonicalMime);
 
  185void NotificationSubscriber::modifySubscription(
const Protocol::ModifySubscriptionCommand &command)
 
  187    QMutexLocker locker(&mLock);
 
  189    const auto modifiedParts = command.modifiedParts();
 
  191#define START_MONITORING(type)                                                                                                                                 \ 
  192    (modifiedParts & Protocol::ModifySubscriptionCommand::ModifiedParts(Protocol::ModifySubscriptionCommand::type | Protocol::ModifySubscriptionCommand::Add)) 
  193#define STOP_MONITORING(type)                                                                                                                                  \ 
  195     & Protocol::ModifySubscriptionCommand::ModifiedParts(Protocol::ModifySubscriptionCommand::type | Protocol::ModifySubscriptionCommand::Remove)) 
  197#define APPEND(set, newItemsFetch)                                                                                                                             \ 
  199        const auto newItems = newItemsFetch;                                                                                                                   \ 
  200        for (const auto &entity : newItems) {                                                                                                                  \ 
  201            (set).insert(entity);                                                                                                                              \ 
  205#define REMOVE(set, itemsFetch)                                                                                                                                \ 
  207        const auto items = itemsFetch;                                                                                                                         \ 
  208        for (const auto &entity : items) {                                                                                                                     \ 
  209            (set).remove(entity);                                                                                                                              \ 
  213    if (START_MONITORING(Types)) {
 
  214        APPEND(mMonitoredTypes, command.startMonitoringTypes())
 
  216    if (STOP_MONITORING(Types)) {
 
  217        REMOVE(mMonitoredTypes, command.stopMonitoringTypes())
 
  219    if (START_MONITORING(Collections)) {
 
  220        APPEND(mMonitoredCollections, command.startMonitoringCollections())
 
  222    if (STOP_MONITORING(Collections)) {
 
  223        REMOVE(mMonitoredCollections, command.stopMonitoringCollections())
 
  225    if (START_MONITORING(Items)) {
 
  226        APPEND(mMonitoredItems, command.startMonitoringItems())
 
  228    if (STOP_MONITORING(Items)) {
 
  229        REMOVE(mMonitoredItems, command.stopMonitoringItems())
 
  231    if (START_MONITORING(Tags)) {
 
  232        APPEND(mMonitoredTags, command.startMonitoringTags())
 
  234    if (STOP_MONITORING(Tags)) {
 
  235        REMOVE(mMonitoredTags, command.stopMonitoringTags())
 
  237    if (START_MONITORING(Resources)) {
 
  238        APPEND(mMonitoredResources, command.startMonitoringResources())
 
  240    if (STOP_MONITORING(Resources)) {
 
  241        REMOVE(mMonitoredResources, command.stopMonitoringResources())
 
  243    if (START_MONITORING(MimeTypes)) {
 
  244        APPEND(mMonitoredMimeTypes, canonicalMimeTypes(command.startMonitoringMimeTypes()))
 
  246    if (STOP_MONITORING(MimeTypes)) {
 
  247        REMOVE(mMonitoredMimeTypes, canonicalMimeTypes(command.stopMonitoringMimeTypes()))
 
  249    if (START_MONITORING(Sessions)) {
 
  250        APPEND(mIgnoredSessions, command.startIgnoringSessions())
 
  252    if (STOP_MONITORING(Sessions)) {
 
  253        REMOVE(mIgnoredSessions, command.stopIgnoringSessions())
 
  255    if (modifiedParts & Protocol::ModifySubscriptionCommand::AllFlag) {
 
  256        mAllMonitored = command.allMonitored();
 
  258    if (modifiedParts & Protocol::ModifySubscriptionCommand::ExclusiveFlag) {
 
  259        mExclusive = command.isExclusive();
 
  261    if (modifiedParts & Protocol::ModifySubscriptionCommand::ItemFetchScope) {
 
  262        const auto newScope = command.itemFetchScope();
 
  263        mManager->itemFetchScope()->apply(mItemFetchScope, newScope);
 
  264        mItemFetchScope = newScope;
 
  266    if (modifiedParts & Protocol::ModifySubscriptionCommand::CollectionFetchScope) {
 
  267        const auto newScope = command.collectionFetchScope();
 
  268        mManager->collectionFetchScope()->apply(mCollectionFetchScope, newScope);
 
  269        mCollectionFetchScope = newScope;
 
  271    if (modifiedParts & Protocol::ModifySubscriptionCommand::TagFetchScope) {
 
  272        const auto newScope = command.tagFetchScope();
 
  273        mManager->tagFetchScope()->apply(mTagFetchScope, newScope);
 
  274        mTagFetchScope = newScope;
 
  275        if (!newScope.fetchIdOnly()) {
 
  276            Q_ASSERT(!mManager->tagFetchScope()->fetchIdOnly());
 
  281        if (modifiedParts & Protocol::ModifySubscriptionCommand::Types) {
 
  283            if (command.startMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::SubscriptionChanges)) {
 
  285                mManager->mSubscribers | Views::filter(IsNotNull) | Actions::forEach([
this](
const auto &subscriber) {
 
  289                                              Q_ARG(Akonadi::Protocol::ChangeNotificationPtr, subscriber->toChangeNotification()));
 
  292            if (command.startMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::ChangeNotifications)) {
 
  293                if (!mNotificationDebugging) {
 
  294                    mNotificationDebugging = 
true;
 
  295                    Q_EMIT notificationDebuggingChanged(
true);
 
  297            } 
else if (command.stopMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::ChangeNotifications)) {
 
  298                if (mNotificationDebugging) {
 
  299                    mNotificationDebugging = 
false;
 
  300                    Q_EMIT notificationDebuggingChanged(
false);
 
  306        auto changeNtf = toChangeNotification();
 
  307        changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Modify);
 
  308        mManager->slotNotify({changeNtf});
 
  311#undef START_MONITORING 
  312#undef STOP_MONITORING 
  317Protocol::SubscriptionChangeNotificationPtr NotificationSubscriber::toChangeNotification()
 const 
  321    auto ntf = Protocol::SubscriptionChangeNotificationPtr::create();
 
  322    ntf->setSessionId(mSession);
 
  323    ntf->setSubscriber(mSubscriber);
 
  324    ntf->setOperation(Protocol::SubscriptionChangeNotification::Add);
 
  325    ntf->setCollections(mMonitoredCollections);
 
  326    ntf->setItems(mMonitoredItems);
 
  327    ntf->setTags(mMonitoredTags);
 
  328    ntf->setTypes(mMonitoredTypes);
 
  329    ntf->setMimeTypes(mMonitoredMimeTypes);
 
  330    ntf->setResources(mMonitoredResources);
 
  331    ntf->setIgnoredSessions(mIgnoredSessions);
 
  332    ntf->setAllMonitored(mAllMonitored);
 
  333    ntf->setExclusive(mExclusive);
 
  334    ntf->setItemFetchScope(mItemFetchScope);
 
  335    ntf->setTagFetchScope(mTagFetchScope);
 
  336    ntf->setCollectionFetchScope(mCollectionFetchScope);
 
  340bool NotificationSubscriber::isCollectionMonitored(Entity::Id 
id)
 const 
  348    return mMonitoredCollections.contains(
id) || mMonitoredCollections.contains(0);
 
  351bool NotificationSubscriber::isMimeTypeMonitored(
const QString &mimeType)
 const 
  356    if (mimeType == QLatin1StringView(
"text/directory")) {
 
  357        return mMonitoredMimeTypes.contains(QStringLiteral(
"text/vcard"));
 
  359    return mMonitoredMimeTypes.contains(mimeType);
 
  362bool NotificationSubscriber::isMoveDestinationResourceMonitored(
const Protocol::ItemChangeNotification &msg)
 const 
  366    if (msg.operation() != Protocol::ItemChangeNotification::Move) {
 
  369    return mMonitoredResources.contains(msg.destinationResource());
 
  372bool NotificationSubscriber::isMoveDestinationResourceMonitored(
const Protocol::CollectionChangeNotification &msg)
 const 
  376    if (msg.operation() != Protocol::CollectionChangeNotification::Move) {
 
  379    return mMonitoredResources.contains(msg.destinationResource());
 
  382bool NotificationSubscriber::acceptsItemNotification(
const Protocol::ItemChangeNotification &msg)
 const 
  386    if (msg.items().isEmpty()) {
 
  391        TRACE_NTF(
"ACCEPTS ITEM: all monitored");
 
  395    if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::ItemChanges)) {
 
  396        TRACE_NTF(
"ACCEPTS ITEM: REJECTED - Item changes not monitored");
 
  401    if (!mMonitoredResources.isEmpty() || !mMonitoredMimeTypes.isEmpty()) {
 
  402        if (mMonitoredResources.contains(msg.resource())) {
 
  403            TRACE_NTF(
"ACCEPTS ITEM: ACCEPTED - resource monitored");
 
  407        if (isMoveDestinationResourceMonitored(msg)) {
 
  408            TRACE_NTF(
"ACCEPTS ITEM: ACCEPTED: move destination monitored");
 
  412        const auto items = msg.items();
 
  413        for (
const auto &item : items) {
 
  414            if (isMimeTypeMonitored(item.mimeType())) {
 
  415                TRACE_NTF(
"ACCEPTS ITEM: ACCEPTED - mimetype monitored");
 
  420        TRACE_NTF(
"ACCEPTS ITEM: REJECTED: resource nor mimetype monitored");
 
  425    const auto items = msg.items();
 
  426    for (
const auto &item : items) {
 
  427        if (mMonitoredItems.contains(item.id())) {
 
  428            TRACE_NTF(
"ACCEPTS ITEM: ACCEPTED: item explicitly monitored");
 
  433    if (isCollectionMonitored(msg.parentCollection())) {
 
  434        TRACE_NTF(
"ACCEPTS ITEM: ACCEPTED: parent collection monitored");
 
  437    if (isCollectionMonitored(msg.parentDestCollection())) {
 
  438        TRACE_NTF(
"ACCEPTS ITEM: ACCEPTED: destination collection monitored");
 
  442    TRACE_NTF(
"ACCEPTS ITEM: REJECTED");
 
  446bool NotificationSubscriber::acceptsCollectionNotification(
const Protocol::CollectionChangeNotification &msg)
 const 
  450    const auto &collection = msg.collection();
 
  451    if (collection.id() < 0) {
 
  459    if (msg.metadata().contains(
"DISABLED") && (msg.operation() != Protocol::CollectionChangeNotification::Unsubscribe)
 
  460        && !msg.changedParts().contains(
"ENABLED")) {
 
  471    if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::CollectionChanges)) {
 
  476    if (!mMonitoredResources.isEmpty()) {
 
  477        const bool resourceMatches = mMonitoredResources.contains(msg.resource()) || isMoveDestinationResourceMonitored(msg);
 
  481        if (mMonitoredMimeTypes.isEmpty() || resourceMatches) {
 
  482            return resourceMatches;
 
  488    if (isCollectionMonitored(collection.id())) {
 
  492    return isCollectionMonitored(msg.parentCollection()) || isCollectionMonitored(msg.parentDestCollection());
 
  495bool NotificationSubscriber::acceptsTagNotification(
const Protocol::TagChangeNotification &msg)
 const 
  499    if (msg.tag().id() < 0) {
 
  508    if (msg.operation() == Protocol::TagChangeNotification::Remove) {
 
  516        if (!mIgnoredSessions.isEmpty() && msg.resource().isEmpty()) {
 
  522        if (!msg.resource().isEmpty() && !mIgnoredSessions.contains(msg.resource())) {
 
  537    if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::TagChanges)) {
 
  541    if (mMonitoredTags.isEmpty()) {
 
  545    if (mMonitoredTags.contains(msg.tag().id())) {
 
  552bool NotificationSubscriber::acceptsSubscriptionNotification(
const Protocol::SubscriptionChangeNotification &msg)
 const 
  560    return mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::SubscriptionChanges);
 
  563bool NotificationSubscriber::acceptsDebugChangeNotification(
const Protocol::DebugChangeNotification &msg)
 const 
  569    Q_ASSERT(msg.notification()->type() != Protocol::Command::DebugChangeNotification);
 
  570    if (msg.notification()->type() == Protocol::Command::DebugChangeNotification) {
 
  576    return mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::ChangeNotifications);
 
  579bool NotificationSubscriber::acceptsNotification(
const Protocol::ChangeNotification &msg)
 const 
  584    if (mSubscriber.isEmpty()) {
 
  590    if (mIgnoredSessions.contains(msg.sessionId())) {
 
  594    switch (msg.type()) {
 
  595    case Protocol::Command::ItemChangeNotification:
 
  596        return acceptsItemNotification(
static_cast<const Protocol::ItemChangeNotification &
>(msg));
 
  597    case Protocol::Command::CollectionChangeNotification:
 
  598        return acceptsCollectionNotification(
static_cast<const Protocol::CollectionChangeNotification &
>(msg));
 
  599    case Protocol::Command::TagChangeNotification:
 
  600        return acceptsTagNotification(
static_cast<const Protocol::TagChangeNotification &
>(msg));
 
  601    case Protocol::Command::SubscriptionChangeNotification:
 
  602        return acceptsSubscriptionNotification(
static_cast<const Protocol::SubscriptionChangeNotification &
>(msg));
 
  603    case Protocol::Command::DebugChangeNotification:
 
  604        return acceptsDebugChangeNotification(
static_cast<const Protocol::DebugChangeNotification &
>(msg));
 
  607        qCWarning(AKONADISERVER_LOG) << 
"NotificationSubscriber" << mSubscriber << 
"received an invalid notification type" << msg.type();
 
  612bool NotificationSubscriber::notify(
const Protocol::ChangeNotificationPtr ¬ification)
 
  615    QPointer<NotificationSubscriber> ptr(
this);
 
  616    QMutexLocker locker(&mLock);
 
  621    if (acceptsNotification(*notification)) {
 
  628void NotificationSubscriber::writeNotification(
const Protocol::ChangeNotificationPtr ¬ification)
 
  631    writeCommand(4, notification);
 
  634void NotificationSubscriber::writeCommand(qint64 tag, 
const Protocol::CommandPtr &cmd)
 
  638    Protocol::DataStream stream(mSocket);
 
  641        Protocol::serialize(stream, cmd);
 
  643        if (!mSocket->waitForBytesWritten()) {
 
  645                qCWarning(AKONADISERVER_LOG) << 
"NotificationSubscriber for" << mSubscriber << 
": timeout writing into stream";
 
  650    } 
catch (
const ProtocolException &e) {
 
  651        qCWarning(AKONADISERVER_LOG) << 
"ProtocolException while writing into stream for subscriber" << mSubscriber << 
":" << e.what();
 
  655#include "moc_notificationsubscriber.cpp" 
Helper integration between Akonadi and Qt.
 
qsizetype count() const const
 
void reserve(qsizetype size)
 
QMimeType mimeTypeForName(const QString &nameOrAlias) const const
 
QFuture< ArgsType< Signal > > connect(Sender *sender, Signal signal)
 
QThread * currentThread()