Akonadi

server/notificationsubscriber.cpp
1/*
2 SPDX-FileCopyrightText: 2015 Daniel Vrátil <dvratil@kde.org>
3
4 SPDX-License-Identifier: LGPL-2.0-or-later
5*/
6
7#include "notificationsubscriber.h"
8#include "aggregatedfetchscope.h"
9#include "akonadiserver_debug.h"
10#include "notificationmanager.h"
11#include "utils.h"
12
13#include <QLocalSocket>
14#include <QPointer>
15
16#include "private/datastream_p_p.h"
17#include "private/protocol_exception_p.h"
18#include "shared/akranges.h"
19
20using namespace Akonadi;
21using namespace Akonadi::Server;
22using namespace AkRanges;
23
24#define TRACE_NTF(x)
25// #define TRACE_NTF(x) qCDebug(AKONADISERVER_LOG) << mSubscriber << x
26
27NotificationSubscriber::NotificationSubscriber(NotificationManager *manager)
28 : mManager(manager)
29 , mSocket(nullptr)
30 , mAllMonitored(false)
31 , mExclusive(false)
32 , mNotificationDebugging(false)
33{
34 if (mManager) {
35 mManager->itemFetchScope()->addSubscriber();
36 mManager->collectionFetchScope()->addSubscriber();
37 mManager->tagFetchScope()->addSubscriber();
38 }
39}
40
41NotificationSubscriber::NotificationSubscriber(NotificationManager *manager, quintptr socketDescriptor)
42 : NotificationSubscriber(manager)
43{
44 mSocket = new QLocalSocket(this);
45 connect(mSocket, &QLocalSocket::readyRead, this, &NotificationSubscriber::handleIncomingData);
46 connect(mSocket, &QLocalSocket::disconnected, this, &NotificationSubscriber::socketDisconnected);
47 mSocket->setSocketDescriptor(socketDescriptor);
48
49 const SchemaVersion schema = SchemaVersion::retrieveAll().at(0);
50
51 auto hello = Protocol::HelloResponsePtr::create();
52 hello->setServerName(QStringLiteral("Akonadi"));
53 hello->setMessage(QStringLiteral("Not really IMAP server"));
54 hello->setProtocolVersion(Protocol::version());
55 hello->setGeneration(schema.generation());
56 writeCommand(0, hello);
57}
58
59NotificationSubscriber::~NotificationSubscriber()
60{
61 QMutexLocker locker(&mLock);
62
63 if (mNotificationDebugging) {
64 Q_EMIT notificationDebuggingChanged(false);
65 }
66}
67
68void NotificationSubscriber::handleIncomingData()
69{
70 while (mSocket->bytesAvailable() > static_cast<int>(sizeof(qint64))) {
71 Protocol::DataStream stream(mSocket);
72
73 // Ignored atm
74 qint64 tag = -1;
75 stream >> tag;
76
78 try {
79 cmd = Protocol::deserialize(mSocket);
80 } catch (const Akonadi::ProtocolException &e) {
81 qCWarning(AKONADISERVER_LOG) << "ProtocolException while reading from notification bus for" << mSubscriber << ":" << e.what();
82 disconnectSubscriber();
83 return;
84 } catch (const std::exception &e) {
85 qCWarning(AKONADISERVER_LOG) << "Unknown exception while reading from notification bus for" << mSubscriber << ":" << e.what();
86 disconnectSubscriber();
87 return;
88 }
89 if (cmd->type() == Protocol::Command::Invalid) {
90 qCWarning(AKONADISERVER_LOG) << "Invalid command while reading from notification bus for " << mSubscriber << ", resetting connection";
91 disconnectSubscriber();
92 return;
93 }
94
95 switch (cmd->type()) {
96 case Protocol::Command::CreateSubscription:
97 registerSubscriber(Protocol::cmdCast<Protocol::CreateSubscriptionCommand>(cmd));
98 writeCommand(tag, Protocol::CreateSubscriptionResponsePtr::create());
99 break;
100 case Protocol::Command::ModifySubscription:
101 if (mSubscriber.isEmpty()) {
102 qCWarning(AKONADISERVER_LOG) << "Notification subscriber received ModifySubscription command before RegisterSubscriber";
103 disconnectSubscriber();
104 return;
105 }
106 modifySubscription(Protocol::cmdCast<Protocol::ModifySubscriptionCommand>(cmd));
107 writeCommand(tag, Protocol::ModifySubscriptionResponsePtr::create());
108 break;
109 case Protocol::Command::Logout:
110 disconnectSubscriber();
111 break;
112 default:
113 qCWarning(AKONADISERVER_LOG) << "Notification subscriber for" << mSubscriber << "received an invalid command" << cmd->type();
114 disconnectSubscriber();
115 break;
116 }
117 }
118}
119
120void NotificationSubscriber::socketDisconnected()
121{
122 qCInfo(AKONADISERVER_LOG) << "Subscriber" << mSubscriber << "disconnected";
123 disconnectSubscriber();
124}
125
126void NotificationSubscriber::disconnectSubscriber()
127{
128 QMutexLocker locker(&mLock);
129
130 auto changeNtf = Protocol::SubscriptionChangeNotificationPtr::create();
131 changeNtf->setSubscriber(mSubscriber);
132 changeNtf->setSessionId(mSession);
133 changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Remove);
134 mManager->slotNotify({changeNtf});
135
136 if (mSocket) {
137 disconnect(mSocket, &QLocalSocket::disconnected, this, &NotificationSubscriber::socketDisconnected);
138 mSocket->close();
139 }
140
141 // Unregister ourselves from the aggregated collection fetch scope
142 auto cfs = mManager->collectionFetchScope();
143 cfs->apply(mCollectionFetchScope, Protocol::CollectionFetchScope());
144 cfs->removeSubscriber();
145
146 auto tfs = mManager->tagFetchScope();
147 tfs->apply(mTagFetchScope, Protocol::TagFetchScope());
148 tfs->removeSubscriber();
149
150 auto ifs = mManager->itemFetchScope();
151 ifs->apply(mItemFetchScope, Protocol::ItemFetchScope());
152 ifs->removeSubscriber();
153
154 mManager->forgetSubscriber(this);
155 deleteLater();
156}
157
158void NotificationSubscriber::registerSubscriber(const Protocol::CreateSubscriptionCommand &command)
159{
160 QMutexLocker locker(&mLock);
161
162 qCInfo(AKONADISERVER_LOG) << "Subscriber" << this << "identified as" << command.subscriberName();
163 mSubscriber = command.subscriberName();
164 mSession = command.session();
165
166 auto changeNtf = Protocol::SubscriptionChangeNotificationPtr::create();
167 changeNtf->setSubscriber(mSubscriber);
168 changeNtf->setSessionId(mSession);
169 changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Add);
170 mManager->slotNotify({changeNtf});
171}
172
173static QStringList canonicalMimeTypes(const QStringList &mimes)
174{
175 static QMimeDatabase sMimeDatabase;
176 QStringList ret;
177 ret.reserve(mimes.count());
178 auto canonicalMime = [](const QString &mime) {
179 return sMimeDatabase.mimeTypeForName(mime).name();
180 };
181 std::transform(mimes.begin(), mimes.end(), std::back_inserter(ret), canonicalMime);
182 return ret;
183}
184
185void NotificationSubscriber::modifySubscription(const Protocol::ModifySubscriptionCommand &command)
186{
187 QMutexLocker locker(&mLock);
188
189 const auto modifiedParts = command.modifiedParts();
190
191#define START_MONITORING(type) \
192 (modifiedParts & Protocol::ModifySubscriptionCommand::ModifiedParts(Protocol::ModifySubscriptionCommand::type | Protocol::ModifySubscriptionCommand::Add))
193#define STOP_MONITORING(type) \
194 (modifiedParts \
195 & Protocol::ModifySubscriptionCommand::ModifiedParts(Protocol::ModifySubscriptionCommand::type | Protocol::ModifySubscriptionCommand::Remove))
196
197#define APPEND(set, newItemsFetch) \
198 { \
199 const auto newItems = newItemsFetch; \
200 for (const auto &entity : newItems) { \
201 (set).insert(entity); \
202 } \
203 }
204
205#define REMOVE(set, itemsFetch) \
206 { \
207 const auto items = itemsFetch; \
208 for (const auto &entity : items) { \
209 (set).remove(entity); \
210 } \
211 }
212
213 if (START_MONITORING(Types)) {
214 APPEND(mMonitoredTypes, command.startMonitoringTypes())
215 }
216 if (STOP_MONITORING(Types)) {
217 REMOVE(mMonitoredTypes, command.stopMonitoringTypes())
218 }
219 if (START_MONITORING(Collections)) {
220 APPEND(mMonitoredCollections, command.startMonitoringCollections())
221 }
222 if (STOP_MONITORING(Collections)) {
223 REMOVE(mMonitoredCollections, command.stopMonitoringCollections())
224 }
225 if (START_MONITORING(Items)) {
226 APPEND(mMonitoredItems, command.startMonitoringItems())
227 }
228 if (STOP_MONITORING(Items)) {
229 REMOVE(mMonitoredItems, command.stopMonitoringItems())
230 }
231 if (START_MONITORING(Tags)) {
232 APPEND(mMonitoredTags, command.startMonitoringTags())
233 }
234 if (STOP_MONITORING(Tags)) {
235 REMOVE(mMonitoredTags, command.stopMonitoringTags())
236 }
237 if (START_MONITORING(Resources)) {
238 APPEND(mMonitoredResources, command.startMonitoringResources())
239 }
240 if (STOP_MONITORING(Resources)) {
241 REMOVE(mMonitoredResources, command.stopMonitoringResources())
242 }
243 if (START_MONITORING(MimeTypes)) {
244 APPEND(mMonitoredMimeTypes, canonicalMimeTypes(command.startMonitoringMimeTypes()))
245 }
246 if (STOP_MONITORING(MimeTypes)) {
247 REMOVE(mMonitoredMimeTypes, canonicalMimeTypes(command.stopMonitoringMimeTypes()))
248 }
249 if (START_MONITORING(Sessions)) {
250 APPEND(mIgnoredSessions, command.startIgnoringSessions())
251 }
252 if (STOP_MONITORING(Sessions)) {
253 REMOVE(mIgnoredSessions, command.stopIgnoringSessions())
254 }
255 if (modifiedParts & Protocol::ModifySubscriptionCommand::AllFlag) {
256 mAllMonitored = command.allMonitored();
257 }
258 if (modifiedParts & Protocol::ModifySubscriptionCommand::ExclusiveFlag) {
259 mExclusive = command.isExclusive();
260 }
261 if (modifiedParts & Protocol::ModifySubscriptionCommand::ItemFetchScope) {
262 const auto newScope = command.itemFetchScope();
263 mManager->itemFetchScope()->apply(mItemFetchScope, newScope);
264 mItemFetchScope = newScope;
265 }
266 if (modifiedParts & Protocol::ModifySubscriptionCommand::CollectionFetchScope) {
267 const auto newScope = command.collectionFetchScope();
268 mManager->collectionFetchScope()->apply(mCollectionFetchScope, newScope);
269 mCollectionFetchScope = newScope;
270 }
271 if (modifiedParts & Protocol::ModifySubscriptionCommand::TagFetchScope) {
272 const auto newScope = command.tagFetchScope();
273 mManager->tagFetchScope()->apply(mTagFetchScope, newScope);
274 mTagFetchScope = newScope;
275 if (!newScope.fetchIdOnly()) {
276 Q_ASSERT(!mManager->tagFetchScope()->fetchIdOnly());
277 }
278 }
279
280 if (mManager) {
281 if (modifiedParts & Protocol::ModifySubscriptionCommand::Types) {
282 // Did the caller just subscribed to subscription changes?
283 if (command.startMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::SubscriptionChanges)) {
284 // If yes, then send them list of all existing subscribers
285 mManager->mSubscribers | Views::filter(IsNotNull) | Actions::forEach([this](const auto &subscriber) {
287 "notify",
289 Q_ARG(Akonadi::Protocol::ChangeNotificationPtr, subscriber->toChangeNotification()));
290 });
291 }
292 if (command.startMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::ChangeNotifications)) {
293 if (!mNotificationDebugging) {
294 mNotificationDebugging = true;
295 Q_EMIT notificationDebuggingChanged(true);
296 }
297 } else if (command.stopMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::ChangeNotifications)) {
298 if (mNotificationDebugging) {
299 mNotificationDebugging = false;
300 Q_EMIT notificationDebuggingChanged(false);
301 }
302 }
303 }
304
305 // Emit subscription change notification
306 auto changeNtf = toChangeNotification();
307 changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Modify);
308 mManager->slotNotify({changeNtf});
309 }
310
311#undef START_MONITORING
312#undef STOP_MONITORING
313#undef APPEND
314#undef REMOVE
315}
316
317Protocol::SubscriptionChangeNotificationPtr NotificationSubscriber::toChangeNotification() const
318{
319 // Assumes mLock being locked by caller
320
321 auto ntf = Protocol::SubscriptionChangeNotificationPtr::create();
322 ntf->setSessionId(mSession);
323 ntf->setSubscriber(mSubscriber);
324 ntf->setOperation(Protocol::SubscriptionChangeNotification::Add);
325 ntf->setCollections(mMonitoredCollections);
326 ntf->setItems(mMonitoredItems);
327 ntf->setTags(mMonitoredTags);
328 ntf->setTypes(mMonitoredTypes);
329 ntf->setMimeTypes(mMonitoredMimeTypes);
330 ntf->setResources(mMonitoredResources);
331 ntf->setIgnoredSessions(mIgnoredSessions);
332 ntf->setAllMonitored(mAllMonitored);
333 ntf->setExclusive(mExclusive);
334 ntf->setItemFetchScope(mItemFetchScope);
335 ntf->setTagFetchScope(mTagFetchScope);
336 ntf->setCollectionFetchScope(mCollectionFetchScope);
337 return ntf;
338}
339
340bool NotificationSubscriber::isCollectionMonitored(Entity::Id id) const
341{
342 // Assumes mLock being locked by caller
343
344 if (id < 0) {
345 return false;
346 }
347
348 return mMonitoredCollections.contains(id) || mMonitoredCollections.contains(0);
349}
350
351bool NotificationSubscriber::isMimeTypeMonitored(const QString &mimeType) const
352{
353 // Assumes mLock being locked by caller
354
355 // KContacts::Addressee::mimeType() unfortunately uses an alias
356 if (mimeType == QLatin1StringView("text/directory")) {
357 return mMonitoredMimeTypes.contains(QStringLiteral("text/vcard"));
358 }
359 return mMonitoredMimeTypes.contains(mimeType);
360}
361
362bool NotificationSubscriber::isMoveDestinationResourceMonitored(const Protocol::ItemChangeNotification &msg) const
363{
364 // Assumes mLock being locked by caller
365
366 if (msg.operation() != Protocol::ItemChangeNotification::Move) {
367 return false;
368 }
369 return mMonitoredResources.contains(msg.destinationResource());
370}
371
372bool NotificationSubscriber::isMoveDestinationResourceMonitored(const Protocol::CollectionChangeNotification &msg) const
373{
374 // Assumes mLock being locked by caller
375
376 if (msg.operation() != Protocol::CollectionChangeNotification::Move) {
377 return false;
378 }
379 return mMonitoredResources.contains(msg.destinationResource());
380}
381
382bool NotificationSubscriber::acceptsItemNotification(const Protocol::ItemChangeNotification &msg) const
383{
384 // Assumes mLock being locked by caller
385
386 if (msg.items().isEmpty()) {
387 return false;
388 }
389
390 if (mAllMonitored) {
391 TRACE_NTF("ACCEPTS ITEM: all monitored");
392 return true;
393 }
394
395 if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::ItemChanges)) {
396 TRACE_NTF("ACCEPTS ITEM: REJECTED - Item changes not monitored");
397 return false;
398 }
399
400 // we have a resource or mimetype filter
401 if (!mMonitoredResources.isEmpty() || !mMonitoredMimeTypes.isEmpty()) {
402 if (mMonitoredResources.contains(msg.resource())) {
403 TRACE_NTF("ACCEPTS ITEM: ACCEPTED - resource monitored");
404 return true;
405 }
406
407 if (isMoveDestinationResourceMonitored(msg)) {
408 TRACE_NTF("ACCEPTS ITEM: ACCEPTED: move destination monitored");
409 return true;
410 }
411
412 const auto items = msg.items();
413 for (const auto &item : items) {
414 if (isMimeTypeMonitored(item.mimeType())) {
415 TRACE_NTF("ACCEPTS ITEM: ACCEPTED - mimetype monitored");
416 return true;
417 }
418 }
419
420 TRACE_NTF("ACCEPTS ITEM: REJECTED: resource nor mimetype monitored");
421 return false;
422 }
423
424 // we explicitly monitor that item or the collections it's in
425 const auto items = msg.items();
426 for (const auto &item : items) {
427 if (mMonitoredItems.contains(item.id())) {
428 TRACE_NTF("ACCEPTS ITEM: ACCEPTED: item explicitly monitored");
429 return true;
430 }
431 }
432
433 if (isCollectionMonitored(msg.parentCollection())) {
434 TRACE_NTF("ACCEPTS ITEM: ACCEPTED: parent collection monitored");
435 return true;
436 }
437 if (isCollectionMonitored(msg.parentDestCollection())) {
438 TRACE_NTF("ACCEPTS ITEM: ACCEPTED: destination collection monitored");
439 return true;
440 }
441
442 TRACE_NTF("ACCEPTS ITEM: REJECTED");
443 return false;
444}
445
446bool NotificationSubscriber::acceptsCollectionNotification(const Protocol::CollectionChangeNotification &msg) const
447{
448 // Assumes mLock being locked by caller
449
450 const auto &collection = msg.collection();
451 if (collection.id() < 0) {
452 return false;
453 }
454
455 // HACK: We need to dispatch notifications about disabled collections to SOME
456 // agents (that's what we have the exclusive subscription for) - but because
457 // querying each Collection from database would be expensive, we use the
458 // metadata hack to transfer this information from NotificationCollector
459 if (msg.metadata().contains("DISABLED") && (msg.operation() != Protocol::CollectionChangeNotification::Unsubscribe)
460 && !msg.changedParts().contains("ENABLED")) {
461 // Exclusive subscriber always gets it
462 // If the subscriber is not exclusive (i.e. if we got here), then the subscriber does
463 // not care about this one, so drop it
464 return mExclusive;
465 }
466
467 if (mAllMonitored) {
468 return true;
469 }
470
471 if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::CollectionChanges)) {
472 return false;
473 }
474
475 // we have a resource filter
476 if (!mMonitoredResources.isEmpty()) {
477 const bool resourceMatches = mMonitoredResources.contains(msg.resource()) || isMoveDestinationResourceMonitored(msg);
478
479 // a bit hacky, but match the behaviour from the item case,
480 // if resource is the only thing we are filtering on, stop here, and if the resource filter matched, of course
481 if (mMonitoredMimeTypes.isEmpty() || resourceMatches) {
482 return resourceMatches;
483 }
484 // else continue
485 }
486
487 // we explicitly monitor that collection, or all of them
488 if (isCollectionMonitored(collection.id())) {
489 return true;
490 }
491
492 return isCollectionMonitored(msg.parentCollection()) || isCollectionMonitored(msg.parentDestCollection());
493}
494
495bool NotificationSubscriber::acceptsTagNotification(const Protocol::TagChangeNotification &msg) const
496{
497 // Assumes mLock being locked by caller
498
499 if (msg.tag().id() < 0) {
500 return false;
501 }
502
503 // Special handling for Tag removal notifications: When a Tag is removed,
504 // a notification is emitted for each Resource that owns the tag (i.e.
505 // each resource that owns a Tag RID - Tag RIDs are resource-specific).
506 // Additionally then we send one more notification without any RID that is
507 // destined for regular applications (which don't know anything about Tag RIDs)
508 if (msg.operation() == Protocol::TagChangeNotification::Remove) {
509 // HACK: Since have no way to determine which resource this NotificationSource
510 // belongs to, we are abusing the fact that each resource ignores it's own
511 // main session, which is called the same name as the resource.
512
513 // If there are any ignored sessions, but this notification does not have
514 // a specific resource set, then we ignore it, as this notification is
515 // for clients, not resources (does not have tag RID)
516 if (!mIgnoredSessions.isEmpty() && msg.resource().isEmpty()) {
517 return false;
518 }
519
520 // If this source ignores a session (i.e. we assume it is a resource),
521 // but this notification is for another resource, then we ignore it
522 if (!msg.resource().isEmpty() && !mIgnoredSessions.contains(msg.resource())) {
523 return false;
524 }
525 // Now we got here, which means that this notification either has empty
526 // resource, i.e. it is destined for a client applications, or it's
527 // destined for resource that we *think* (see the hack above) this
528 // NotificationSource belongs too. Which means we approve this notification,
529 // but it can still be discarded in the generic Tag notification filter
530 // below
531 }
532
533 if (mAllMonitored) {
534 return true;
535 }
536
537 if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::TagChanges)) {
538 return false;
539 }
540
541 if (mMonitoredTags.isEmpty()) {
542 return true;
543 }
544
545 if (mMonitoredTags.contains(msg.tag().id())) {
546 return true;
547 }
548
549 return true;
550}
551
552bool NotificationSubscriber::acceptsRelationNotification(const Protocol::RelationChangeNotification &msg) const
553{
554 // Assumes mLock being locked by caller
555
556 Q_UNUSED(msg)
557
558 if (mAllMonitored) {
559 return true;
560 }
561
562 return !(!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::RelationChanges));
563}
564
565bool NotificationSubscriber::acceptsSubscriptionNotification(const Protocol::SubscriptionChangeNotification &msg) const
566{
567 // Assumes mLock being locked by caller
568
569 Q_UNUSED(msg)
570
571 // Unlike other types, subscription notifications must be explicitly enabled
572 // by caller and are excluded from "monitor all" as well
573 return mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::SubscriptionChanges);
574}
575
576bool NotificationSubscriber::acceptsDebugChangeNotification(const Protocol::DebugChangeNotification &msg) const
577{
578 // Assumes mLock being locked by caller
579
580 // We should never end up sending debug notification about a debug notification.
581 // This could get very messy very quickly...
582 Q_ASSERT(msg.notification()->type() != Protocol::Command::DebugChangeNotification);
583 if (msg.notification()->type() == Protocol::Command::DebugChangeNotification) {
584 return false;
585 }
586
587 // Unlike other types, debug change notifications must be explicitly enabled
588 // by caller and are excluded from "monitor all" as well
589 return mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::ChangeNotifications);
590}
591
592bool NotificationSubscriber::acceptsNotification(const Protocol::ChangeNotification &msg) const
593{
594 // Assumes mLock being locked
595
596 // Uninitialized subscriber gets nothing
597 if (mSubscriber.isEmpty()) {
598 return false;
599 }
600
601 // session is ignored
602 // TODO: Should this affect SubscriptionChangeNotification and DebugChangeNotification?
603 if (mIgnoredSessions.contains(msg.sessionId())) {
604 return false;
605 }
606
607 switch (msg.type()) {
608 case Protocol::Command::ItemChangeNotification:
609 return acceptsItemNotification(static_cast<const Protocol::ItemChangeNotification &>(msg));
610 case Protocol::Command::CollectionChangeNotification:
611 return acceptsCollectionNotification(static_cast<const Protocol::CollectionChangeNotification &>(msg));
612 case Protocol::Command::TagChangeNotification:
613 return acceptsTagNotification(static_cast<const Protocol::TagChangeNotification &>(msg));
614 case Protocol::Command::RelationChangeNotification:
615 return acceptsRelationNotification(static_cast<const Protocol::RelationChangeNotification &>(msg));
616 case Protocol::Command::SubscriptionChangeNotification:
617 return acceptsSubscriptionNotification(static_cast<const Protocol::SubscriptionChangeNotification &>(msg));
618 case Protocol::Command::DebugChangeNotification:
619 return acceptsDebugChangeNotification(static_cast<const Protocol::DebugChangeNotification &>(msg));
620
621 default:
622 qCWarning(AKONADISERVER_LOG) << "NotificationSubscriber" << mSubscriber << "received an invalid notification type" << msg.type();
623 return false;
624 }
625}
626
627bool NotificationSubscriber::notify(const Protocol::ChangeNotificationPtr &notification)
628{
629 // Guard against this object being deleted while we are waiting for the lock
631 QMutexLocker locker(&mLock);
632 if (!ptr) {
633 return false;
634 }
635
636 if (acceptsNotification(*notification)) {
637 QMetaObject::invokeMethod(this, "writeNotification", Qt::QueuedConnection, Q_ARG(Akonadi::Protocol::ChangeNotificationPtr, notification));
638 return true;
639 }
640 return false;
641}
642
643void NotificationSubscriber::writeNotification(const Protocol::ChangeNotificationPtr &notification)
644{
645 // tag chosen by fair dice roll
646 writeCommand(4, notification);
647}
648
649void NotificationSubscriber::writeCommand(qint64 tag, const Protocol::CommandPtr &cmd)
650{
651 Q_ASSERT(QThread::currentThread() == thread());
652
653 Protocol::DataStream stream(mSocket);
654 stream << tag;
655 try {
656 Protocol::serialize(stream, cmd);
657 stream.flush();
658 if (!mSocket->waitForBytesWritten()) {
659 if (mSocket->state() == QLocalSocket::ConnectedState) {
660 qCWarning(AKONADISERVER_LOG) << "NotificationSubscriber for" << mSubscriber << ": timeout writing into stream";
661 } else {
662 // client has disconnected, just discard the message
663 }
664 }
665 } catch (const ProtocolException &e) {
666 qCWarning(AKONADISERVER_LOG) << "ProtocolException while writing into stream for subscriber" << mSubscriber << ":" << e.what();
667 }
668}
669
670#include "moc_notificationsubscriber.cpp"
Helper integration between Akonadi and Qt.
void readyRead()
iterator begin()
qsizetype count() const const
iterator end()
void reserve(qsizetype size)
void disconnected()
bool invokeMethod(QObject *context, Functor &&function, FunctorReturnType *ret)
QMimeType mimeTypeForName(const QString &nameOrAlias) const const
QueuedConnection
QFuture< ArgsType< Signal > > connect(Sender *sender, Signal signal)
QThread * currentThread()
This file is part of the KDE documentation.
Documentation copyright © 1996-2024 The KDE developers.
Generated on Tue Mar 26 2024 11:13:38 by doxygen 1.10.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.