7#include "notificationsubscriber.h"
8#include "aggregatedfetchscope.h"
9#include "akonadiserver_debug.h"
10#include "notificationmanager.h"
13#include <QLocalSocket>
16#include "private/datastream_p_p.h"
17#include "private/protocol_exception_p.h"
18#include "shared/akranges.h"
21using namespace Akonadi::Server;
22using namespace AkRanges;
// Fragment of a constructor whose signature is not visible in this listing.
// The visible part initialises mAllMonitored and mNotificationDebugging to
// false and calls addSubscriber() on the manager's item, collection and tag
// fetch scopes.
30 , mAllMonitored(false)
32 , mNotificationDebugging(false)
35 mManager->itemFetchScope()->addSubscriber();
36 mManager->collectionFetchScope()->addSubscriber();
37 mManager->tagFetchScope()->addSubscriber();
// Constructs a subscriber on top of an existing socket descriptor.
// Delegates to the single-argument constructor, hands the descriptor to
// mSocket and then greets the peer with a Hello response written under tag 0.
// The response carries the server name, a message, the protocol version and
// the generation of the schema version read from SchemaVersion.
// (Some lines of the body, e.g. the creation of mSocket, are not visible in
// this listing.)
41NotificationSubscriber::NotificationSubscriber(
NotificationManager *manager, quintptr socketDescriptor)
42 : NotificationSubscriber(manager)
47 mSocket->setSocketDescriptor(socketDescriptor);
// NOTE(review): at(0) assumes retrieveAll() returns at least one entry -
// confirm this is guaranteed by the time subscribers connect.
49 const SchemaVersion schema = SchemaVersion::retrieveAll().at(0);
51 auto hello = Protocol::HelloResponsePtr::create();
52 hello->setServerName(QStringLiteral(
"Akonadi"));
53 hello->setMessage(QStringLiteral(
"Not really IMAP server"));
54 hello->setProtocolVersion(Protocol::version());
55 hello->setGeneration(schema.generation());
56 writeCommand(0, hello);
// Destructor. If notification debugging is still switched on for this
// subscriber, emits notificationDebuggingChanged(false).
// (Other lines of the body are not visible in this listing.)
59NotificationSubscriber::~NotificationSubscriber()
63 if (mNotificationDebugging) {
64 Q_EMIT notificationDebuggingChanged(
false);
// Processes commands that arrived on mSocket.
//
// Loops while more than sizeof(qint64) bytes are buffered on the socket and
// deserializes one command per iteration. Handling visible in this listing:
//  - ProtocolException or any other std::exception during deserialization is
//    logged and followed by disconnectSubscriber();
//  - a command of type Invalid is logged and followed by disconnectSubscriber();
//  - CreateSubscription is passed to registerSubscriber() and acknowledged
//    with a CreateSubscriptionResponse;
//  - ModifySubscription is rejected (logged + disconnect) when mSubscriber is
//    still empty, otherwise passed to modifySubscription() and acknowledged
//    with a ModifySubscriptionResponse;
//  - Logout triggers disconnectSubscriber();
//  - the trailing warning handles any other command type (presumably the
//    default branch - its label is not visible here) and also disconnects.
// The declarations of `cmd` and `tag`, and the statements that leave the
// function or the switch after each branch, are not visible in this listing.
68void NotificationSubscriber::handleIncomingData()
70 while (mSocket->bytesAvailable() >
static_cast<int>(
sizeof(qint64))) {
71 Protocol::DataStream stream(mSocket);
79 cmd = Protocol::deserialize(mSocket);
80 }
catch (
const Akonadi::ProtocolException &e) {
81 qCWarning(AKONADISERVER_LOG) <<
"ProtocolException while reading from notification bus for" << mSubscriber <<
":" << e.what();
82 disconnectSubscriber();
84 }
catch (
const std::exception &e) {
85 qCWarning(AKONADISERVER_LOG) <<
"Unknown exception while reading from notification bus for" << mSubscriber <<
":" << e.what();
86 disconnectSubscriber();
89 if (cmd->type() == Protocol::Command::Invalid) {
90 qCWarning(AKONADISERVER_LOG) <<
"Invalid command while reading from notification bus for " << mSubscriber <<
", resetting connection";
91 disconnectSubscriber();
95 switch (cmd->type()) {
96 case Protocol::Command::CreateSubscription:
97 registerSubscriber(Protocol::cmdCast<Protocol::CreateSubscriptionCommand>(cmd));
98 writeCommand(tag, Protocol::CreateSubscriptionResponsePtr::create());
100 case Protocol::Command::ModifySubscription:
101 if (mSubscriber.isEmpty()) {
102 qCWarning(AKONADISERVER_LOG) <<
"Notification subscriber received ModifySubscription command before RegisterSubscriber";
103 disconnectSubscriber();
106 modifySubscription(Protocol::cmdCast<Protocol::ModifySubscriptionCommand>(cmd));
107 writeCommand(tag, Protocol::ModifySubscriptionResponsePtr::create());
109 case Protocol::Command::Logout:
110 disconnectSubscriber();
113 qCWarning(AKONADISERVER_LOG) <<
"Notification subscriber for" << mSubscriber <<
"received an invalid command" << cmd->type();
114 disconnectSubscriber();
// Reaction to the socket being disconnected: logs the event at info level
// and runs the common teardown in disconnectSubscriber().
120void NotificationSubscriber::socketDisconnected()
122 qCInfo(AKONADISERVER_LOG) <<
"Subscriber" << mSubscriber <<
"disconnected";
123 disconnectSubscriber();
// Tears down this subscription.
//
// Visible steps:
//  1. Build a SubscriptionChangeNotification with operation Remove for this
//     subscriber name and session, and pass it to the manager's slotNotify().
//  2. For each of the manager's collection, tag and item fetch scopes, call
//     apply() with this subscriber's current scope and a default-constructed
//     scope, then removeSubscriber(). Presumably this withdraws the
//     subscriber's contribution from the aggregated scope - confirm against
//     the aggregated fetch scope implementation.
//  3. Ask the manager to forget this subscriber.
// Lines between these steps (original lines 127-129 and 135-141) are not
// visible in this listing.
126void NotificationSubscriber::disconnectSubscriber()
130 auto changeNtf = Protocol::SubscriptionChangeNotificationPtr::create();
131 changeNtf->setSubscriber(mSubscriber);
132 changeNtf->setSessionId(mSession);
133 changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Remove);
134 mManager->slotNotify({changeNtf});
142 auto cfs = mManager->collectionFetchScope();
143 cfs->apply(mCollectionFetchScope, Protocol::CollectionFetchScope());
144 cfs->removeSubscriber();
146 auto tfs = mManager->tagFetchScope();
147 tfs->apply(mTagFetchScope, Protocol::TagFetchScope());
148 tfs->removeSubscriber();
150 auto ifs = mManager->itemFetchScope();
151 ifs->apply(mItemFetchScope, Protocol::ItemFetchScope());
152 ifs->removeSubscriber();
154 mManager->forgetSubscriber(
this);
// Handles a CreateSubscription command.
//
// Logs the name the subscriber identified itself with, stores the
// subscriber name and session taken from the command in mSubscriber and
// mSession, and passes a SubscriptionChangeNotification with operation Add
// (carrying that name and session) to the manager's slotNotify().
//
// @param command the CreateSubscription command received from the client.
158void NotificationSubscriber::registerSubscriber(
const Protocol::CreateSubscriptionCommand &command)
162 qCInfo(AKONADISERVER_LOG) <<
"Subscriber" <<
this <<
"identified as" << command.subscriberName();
163 mSubscriber = command.subscriberName();
164 mSession = command.session();
166 auto changeNtf = Protocol::SubscriptionChangeNotificationPtr::create();
167 changeNtf->setSubscriber(mSubscriber);
168 changeNtf->setSessionId(mSession);
169 changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Add);
170 mManager->slotNotify({changeNtf});
// Fragment of a helper whose header and lambda body are not visible in this
// listing (it is called as canonicalMimeTypes() further down). The visible
// part defines a per-name lambda `canonicalMime` and uses std::transform to
// append the result of that lambda for every entry of `mimes` to `ret`.
178 auto canonicalMime = [](
const QString &mime) {
181 std::transform(mimes.
begin(), mimes.
end(), std::back_inserter(ret), canonicalMime);
// Applies a ModifySubscription command to this subscriber's state.
//
// Local helper macros (undefined again at the end of the function):
//  - START_MONITORING(type) / STOP_MONITORING(type) test command.modifiedParts()
//    against the given part combined with the Add / Remove flag;
//  - APPEND(set, items) inserts every fetched entity into `set`;
//  - REMOVE(set, items) removes every fetched entity from `set`.
//
// Visible processing order:
//  1. Add/remove monitored types, collections, items, tags, resources and
//     mime types (mime types go through canonicalMimeTypes()), and
//     add/remove ignored sessions.
//  2. Update mAllMonitored and mExclusive when the respective flag is part of
//     modifiedParts.
//  3. For the item, collection and tag fetch scopes: call apply() on the
//     manager's scope with the old and the new scope, then store the new scope.
//     For tags, a non-id-only new scope is asserted to make the manager's
//     scope non-id-only as well.
//  4. When the monitored types changed: handle a newly added
//     SubscriptionChanges type by iterating over the manager's non-null
//     subscribers (the loop body is not visible here), and switch notification
//     debugging on/off - emitting notificationDebuggingChanged() on an actual
//     state change - depending on whether ChangeNotifications was added or
//     removed.
//  5. Send the current state (toChangeNotification()) with operation Modify
//     through the manager's slotNotify().
// Several lines (closing braces, parts of the macro definitions, conditions
// around the final notification) are not visible in this listing.
//
// @param command the ModifySubscription command received from the client.
185void NotificationSubscriber::modifySubscription(
const Protocol::ModifySubscriptionCommand &command)
189 const auto modifiedParts = command.modifiedParts();
191#define START_MONITORING(type) \
192 (modifiedParts & Protocol::ModifySubscriptionCommand::ModifiedParts(Protocol::ModifySubscriptionCommand::type | Protocol::ModifySubscriptionCommand::Add))
193#define STOP_MONITORING(type) \
195 & Protocol::ModifySubscriptionCommand::ModifiedParts(Protocol::ModifySubscriptionCommand::type | Protocol::ModifySubscriptionCommand::Remove))
197#define APPEND(set, newItemsFetch) \
199 const auto newItems = newItemsFetch; \
200 for (const auto &entity : newItems) { \
201 (set).insert(entity); \
205#define REMOVE(set, itemsFetch) \
207 const auto items = itemsFetch; \
208 for (const auto &entity : items) { \
209 (set).remove(entity); \
213 if (START_MONITORING(Types)) {
214 APPEND(mMonitoredTypes, command.startMonitoringTypes())
216 if (STOP_MONITORING(Types)) {
217 REMOVE(mMonitoredTypes, command.stopMonitoringTypes())
219 if (START_MONITORING(Collections)) {
220 APPEND(mMonitoredCollections, command.startMonitoringCollections())
222 if (STOP_MONITORING(Collections)) {
223 REMOVE(mMonitoredCollections, command.stopMonitoringCollections())
225 if (START_MONITORING(Items)) {
226 APPEND(mMonitoredItems, command.startMonitoringItems())
228 if (STOP_MONITORING(Items)) {
229 REMOVE(mMonitoredItems, command.stopMonitoringItems())
231 if (START_MONITORING(Tags)) {
232 APPEND(mMonitoredTags, command.startMonitoringTags())
234 if (STOP_MONITORING(Tags)) {
235 REMOVE(mMonitoredTags, command.stopMonitoringTags())
237 if (START_MONITORING(Resources)) {
238 APPEND(mMonitoredResources, command.startMonitoringResources())
240 if (STOP_MONITORING(Resources)) {
241 REMOVE(mMonitoredResources, command.stopMonitoringResources())
243 if (START_MONITORING(MimeTypes)) {
244 APPEND(mMonitoredMimeTypes, canonicalMimeTypes(command.startMonitoringMimeTypes()))
246 if (STOP_MONITORING(MimeTypes)) {
247 REMOVE(mMonitoredMimeTypes, canonicalMimeTypes(command.stopMonitoringMimeTypes()))
249 if (START_MONITORING(Sessions)) {
250 APPEND(mIgnoredSessions, command.startIgnoringSessions())
252 if (STOP_MONITORING(Sessions)) {
253 REMOVE(mIgnoredSessions, command.stopIgnoringSessions())
255 if (modifiedParts & Protocol::ModifySubscriptionCommand::AllFlag) {
256 mAllMonitored = command.allMonitored();
258 if (modifiedParts & Protocol::ModifySubscriptionCommand::ExclusiveFlag) {
259 mExclusive = command.isExclusive();
261 if (modifiedParts & Protocol::ModifySubscriptionCommand::ItemFetchScope) {
262 const auto newScope = command.itemFetchScope();
263 mManager->itemFetchScope()->apply(mItemFetchScope, newScope);
264 mItemFetchScope = newScope;
266 if (modifiedParts & Protocol::ModifySubscriptionCommand::CollectionFetchScope) {
267 const auto newScope = command.collectionFetchScope();
268 mManager->collectionFetchScope()->apply(mCollectionFetchScope, newScope);
269 mCollectionFetchScope = newScope;
271 if (modifiedParts & Protocol::ModifySubscriptionCommand::TagFetchScope) {
272 const auto newScope = command.tagFetchScope();
273 mManager->tagFetchScope()->apply(mTagFetchScope, newScope);
274 mTagFetchScope = newScope;
275 if (!newScope.fetchIdOnly()) {
276 Q_ASSERT(!mManager->tagFetchScope()->fetchIdOnly());
281 if (modifiedParts & Protocol::ModifySubscriptionCommand::Types) {
283 if (command.startMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::SubscriptionChanges)) {
285 mManager->mSubscribers | Views::filter(IsNotNull) | Actions::forEach([
this](
const auto &subscriber) {
292 if (command.startMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::ChangeNotifications)) {
293 if (!mNotificationDebugging) {
294 mNotificationDebugging =
true;
295 Q_EMIT notificationDebuggingChanged(
true);
297 }
else if (command.stopMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::ChangeNotifications)) {
298 if (mNotificationDebugging) {
299 mNotificationDebugging =
false;
300 Q_EMIT notificationDebuggingChanged(
false);
306 auto changeNtf = toChangeNotification();
307 changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Modify);
308 mManager->slotNotify({changeNtf});
311#undef START_MONITORING
312#undef STOP_MONITORING
// Builds a SubscriptionChangeNotification describing this subscriber's
// current state: session id, subscriber name, the monitored collections,
// items, tags, types, mime types and resources, the ignored sessions, the
// all-monitored and exclusive flags, and the item, tag and collection fetch
// scopes. The operation is preset to Add; modifySubscription() overrides it
// with Modify before sending.
// (The return statement is not visible in this listing.)
317Protocol::SubscriptionChangeNotificationPtr NotificationSubscriber::toChangeNotification()
const
321 auto ntf = Protocol::SubscriptionChangeNotificationPtr::create();
322 ntf->setSessionId(mSession);
323 ntf->setSubscriber(mSubscriber);
324 ntf->setOperation(Protocol::SubscriptionChangeNotification::Add);
325 ntf->setCollections(mMonitoredCollections);
326 ntf->setItems(mMonitoredItems);
327 ntf->setTags(mMonitoredTags);
328 ntf->setTypes(mMonitoredTypes);
329 ntf->setMimeTypes(mMonitoredMimeTypes);
330 ntf->setResources(mMonitoredResources);
331 ntf->setIgnoredSessions(mIgnoredSessions);
332 ntf->setAllMonitored(mAllMonitored);
333 ntf->setExclusive(mExclusive);
334 ntf->setItemFetchScope(mItemFetchScope);
335 ntf->setTagFetchScope(mTagFetchScope);
336 ntf->setCollectionFetchScope(mCollectionFetchScope);
// Tells whether the given collection is monitored by this subscriber.
// The visible return yields true when `id` is in mMonitoredCollections or
// when that set contains the id 0 (presumably the root collection acting as
// a wildcard - confirm). Earlier lines of the body (original lines 341-347)
// are not visible in this listing.
340bool NotificationSubscriber::isCollectionMonitored(Entity::Id
id)
const
348 return mMonitoredCollections.contains(
id) || mMonitoredCollections.contains(0);
// Tells whether the given mime type is monitored by this subscriber.
// Two returns are visible: one looks up "text/vcard" in mMonitoredMimeTypes
// instead of the passed-in name (the condition selecting that branch is not
// visible in this listing), the other looks up `mimeType` itself.
351bool NotificationSubscriber::isMimeTypeMonitored(
const QString &mimeType)
const
357 return mMonitoredMimeTypes.contains(QStringLiteral(
"text/vcard"));
359 return mMonitoredMimeTypes.contains(mimeType);
// Item overload. Notifications whose operation is not Move take an early
// branch (its body is not visible in this listing); for Move operations the
// result is whether the destination resource is in mMonitoredResources.
362bool NotificationSubscriber::isMoveDestinationResourceMonitored(
const Protocol::ItemChangeNotification &msg)
const
366 if (msg.operation() != Protocol::ItemChangeNotification::Move) {
369 return mMonitoredResources.contains(msg.destinationResource());
// Collection overload. Notifications whose operation is not Move take an
// early branch (its body is not visible in this listing); for Move operations
// the result is whether the destination resource is in mMonitoredResources.
372bool NotificationSubscriber::isMoveDestinationResourceMonitored(
const Protocol::CollectionChangeNotification &msg)
const
376 if (msg.operation() != Protocol::CollectionChangeNotification::Move) {
379 return mMonitoredResources.contains(msg.destinationResource());
// Decides whether an item change notification is relevant to this subscriber.
//
// The return statements are not visible in this listing; the outcome of each
// branch below is taken from the accompanying TRACE_NTF message. Checks, in
// order:
//  1. notification without items (outcome not visible);
//  2. "all monitored" shortcut (its condition is not visible);
//  3. rejected when mMonitoredTypes is non-empty and lacks ItemChanges;
//  4. when a resource or mime type filter is set: accepted if the source
//     resource is monitored, if the move destination resource is monitored,
//     or if any item's mime type is monitored; otherwise rejected;
//  5. accepted if any item id is explicitly monitored;
//  6. accepted if the parent collection or the destination parent collection
//     is monitored;
//  7. rejected otherwise.
382bool NotificationSubscriber::acceptsItemNotification(
const Protocol::ItemChangeNotification &msg)
const
386 if (msg.items().isEmpty()) {
391 TRACE_NTF(
"ACCEPTS ITEM: all monitored");
395 if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::ItemChanges)) {
396 TRACE_NTF(
"ACCEPTS ITEM: REJECTED - Item changes not monitored");
401 if (!mMonitoredResources.isEmpty() || !mMonitoredMimeTypes.isEmpty()) {
402 if (mMonitoredResources.contains(msg.resource())) {
403 TRACE_NTF(
"ACCEPTS ITEM: ACCEPTED - resource monitored");
407 if (isMoveDestinationResourceMonitored(msg)) {
408 TRACE_NTF(
"ACCEPTS ITEM: ACCEPTED: move destination monitored");
412 const auto items = msg.items();
413 for (
const auto &item : items) {
414 if (isMimeTypeMonitored(item.mimeType())) {
415 TRACE_NTF(
"ACCEPTS ITEM: ACCEPTED - mimetype monitored");
420 TRACE_NTF(
"ACCEPTS ITEM: REJECTED: resource nor mimetype monitored");
425 const auto items = msg.items();
426 for (
const auto &item : items) {
427 if (mMonitoredItems.contains(item.id())) {
428 TRACE_NTF(
"ACCEPTS ITEM: ACCEPTED: item explicitly monitored");
433 if (isCollectionMonitored(msg.parentCollection())) {
434 TRACE_NTF(
"ACCEPTS ITEM: ACCEPTED: parent collection monitored");
437 if (isCollectionMonitored(msg.parentDestCollection())) {
438 TRACE_NTF(
"ACCEPTS ITEM: ACCEPTED: destination collection monitored");
442 TRACE_NTF(
"ACCEPTS ITEM: REJECTED");
// Decides whether a collection change notification is relevant to this
// subscriber.
//
// Visible checks, in order (branches without a visible return are listed
// without their outcome):
//  1. collection id below zero;
//  2. metadata contains "DISABLED", the operation is not Unsubscribe and the
//     changed parts do not contain "ENABLED";
//  3. mMonitoredTypes is non-empty and lacks CollectionChanges;
//  4. when a resource filter is set: resourceMatches is true if the source
//     resource or the move destination resource is monitored; if there is no
//     mime type filter, or the resource matched, resourceMatches is returned;
//  5. the collection itself is monitored;
//  6. finally returns whether the parent collection or the destination
//     parent collection is monitored.
446bool NotificationSubscriber::acceptsCollectionNotification(
const Protocol::CollectionChangeNotification &msg)
const
450 const auto &collection = msg.collection();
451 if (collection.id() < 0) {
459 if (msg.metadata().contains(
"DISABLED") && (msg.operation() != Protocol::CollectionChangeNotification::Unsubscribe)
460 && !msg.changedParts().contains(
"ENABLED")) {
471 if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::CollectionChanges)) {
476 if (!mMonitoredResources.isEmpty()) {
477 const bool resourceMatches = mMonitoredResources.contains(msg.resource()) || isMoveDestinationResourceMonitored(msg);
481 if (mMonitoredMimeTypes.isEmpty() || resourceMatches) {
482 return resourceMatches;
488 if (isCollectionMonitored(collection.id())) {
492 return isCollectionMonitored(msg.parentCollection()) || isCollectionMonitored(msg.parentDestCollection());
// Decides whether a tag change notification is relevant to this subscriber.
//
// Only the conditions are visible in this listing, not the outcome of each
// branch. Conditions, in order:
//  1. tag id below zero;
//  2. operation is Remove; inside that branch (nesting inferred from the
//     original line numbers - confirm): ignored sessions are set while the
//     notification carries no resource, and the notification carries a
//     resource that is not in mIgnoredSessions;
//  3. mMonitoredTypes is non-empty and lacks TagChanges;
//  4. no tags are explicitly monitored;
//  5. the tag id is in mMonitoredTags.
// NOTE(review): the resource is compared against mIgnoredSessions, which
// elsewhere holds session ids - confirm this is intentional.
495bool NotificationSubscriber::acceptsTagNotification(
const Protocol::TagChangeNotification &msg)
const
499 if (msg.tag().id() < 0) {
508 if (msg.operation() == Protocol::TagChangeNotification::Remove) {
516 if (!mIgnoredSessions.isEmpty() && msg.resource().isEmpty()) {
522 if (!msg.resource().isEmpty() && !mIgnoredSessions.contains(msg.resource())) {
537 if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::TagChanges)) {
541 if (mMonitoredTags.isEmpty()) {
545 if (mMonitoredTags.contains(msg.tag().id())) {
// Subscription change notifications are accepted only when this subscriber
// monitors the SubscriptionChanges type.
// (Lines between the signature and the return are not visible in this
// listing.)
552bool NotificationSubscriber::acceptsSubscriptionNotification(
const Protocol::SubscriptionChangeNotification &msg)
const
560 return mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::SubscriptionChanges);
// Decides whether a debug change notification is relevant to this subscriber.
// The wrapped notification must not itself be a DebugChangeNotification:
// this is asserted in debug builds and additionally checked at run time (the
// body of that check is not visible in this listing). Otherwise the result is
// whether this subscriber monitors the ChangeNotifications type.
563bool NotificationSubscriber::acceptsDebugChangeNotification(
const Protocol::DebugChangeNotification &msg)
const
569 Q_ASSERT(msg.notification()->type() != Protocol::Command::DebugChangeNotification);
570 if (msg.notification()->type() == Protocol::Command::DebugChangeNotification) {
576 return mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::ChangeNotifications);
// Entry point for deciding whether a change notification should be delivered
// to this subscriber.
//
// First checks whether the subscriber has no name yet (mSubscriber empty) and
// whether the notification's session is in mIgnoredSessions; the outcome of
// these two branches is not visible in this listing. Then dispatches on the
// notification type to the matching accepts*Notification() helper, casting
// `msg` to the concrete notification class. Any other type is logged as an
// invalid notification type.
//
// @param msg the notification to evaluate.
579bool NotificationSubscriber::acceptsNotification(
const Protocol::ChangeNotification &msg)
const
584 if (mSubscriber.isEmpty()) {
590 if (mIgnoredSessions.contains(msg.sessionId())) {
594 switch (msg.type()) {
595 case Protocol::Command::ItemChangeNotification:
596 return acceptsItemNotification(
static_cast<const Protocol::ItemChangeNotification &
>(msg));
597 case Protocol::Command::CollectionChangeNotification:
598 return acceptsCollectionNotification(
static_cast<const Protocol::CollectionChangeNotification &
>(msg));
599 case Protocol::Command::TagChangeNotification:
600 return acceptsTagNotification(
static_cast<const Protocol::TagChangeNotification &
>(msg));
601 case Protocol::Command::SubscriptionChangeNotification:
602 return acceptsSubscriptionNotification(
static_cast<const Protocol::SubscriptionChangeNotification &
>(msg));
603 case Protocol::Command::DebugChangeNotification:
604 return acceptsDebugChangeNotification(
static_cast<const Protocol::DebugChangeNotification &
>(msg));
607 qCWarning(AKONADISERVER_LOG) <<
"NotificationSubscriber" << mSubscriber <<
"received an invalid notification type" << msg.type();
// Fragment of the delivery routine (its signature and the lines between the
// two statements are not visible in this listing): the notification is
// checked with acceptsNotification(), and writeCommand() is called with tag 4
// and the notification.
621 if (acceptsNotification(*notification)) {
631 writeCommand(4, notification);
// Fragment of the routine that writes a command to the subscriber's socket
// (signature not visible in this listing). The command is serialized through
// a Protocol::DataStream bound to mSocket; if waitForBytesWritten() fails, a
// timeout warning is logged. A ProtocolException raised while writing is
// caught and logged.
638 Protocol::DataStream stream(mSocket);
641 Protocol::serialize(stream, cmd);
643 if (!mSocket->waitForBytesWritten()) {
645 qCWarning(AKONADISERVER_LOG) <<
"NotificationSubscriber for" << mSubscriber <<
": timeout writing into stream";
650 }
catch (
const ProtocolException &e) {
651 qCWarning(AKONADISERVER_LOG) <<
"ProtocolException while writing into stream for subscriber" << mSubscriber <<
":" << e.what();
655#include "moc_notificationsubscriber.cpp"
Helper integration between Akonadi and Qt.
qsizetype count() const
void reserve(qsizetype size)
QMimeType mimeTypeForName(const QString &nameOrAlias) const
QFuture< ArgsType< Signal > > connect(Sender *sender, Signal signal)
QThread * currentThread()