Akonadi

server/notificationsubscriber.cpp
1 /*
2  SPDX-FileCopyrightText: 2015 Daniel Vrátil <[email protected]>
3 
4  SPDX-License-Identifier: LGPL-2.0-or-later
5 */
6 
7 #include "notificationsubscriber.h"
8 #include "aggregatedfetchscope.h"
9 #include "akonadiserver_debug.h"
10 #include "notificationmanager.h"
11 #include "utils.h"
12 
13 #include <QLocalSocket>
14 #include <QPointer>
15 
16 #include <private/datastream_p_p.h>
17 #include <private/protocol_exception_p.h>
18 #include <shared/akranges.h>
19 
20 using namespace Akonadi;
21 using namespace Akonadi::Server;
22 using namespace AkRanges;
23 
24 #define TRACE_NTF(x)
25 //#define TRACE_NTF(x) qCDebug(AKONADISERVER_LOG) << mSubscriber << x
26 
27 NotificationSubscriber::NotificationSubscriber(NotificationManager *manager)
28  : mManager(manager)
29  , mSocket(nullptr)
30  , mAllMonitored(false)
31  , mExclusive(false)
32  , mNotificationDebugging(false)
33 {
34  if (mManager) {
35  mManager->itemFetchScope()->addSubscriber();
36  mManager->collectionFetchScope()->addSubscriber();
37  mManager->tagFetchScope()->addSubscriber();
38  }
39 }
40 
41 NotificationSubscriber::NotificationSubscriber(NotificationManager *manager, quintptr socketDescriptor)
42  : NotificationSubscriber(manager)
43 {
44  mSocket = new QLocalSocket(this);
45  connect(mSocket, &QLocalSocket::readyRead, this, &NotificationSubscriber::handleIncomingData);
46  connect(mSocket, &QLocalSocket::disconnected, this, &NotificationSubscriber::socketDisconnected);
47  mSocket->setSocketDescriptor(socketDescriptor);
48 
49  const SchemaVersion schema = SchemaVersion::retrieveAll().at(0);
50 
51  auto hello = Protocol::HelloResponsePtr::create();
52  hello->setServerName(QStringLiteral("Akonadi"));
53  hello->setMessage(QStringLiteral("Not really IMAP server"));
54  hello->setProtocolVersion(Protocol::version());
55  hello->setGeneration(schema.generation());
56  writeCommand(0, hello);
57 }
58 
59 NotificationSubscriber::~NotificationSubscriber()
60 {
61  QMutexLocker locker(&mLock);
62 
63  if (mNotificationDebugging) {
64  Q_EMIT notificationDebuggingChanged(false);
65  }
66 }
67 
68 void NotificationSubscriber::handleIncomingData()
69 {
70  while (mSocket->bytesAvailable() > static_cast<int>(sizeof(qint64))) {
71  Protocol::DataStream stream(mSocket);
72 
73  // Ignored atm
74  qint64 tag = -1;
75  stream >> tag;
76 
78  try {
79  cmd = Protocol::deserialize(mSocket);
80  } catch (const Akonadi::ProtocolException &e) {
81  qCWarning(AKONADISERVER_LOG) << "ProtocolException while reading from notification bus for" << mSubscriber << ":" << e.what();
82  disconnectSubscriber();
83  return;
84  } catch (const std::exception &e) {
85  qCWarning(AKONADISERVER_LOG) << "Unknown exception while reading from notification bus for" << mSubscriber << ":" << e.what();
86  disconnectSubscriber();
87  return;
88  }
89  if (cmd->type() == Protocol::Command::Invalid) {
90  qCWarning(AKONADISERVER_LOG) << "Invalid command while reading from notification bus for " << mSubscriber << ", resetting connection";
91  disconnectSubscriber();
92  return;
93  }
94 
95  switch (cmd->type()) {
96  case Protocol::Command::CreateSubscription:
97  registerSubscriber(Protocol::cmdCast<Protocol::CreateSubscriptionCommand>(cmd));
98  writeCommand(tag, Protocol::CreateSubscriptionResponsePtr::create());
99  break;
100  case Protocol::Command::ModifySubscription:
101  if (mSubscriber.isEmpty()) {
102  qCWarning(AKONADISERVER_LOG) << "Notification subscriber received ModifySubscription command before RegisterSubscriber";
103  disconnectSubscriber();
104  return;
105  }
106  modifySubscription(Protocol::cmdCast<Protocol::ModifySubscriptionCommand>(cmd));
107  writeCommand(tag, Protocol::ModifySubscriptionResponsePtr::create());
108  break;
109  case Protocol::Command::Logout:
110  disconnectSubscriber();
111  break;
112  default:
113  qCWarning(AKONADISERVER_LOG) << "Notification subscriber for" << mSubscriber << "received an invalid command" << cmd->type();
114  disconnectSubscriber();
115  break;
116  }
117  }
118 }
119 
120 void NotificationSubscriber::socketDisconnected()
121 {
122  qCInfo(AKONADISERVER_LOG) << "Subscriber" << mSubscriber << "disconnected";
123  disconnectSubscriber();
124 }
125 
126 void NotificationSubscriber::disconnectSubscriber()
127 {
128  QMutexLocker locker(&mLock);
129 
130  auto changeNtf = Protocol::SubscriptionChangeNotificationPtr::create();
131  changeNtf->setSubscriber(mSubscriber);
132  changeNtf->setSessionId(mSession);
133  changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Remove);
134  mManager->slotNotify({changeNtf});
135 
136  if (mSocket) {
137  disconnect(mSocket, &QLocalSocket::disconnected, this, &NotificationSubscriber::socketDisconnected);
138  mSocket->close();
139  }
140 
141  // Unregister ourselves from the aggregated collection fetch scope
142  auto cfs = mManager->collectionFetchScope();
143  cfs->apply(mCollectionFetchScope, Protocol::CollectionFetchScope());
144  cfs->removeSubscriber();
145 
146  auto tfs = mManager->tagFetchScope();
147  tfs->apply(mTagFetchScope, Protocol::TagFetchScope());
148  tfs->removeSubscriber();
149 
150  auto ifs = mManager->itemFetchScope();
151  ifs->apply(mItemFetchScope, Protocol::ItemFetchScope());
152  ifs->removeSubscriber();
153 
154  mManager->forgetSubscriber(this);
155  deleteLater();
156 }
157 
158 void NotificationSubscriber::registerSubscriber(const Protocol::CreateSubscriptionCommand &command)
159 {
160  QMutexLocker locker(&mLock);
161 
162  qCInfo(AKONADISERVER_LOG) << "Subscriber" << this << "identified as" << command.subscriberName();
163  mSubscriber = command.subscriberName();
164  mSession = command.session();
165 
166  auto changeNtf = Protocol::SubscriptionChangeNotificationPtr::create();
167  changeNtf->setSubscriber(mSubscriber);
168  changeNtf->setSessionId(mSession);
169  changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Add);
170  mManager->slotNotify({changeNtf});
171 }
172 
173 static QStringList canonicalMimeTypes(const QStringList &mimes)
174 {
175  static QMimeDatabase sMimeDatabase;
176  QStringList ret;
177  ret.reserve(mimes.count());
178  auto canonicalMime = [](const QString &mime) {
179  return sMimeDatabase.mimeTypeForName(mime).name();
180  };
181  std::transform(mimes.begin(), mimes.end(), std::back_inserter(ret), canonicalMime);
182  return ret;
183 }
184 
185 void NotificationSubscriber::modifySubscription(const Protocol::ModifySubscriptionCommand &command)
186 {
187  QMutexLocker locker(&mLock);
188 
189  const auto modifiedParts = command.modifiedParts();
190 
191 #define START_MONITORING(type) \
192  (modifiedParts & Protocol::ModifySubscriptionCommand::ModifiedParts(Protocol::ModifySubscriptionCommand::type | Protocol::ModifySubscriptionCommand::Add))
193 #define STOP_MONITORING(type) \
194  (modifiedParts \
195  & Protocol::ModifySubscriptionCommand::ModifiedParts(Protocol::ModifySubscriptionCommand::type | Protocol::ModifySubscriptionCommand::Remove))
196 
197 #define APPEND(set, newItemsFetch) \
198  { \
199  const auto newItems = newItemsFetch; \
200  for (const auto &entity : newItems) { \
201  (set).insert(entity); \
202  } \
203  }
204 
205 #define REMOVE(set, itemsFetch) \
206  { \
207  const auto items = itemsFetch; \
208  for (const auto &entity : items) { \
209  (set).remove(entity); \
210  } \
211  }
212 
213  if (START_MONITORING(Types)) {
214  APPEND(mMonitoredTypes, command.startMonitoringTypes())
215  }
216  if (STOP_MONITORING(Types)) {
217  REMOVE(mMonitoredTypes, command.stopMonitoringTypes())
218  }
219  if (START_MONITORING(Collections)) {
220  APPEND(mMonitoredCollections, command.startMonitoringCollections())
221  }
222  if (STOP_MONITORING(Collections)) {
223  REMOVE(mMonitoredCollections, command.stopMonitoringCollections())
224  }
225  if (START_MONITORING(Items)) {
226  APPEND(mMonitoredItems, command.startMonitoringItems())
227  }
228  if (STOP_MONITORING(Items)) {
229  REMOVE(mMonitoredItems, command.stopMonitoringItems())
230  }
231  if (START_MONITORING(Tags)) {
232  APPEND(mMonitoredTags, command.startMonitoringTags())
233  }
234  if (STOP_MONITORING(Tags)) {
235  REMOVE(mMonitoredTags, command.stopMonitoringTags())
236  }
237  if (START_MONITORING(Resources)) {
238  APPEND(mMonitoredResources, command.startMonitoringResources())
239  }
240  if (STOP_MONITORING(Resources)) {
241  REMOVE(mMonitoredResources, command.stopMonitoringResources())
242  }
243  if (START_MONITORING(MimeTypes)) {
244  APPEND(mMonitoredMimeTypes, canonicalMimeTypes(command.startMonitoringMimeTypes()))
245  }
246  if (STOP_MONITORING(MimeTypes)) {
247  REMOVE(mMonitoredMimeTypes, canonicalMimeTypes(command.stopMonitoringMimeTypes()))
248  }
249  if (START_MONITORING(Sessions)) {
250  APPEND(mIgnoredSessions, command.startIgnoringSessions())
251  }
252  if (STOP_MONITORING(Sessions)) {
253  REMOVE(mIgnoredSessions, command.stopIgnoringSessions())
254  }
255  if (modifiedParts & Protocol::ModifySubscriptionCommand::AllFlag) {
256  mAllMonitored = command.allMonitored();
257  }
258  if (modifiedParts & Protocol::ModifySubscriptionCommand::ExclusiveFlag) {
259  mExclusive = command.isExclusive();
260  }
261  if (modifiedParts & Protocol::ModifySubscriptionCommand::ItemFetchScope) {
262  const auto newScope = command.itemFetchScope();
263  mManager->itemFetchScope()->apply(mItemFetchScope, newScope);
264  mItemFetchScope = newScope;
265  }
266  if (modifiedParts & Protocol::ModifySubscriptionCommand::CollectionFetchScope) {
267  const auto newScope = command.collectionFetchScope();
268  mManager->collectionFetchScope()->apply(mCollectionFetchScope, newScope);
269  mCollectionFetchScope = newScope;
270  }
271  if (modifiedParts & Protocol::ModifySubscriptionCommand::TagFetchScope) {
272  const auto newScope = command.tagFetchScope();
273  mManager->tagFetchScope()->apply(mTagFetchScope, newScope);
274  mTagFetchScope = newScope;
275  if (!newScope.fetchIdOnly()) {
276  Q_ASSERT(!mManager->tagFetchScope()->fetchIdOnly());
277  }
278  }
279 
280  if (mManager) {
281  if (modifiedParts & Protocol::ModifySubscriptionCommand::Types) {
282  // Did the caller just subscribed to subscription changes?
283  if (command.startMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::SubscriptionChanges)) {
284  // If yes, then send them list of all existing subscribers
285  mManager->mSubscribers | Views::filter(IsNotNull) | Actions::forEach([this](const auto &subscriber) {
287  "notify",
289  Q_ARG(Akonadi::Protocol::ChangeNotificationPtr, subscriber->toChangeNotification()));
290  });
291  }
292  if (command.startMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::ChangeNotifications)) {
293  if (!mNotificationDebugging) {
294  mNotificationDebugging = true;
295  Q_EMIT notificationDebuggingChanged(true);
296  }
297  } else if (command.stopMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::ChangeNotifications)) {
298  if (mNotificationDebugging) {
299  mNotificationDebugging = false;
300  Q_EMIT notificationDebuggingChanged(false);
301  }
302  }
303  }
304 
305  // Emit subscription change notification
306  auto changeNtf = toChangeNotification();
307  changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Modify);
308  mManager->slotNotify({changeNtf});
309  }
310 
311 #undef START_MONITORING
312 #undef STOP_MONITORING
313 #undef APPEND
314 #undef REMOVE
315 }
316 
317 Protocol::SubscriptionChangeNotificationPtr NotificationSubscriber::toChangeNotification() const
318 {
319  // Assumes mLock being locked by caller
320 
321  auto ntf = Protocol::SubscriptionChangeNotificationPtr::create();
322  ntf->setSessionId(mSession);
323  ntf->setSubscriber(mSubscriber);
324  ntf->setOperation(Protocol::SubscriptionChangeNotification::Add);
325  ntf->setCollections(mMonitoredCollections);
326  ntf->setItems(mMonitoredItems);
327  ntf->setTags(mMonitoredTags);
328  ntf->setTypes(mMonitoredTypes);
329  ntf->setMimeTypes(mMonitoredMimeTypes);
330  ntf->setResources(mMonitoredResources);
331  ntf->setIgnoredSessions(mIgnoredSessions);
332  ntf->setAllMonitored(mAllMonitored);
333  ntf->setExclusive(mExclusive);
334  ntf->setItemFetchScope(mItemFetchScope);
335  ntf->setTagFetchScope(mTagFetchScope);
336  ntf->setCollectionFetchScope(mCollectionFetchScope);
337  return ntf;
338 }
339 
340 bool NotificationSubscriber::isCollectionMonitored(Entity::Id id) const
341 {
342  // Assumes mLock being locked by caller
343 
344  if (id < 0) {
345  return false;
346  }
347 
348  return mMonitoredCollections.contains(id) || mMonitoredCollections.contains(0);
349 }
350 
351 bool NotificationSubscriber::isMimeTypeMonitored(const QString &mimeType) const
352 {
353  // Assumes mLock being locked by caller
354 
355  // KContacts::Addressee::mimeType() unfortunately uses an alias
356  if (mimeType == QLatin1String("text/directory")) {
357  return mMonitoredMimeTypes.contains(QStringLiteral("text/vcard"));
358  }
359  return mMonitoredMimeTypes.contains(mimeType);
360 }
361 
362 bool NotificationSubscriber::isMoveDestinationResourceMonitored(const Protocol::ItemChangeNotification &msg) const
363 {
364  // Assumes mLock being locked by caller
365 
366  if (msg.operation() != Protocol::ItemChangeNotification::Move) {
367  return false;
368  }
369  return mMonitoredResources.contains(msg.destinationResource());
370 }
371 
372 bool NotificationSubscriber::isMoveDestinationResourceMonitored(const Protocol::CollectionChangeNotification &msg) const
373 {
374  // Assumes mLock being locked by caller
375 
376  if (msg.operation() != Protocol::CollectionChangeNotification::Move) {
377  return false;
378  }
379  return mMonitoredResources.contains(msg.destinationResource());
380 }
381 
382 bool NotificationSubscriber::acceptsItemNotification(const Protocol::ItemChangeNotification &msg) const
383 {
384  // Assumes mLock being locked by caller
385 
386  if (msg.items().isEmpty()) {
387  return false;
388  }
389 
390  if (mAllMonitored) {
391  TRACE_NTF("ACCEPTS ITEM: all monitored");
392  return true;
393  }
394 
395  if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::ItemChanges)) {
396  TRACE_NTF("ACCEPTS ITEM: REJECTED - Item changes not monitored");
397  return false;
398  }
399 
400  // we have a resource or mimetype filter
401  if (!mMonitoredResources.isEmpty() || !mMonitoredMimeTypes.isEmpty()) {
402  if (mMonitoredResources.contains(msg.resource())) {
403  TRACE_NTF("ACCEPTS ITEM: ACCEPTED - resource monitored");
404  return true;
405  }
406 
407  if (isMoveDestinationResourceMonitored(msg)) {
408  TRACE_NTF("ACCEPTS ITEM: ACCEPTED: move destination monitored");
409  return true;
410  }
411 
412  const auto items = msg.items();
413  for (const auto &item : items) {
414  if (isMimeTypeMonitored(item.mimeType())) {
415  TRACE_NTF("ACCEPTS ITEM: ACCEPTED - mimetype monitored");
416  return true;
417  }
418  }
419 
420  TRACE_NTF("ACCEPTS ITEM: REJECTED: resource nor mimetype monitored");
421  return false;
422  }
423 
424  // we explicitly monitor that item or the collections it's in
425  const auto items = msg.items();
426  for (const auto &item : items) {
427  if (mMonitoredItems.contains(item.id())) {
428  TRACE_NTF("ACCEPTS ITEM: ACCEPTED: item explicitly monitored");
429  return true;
430  }
431  }
432 
433  if (isCollectionMonitored(msg.parentCollection())) {
434  TRACE_NTF("ACCEPTS ITEM: ACCEPTED: parent collection monitored");
435  return true;
436  }
437  if (isCollectionMonitored(msg.parentDestCollection())) {
438  TRACE_NTF("ACCEPTS ITEM: ACCEPTED: destination collection monitored");
439  return true;
440  }
441 
442  TRACE_NTF("ACCEPTS ITEM: REJECTED");
443  return false;
444 }
445 
446 bool NotificationSubscriber::acceptsCollectionNotification(const Protocol::CollectionChangeNotification &msg) const
447 {
448  // Assumes mLock being locked by caller
449 
450  const auto &collection = msg.collection();
451  if (collection.id() < 0) {
452  return false;
453  }
454 
455  // HACK: We need to dispatch notifications about disabled collections to SOME
456  // agents (that's what we have the exclusive subscription for) - but because
457  // querying each Collection from database would be expensive, we use the
458  // metadata hack to transfer this information from NotificationCollector
459  if (msg.metadata().contains("DISABLED") && (msg.operation() != Protocol::CollectionChangeNotification::Unsubscribe)
460  && !msg.changedParts().contains("ENABLED")) {
461  // Exclusive subscriber always gets it
462  // If the subscriber is not exclusive (i.e. if we got here), then the subscriber does
463  // not care about this one, so drop it
464  return mExclusive;
465  }
466 
467  if (mAllMonitored) {
468  return true;
469  }
470 
471  if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::CollectionChanges)) {
472  return false;
473  }
474 
475  // we have a resource filter
476  if (!mMonitoredResources.isEmpty()) {
477  const bool resourceMatches = mMonitoredResources.contains(msg.resource()) || isMoveDestinationResourceMonitored(msg);
478 
479  // a bit hacky, but match the behaviour from the item case,
480  // if resource is the only thing we are filtering on, stop here, and if the resource filter matched, of course
481  if (mMonitoredMimeTypes.isEmpty() || resourceMatches) {
482  return resourceMatches;
483  }
484  // else continue
485  }
486 
487  // we explicitly monitor that collection, or all of them
488  if (isCollectionMonitored(collection.id())) {
489  return true;
490  }
491 
492  return isCollectionMonitored(msg.parentCollection()) || isCollectionMonitored(msg.parentDestCollection());
493 }
494 
495 bool NotificationSubscriber::acceptsTagNotification(const Protocol::TagChangeNotification &msg) const
496 {
497  // Assumes mLock being locked by caller
498 
499  if (msg.tag().id() < 0) {
500  return false;
501  }
502 
503  // Special handling for Tag removal notifications: When a Tag is removed,
504  // a notification is emitted for each Resource that owns the tag (i.e.
505  // each resource that owns a Tag RID - Tag RIDs are resource-specific).
506  // Additionally then we send one more notification without any RID that is
507  // destined for regular applications (which don't know anything about Tag RIDs)
508  if (msg.operation() == Protocol::TagChangeNotification::Remove) {
509  // HACK: Since have no way to determine which resource this NotificationSource
510  // belongs to, we are abusing the fact that each resource ignores it's own
511  // main session, which is called the same name as the resource.
512 
513  // If there are any ignored sessions, but this notification does not have
514  // a specific resource set, then we ignore it, as this notification is
515  // for clients, not resources (does not have tag RID)
516  if (!mIgnoredSessions.isEmpty() && msg.resource().isEmpty()) {
517  return false;
518  }
519 
520  // If this source ignores a session (i.e. we assume it is a resource),
521  // but this notification is for another resource, then we ignore it
522  if (!msg.resource().isEmpty() && !mIgnoredSessions.contains(msg.resource())) {
523  return false;
524  }
525  // Now we got here, which means that this notification either has empty
526  // resource, i.e. it is destined for a client applications, or it's
527  // destined for resource that we *think* (see the hack above) this
528  // NotificationSource belongs too. Which means we approve this notification,
529  // but it can still be discarded in the generic Tag notification filter
530  // below
531  }
532 
533  if (mAllMonitored) {
534  return true;
535  }
536 
537  if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::TagChanges)) {
538  return false;
539  }
540 
541  if (mMonitoredTags.isEmpty()) {
542  return true;
543  }
544 
545  if (mMonitoredTags.contains(msg.tag().id())) {
546  return true;
547  }
548 
549  return true;
550 }
551 
552 bool NotificationSubscriber::acceptsRelationNotification(const Protocol::RelationChangeNotification &msg) const
553 {
554  // Assumes mLock being locked by caller
555 
556  Q_UNUSED(msg)
557 
558  if (mAllMonitored) {
559  return true;
560  }
561 
562  return !(!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::RelationChanges));
563 }
564 
565 bool NotificationSubscriber::acceptsSubscriptionNotification(const Protocol::SubscriptionChangeNotification &msg) const
566 {
567  // Assumes mLock being locked by caller
568 
569  Q_UNUSED(msg)
570 
571  // Unlike other types, subscription notifications must be explicitly enabled
572  // by caller and are excluded from "monitor all" as well
573  return mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::SubscriptionChanges);
574 }
575 
576 bool NotificationSubscriber::acceptsDebugChangeNotification(const Protocol::DebugChangeNotification &msg) const
577 {
578  // Assumes mLock being locked by caller
579 
580  // We should never end up sending debug notification about a debug notification.
581  // This could get very messy very quickly...
582  Q_ASSERT(msg.notification()->type() != Protocol::Command::DebugChangeNotification);
583  if (msg.notification()->type() == Protocol::Command::DebugChangeNotification) {
584  return false;
585  }
586 
587  // Unlike other types, debug change notifications must be explicitly enabled
588  // by caller and are excluded from "monitor all" as well
589  return mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::ChangeNotifications);
590 }
591 
592 bool NotificationSubscriber::acceptsNotification(const Protocol::ChangeNotification &msg) const
593 {
594  // Assumes mLock being locked
595 
596  // Uninitialized subscriber gets nothing
597  if (mSubscriber.isEmpty()) {
598  return false;
599  }
600 
601  // session is ignored
602  // TODO: Should this affect SubscriptionChangeNotification and DebugChangeNotification?
603  if (mIgnoredSessions.contains(msg.sessionId())) {
604  return false;
605  }
606 
607  switch (msg.type()) {
608  case Protocol::Command::ItemChangeNotification:
609  return acceptsItemNotification(static_cast<const Protocol::ItemChangeNotification &>(msg));
610  case Protocol::Command::CollectionChangeNotification:
611  return acceptsCollectionNotification(static_cast<const Protocol::CollectionChangeNotification &>(msg));
612  case Protocol::Command::TagChangeNotification:
613  return acceptsTagNotification(static_cast<const Protocol::TagChangeNotification &>(msg));
614  case Protocol::Command::RelationChangeNotification:
615  return acceptsRelationNotification(static_cast<const Protocol::RelationChangeNotification &>(msg));
616  case Protocol::Command::SubscriptionChangeNotification:
617  return acceptsSubscriptionNotification(static_cast<const Protocol::SubscriptionChangeNotification &>(msg));
618  case Protocol::Command::DebugChangeNotification:
619  return acceptsDebugChangeNotification(static_cast<const Protocol::DebugChangeNotification &>(msg));
620 
621  default:
622  qCWarning(AKONADISERVER_LOG) << "NotificationSubscriber" << mSubscriber << "received an invalid notification type" << msg.type();
623  return false;
624  }
625 }
626 
627 bool NotificationSubscriber::notify(const Protocol::ChangeNotificationPtr &notification)
628 {
629  // Guard against this object being deleted while we are waiting for the lock
631  QMutexLocker locker(&mLock);
632  if (!ptr) {
633  return false;
634  }
635 
636  if (acceptsNotification(*notification)) {
637  QMetaObject::invokeMethod(this, "writeNotification", Qt::QueuedConnection, Q_ARG(Akonadi::Protocol::ChangeNotificationPtr, notification));
638  return true;
639  }
640  return false;
641 }
642 
643 void NotificationSubscriber::writeNotification(const Protocol::ChangeNotificationPtr &notification)
644 {
645  // tag chosen by fair dice roll
646  writeCommand(4, notification);
647 }
648 
649 void NotificationSubscriber::writeCommand(qint64 tag, const Protocol::CommandPtr &cmd)
650 {
651  Q_ASSERT(QThread::currentThread() == thread());
652 
653  Protocol::DataStream stream(mSocket);
654  stream << tag;
655  try {
656  Protocol::serialize(stream, cmd);
657  stream.flush();
658  if (!mSocket->waitForBytesWritten()) {
659  if (mSocket->state() == QLocalSocket::ConnectedState) {
660  qCWarning(AKONADISERVER_LOG) << "NotificationSubscriber for" << mSubscriber << ": timeout writing into stream";
661  } else {
662  // client has disconnected, just discard the message
663  }
664  }
665  } catch (const ProtocolException &e) {
666  qCWarning(AKONADISERVER_LOG) << "ProtocolException while writing into stream for subscriber" << mSubscriber << ":" << e.what();
667  }
668 }
int count(const T &value) const
void reserve(int alloc)
QMimeType mimeTypeForName(const QString &nameOrAlias) const
void disconnected()
void readyRead()
QThread * currentThread()
QueuedConnection
bool invokeMethod(QObject *obj, const char *member, Qt::ConnectionType type, QGenericReturnArgument ret, QGenericArgument val0, QGenericArgument val1, QGenericArgument val2, QGenericArgument val3, QGenericArgument val4, QGenericArgument val5, QGenericArgument val6, QGenericArgument val7, QGenericArgument val8, QGenericArgument val9)
QList::iterator begin()
QList::iterator end()
Helper integration between Akonadi and Qt.
This file is part of the KDE documentation.
Documentation copyright © 1996-2022 The KDE developers.
Generated on Sat Jul 2 2022 06:41:48 by doxygen 1.8.17 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.