Akonadi

server/notificationsubscriber.cpp
1 /*
2  Copyright (c) 2015 Daniel Vrátil <[email protected]>
3 
4  This library is free software; you can redistribute it and/or modify it
5  under the terms of the GNU Library General Public License as published by
6  the Free Software Foundation; either version 2 of the License, or (at your
7  option) any later version.
8 
9  This library is distributed in the hope that it will be useful, but WITHOUT
10  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
12  License for more details.
13 
14  You should have received a copy of the GNU Library General Public License
15  along with this library; see the file COPYING.LIB. If not, write to the
16  Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17  02110-1301, USA.
18 */
19 
20 #include "notificationsubscriber.h"
21 #include "akonadiserver_debug.h"
22 #include "notificationmanager.h"
23 #include "aggregatedfetchscope.h"
24 #include "utils.h"
25 
26 #include <QLocalSocket>
27 #include <QPointer>
28 
29 #include <shared/akranges.h>
30 #include <private/protocol_p.h>
31 #include <private/datastream_p_p.h>
32 #include <private/protocol_exception_p.h>
33 
34 using namespace Akonadi;
35 using namespace Akonadi::Server;
36 using namespace AkRanges;
37 
38 QMimeDatabase NotificationSubscriber::sMimeDatabase; // class-wide MIME database, queried by isMimeTypeMonitored()
39 // TRACE_NTF(x) expands to nothing by default; swap in the commented-out definition below to log the TRACE_NTF messages via qCDebug.
40 #define TRACE_NTF(x)
41 //#define TRACE_NTF(x) qCDebug(AKONADISERVER_LOG) << mSubscriber << x
42 
43 NotificationSubscriber::NotificationSubscriber(NotificationManager *manager)
44  : QObject()
45  , mManager(manager)
46  , mSocket(nullptr)
47  , mAllMonitored(false)
48  , mExclusive(false)
49  , mNotificationDebugging(false)
50 {
51  if (mManager) {
52  mManager->itemFetchScope()->addSubscriber();
53  mManager->collectionFetchScope()->addSubscriber();
54  mManager->tagFetchScope()->addSubscriber();
55  }
56 }
57 
58 NotificationSubscriber::NotificationSubscriber(NotificationManager *manager, quintptr socketDescriptor)
59  : NotificationSubscriber(manager)
60 {
61  mSocket = new QLocalSocket(this);
62  connect(mSocket, &QLocalSocket::readyRead,
63  this, &NotificationSubscriber::handleIncomingData);
64  connect(mSocket, &QLocalSocket::disconnected,
65  this, &NotificationSubscriber::socketDisconnected);
66  mSocket->setSocketDescriptor(socketDescriptor);
67 
68  const SchemaVersion schema = SchemaVersion::retrieveAll().at(0);
69 
70  auto hello = Protocol::HelloResponsePtr::create();
71  hello->setServerName(QStringLiteral("Akonadi"));
72  hello->setMessage(QStringLiteral("Not really IMAP server"));
73  hello->setProtocolVersion(Protocol::version());
74  hello->setGeneration(schema.generation());
75  writeCommand(0, hello);
76 }
77 
78 NotificationSubscriber::~NotificationSubscriber()
79 {
80  QMutexLocker locker(&mLock);
81 
82  if (mNotificationDebugging) {
83  Q_EMIT notificationDebuggingChanged(false);
84  }
85 }
86 
87 void NotificationSubscriber::handleIncomingData()
88 {
89  while (mSocket->bytesAvailable() > (int) sizeof(qint64)) {
90  Protocol::DataStream stream(mSocket);
91 
92  // Ignored atm
93  qint64 tag = -1;
94  stream >> tag;
95 
97  try {
98  cmd = Protocol::deserialize(mSocket);
99  } catch (const Akonadi::ProtocolException &e) {
100  qCWarning(AKONADISERVER_LOG) << "ProtocolException while reading from notification bus for" << mSubscriber << ":" << e.what();
101  disconnectSubscriber();
102  return;
103  } catch (const std::exception &e) {
104  qCWarning(AKONADISERVER_LOG) << "Unknown exception while reading from notification bus for" << mSubscriber << ":" << e.what();
105  disconnectSubscriber();
106  return;
107  }
108  if (cmd->type() == Protocol::Command::Invalid) {
109  qCWarning(AKONADISERVER_LOG) << "Invalid command while reading from notification bus for " << mSubscriber << ", resetting connection";
110  disconnectSubscriber();
111  return;
112  }
113 
114  switch (cmd->type()) {
115  case Protocol::Command::CreateSubscription:
116  registerSubscriber(Protocol::cmdCast<Protocol::CreateSubscriptionCommand>(cmd));
117  writeCommand(tag, Protocol::CreateSubscriptionResponsePtr::create());
118  break;
119  case Protocol::Command::ModifySubscription:
120  if (mSubscriber.isEmpty()) {
121  qCWarning(AKONADISERVER_LOG) << "Notification subscriber received ModifySubscription command before RegisterSubscriber";
122  disconnectSubscriber();
123  return;
124  }
125  modifySubscription(Protocol::cmdCast<Protocol::ModifySubscriptionCommand>(cmd));
126  writeCommand(tag, Protocol::ModifySubscriptionResponsePtr::create());
127  break;
128  case Protocol::Command::Logout:
129  disconnectSubscriber();
130  break;
131  default:
132  qCWarning(AKONADISERVER_LOG) << "Notification subscriber for" << mSubscriber << "received an invalid command" << cmd->type();
133  disconnectSubscriber();
134  break;
135  }
136  }
137 }
138 
139 void NotificationSubscriber::socketDisconnected()
140 {
141  qCInfo(AKONADISERVER_LOG) << "Subscriber" << mSubscriber << "disconnected";
142  disconnectSubscriber();
143 }
144 
145 void NotificationSubscriber::disconnectSubscriber()
146 {
147  QMutexLocker locker(&mLock);
148 
149  auto changeNtf = Protocol::SubscriptionChangeNotificationPtr::create();
150  changeNtf->setSubscriber(mSubscriber);
151  changeNtf->setSessionId(mSession);
152  changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Remove);
153  mManager->slotNotify({ changeNtf });
154 
155  if (mSocket) {
156  disconnect(mSocket, &QLocalSocket::disconnected,
157  this, &NotificationSubscriber::socketDisconnected);
158  mSocket->close();
159  }
160 
161  // Unregister ourselves from the aggregated collection fetch scope
162  auto cfs = mManager->collectionFetchScope();
163  cfs->apply(mCollectionFetchScope, Protocol::CollectionFetchScope());
164  cfs->removeSubscriber();
165 
166  auto tfs = mManager->tagFetchScope();
167  tfs->apply(mTagFetchScope, Protocol::TagFetchScope());
168  tfs->removeSubscriber();
169 
170  auto ifs = mManager->itemFetchScope();
171  ifs->apply(mItemFetchScope, Protocol::ItemFetchScope());
172  ifs->removeSubscriber();
173 
174  mManager->forgetSubscriber(this);
175  deleteLater();
176 }
177 
178 void NotificationSubscriber::registerSubscriber(const Protocol::CreateSubscriptionCommand &command)
179 {
180  QMutexLocker locker(&mLock);
181 
182  qCInfo(AKONADISERVER_LOG) << "Subscriber" << this << "identified as" << command.subscriberName();
183  mSubscriber = command.subscriberName();
184  mSession = command.session();
185 
186  auto changeNtf = Protocol::SubscriptionChangeNotificationPtr::create();
187  changeNtf->setSubscriber(mSubscriber);
188  changeNtf->setSessionId(mSession);
189  changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Add);
190  mManager->slotNotify({ changeNtf });
191 }
192 
193 void NotificationSubscriber::modifySubscription(const Protocol::ModifySubscriptionCommand &command)
194 {
195  QMutexLocker locker(&mLock);
196 
197  const auto modifiedParts = command.modifiedParts();
198 
199 #define START_MONITORING(type) \
200  (modifiedParts & Protocol::ModifySubscriptionCommand::ModifiedParts( \
201  Protocol::ModifySubscriptionCommand::type | Protocol::ModifySubscriptionCommand::Add))
202 #define STOP_MONITORING(type) \
203  (modifiedParts & Protocol::ModifySubscriptionCommand::ModifiedParts( \
204  Protocol::ModifySubscriptionCommand::type | Protocol::ModifySubscriptionCommand::Remove))
205 
206 #define APPEND(set, newItems) \
207  Q_FOREACH (const auto &entity, newItems) { \
208  set.insert(entity); \
209  }
210 
211 #define REMOVE(set, items) \
212  Q_FOREACH (const auto &entity, items) { \
213  set.remove(entity); \
214  }
215 
216  if (START_MONITORING(Types)) {
217  APPEND(mMonitoredTypes, command.startMonitoringTypes())
218  }
219  if (STOP_MONITORING(Types)) {
220  REMOVE(mMonitoredTypes, command.stopMonitoringTypes())
221  }
222  if (START_MONITORING(Collections)) {
223  APPEND(mMonitoredCollections, command.startMonitoringCollections())
224  }
225  if (STOP_MONITORING(Collections)) {
226  REMOVE(mMonitoredCollections, command.stopMonitoringCollections())
227  }
228  if (START_MONITORING(Items)) {
229  APPEND(mMonitoredItems, command.startMonitoringItems())
230  }
231  if (STOP_MONITORING(Items)) {
232  REMOVE(mMonitoredItems, command.stopMonitoringItems())
233  }
234  if (START_MONITORING(Tags)) {
235  APPEND(mMonitoredTags, command.startMonitoringTags())
236  }
237  if (STOP_MONITORING(Tags)) {
238  REMOVE(mMonitoredTags, command.stopMonitoringTags())
239  }
240  if (START_MONITORING(Resources)) {
241  APPEND(mMonitoredResources, command.startMonitoringResources())
242  }
243  if (STOP_MONITORING(Resources)) {
244  REMOVE(mMonitoredResources, command.stopMonitoringResources())
245  }
246  if (START_MONITORING(MimeTypes)) {
247  APPEND(mMonitoredMimeTypes, command.startMonitoringMimeTypes())
248  }
249  if (STOP_MONITORING(MimeTypes)) {
250  REMOVE(mMonitoredMimeTypes, command.stopMonitoringMimeTypes())
251  }
252  if (START_MONITORING(Sessions)) {
253  APPEND(mIgnoredSessions, command.startIgnoringSessions())
254  }
255  if (STOP_MONITORING(Sessions)) {
256  REMOVE(mIgnoredSessions, command.stopIgnoringSessions())
257  }
258  if (modifiedParts & Protocol::ModifySubscriptionCommand::AllFlag) {
259  mAllMonitored = command.allMonitored();
260  }
261  if (modifiedParts & Protocol::ModifySubscriptionCommand::ExclusiveFlag) {
262  mExclusive = command.isExclusive();
263  }
264  if (modifiedParts & Protocol::ModifySubscriptionCommand::ItemFetchScope) {
265  const auto newScope = command.itemFetchScope();
266  mManager->itemFetchScope()->apply(mItemFetchScope, newScope);
267  mItemFetchScope = newScope;
268  }
269  if (modifiedParts & Protocol::ModifySubscriptionCommand::CollectionFetchScope) {
270  const auto newScope = command.collectionFetchScope();
271  mManager->collectionFetchScope()->apply(mCollectionFetchScope, newScope);
272  mCollectionFetchScope = newScope;
273  }
274  if (modifiedParts & Protocol::ModifySubscriptionCommand::TagFetchScope) {
275  const auto newScope = command.tagFetchScope();
276  mManager->tagFetchScope()->apply(mTagFetchScope, newScope);
277  mTagFetchScope = newScope;
278  if (!newScope.fetchIdOnly())
279  Q_ASSERT(!mManager->tagFetchScope()->fetchIdOnly());
280  }
281 
282  if (mManager) {
283  if (modifiedParts & Protocol::ModifySubscriptionCommand::Types) {
284  // Did the caller just subscribed to subscription changes?
285  if (command.startMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::SubscriptionChanges)) {
286  // If yes, then send them list of all existing subscribers
287  mManager->mSubscribers
288  | Views::filter(IsNotNull)
289  | Actions::forEach([this](const auto &subscriber) {
290  QMetaObject::invokeMethod(this, "notify", Qt::QueuedConnection,
292  subscriber->toChangeNotification()));
293  });
294  }
295  if (command.startMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::ChangeNotifications)) {
296  if (!mNotificationDebugging) {
297  mNotificationDebugging = true;
298  Q_EMIT notificationDebuggingChanged(true);
299  }
300  } else if (command.stopMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::ChangeNotifications)) {
301  if (mNotificationDebugging) {
302  mNotificationDebugging = false;
303  Q_EMIT notificationDebuggingChanged(false);
304  }
305  }
306  }
307 
308  // Emit subscription change notification
309  auto changeNtf = toChangeNotification();
310  changeNtf->setOperation(Protocol::SubscriptionChangeNotification::Modify);
311  mManager->slotNotify({ changeNtf });
312  }
313 
314 #undef START_MONITORING
315 #undef STOP_MONITORING
316 #undef APPEND
317 #undef REMOVE
318 }
319 
320 Protocol::SubscriptionChangeNotificationPtr NotificationSubscriber::toChangeNotification() const
321 {
322  // Assumes mLock being locked by caller
323 
324  auto ntf = Protocol::SubscriptionChangeNotificationPtr::create();
325  ntf->setSessionId(mSession);
326  ntf->setSubscriber(mSubscriber);
327  ntf->setOperation(Protocol::SubscriptionChangeNotification::Add);
328  ntf->setCollections(mMonitoredCollections);
329  ntf->setItems(mMonitoredItems);
330  ntf->setTags(mMonitoredTags);
331  ntf->setTypes(mMonitoredTypes);
332  ntf->setMimeTypes(mMonitoredMimeTypes);
333  ntf->setResources(mMonitoredResources);
334  ntf->setIgnoredSessions(mIgnoredSessions);
335  ntf->setAllMonitored(mAllMonitored);
336  ntf->setExclusive(mExclusive);
337  ntf->setItemFetchScope(mItemFetchScope);
338  ntf->setTagFetchScope(mTagFetchScope);
339  ntf->setCollectionFetchScope(mCollectionFetchScope);
340  return ntf;
341 }
342 
343 bool NotificationSubscriber::isCollectionMonitored(Entity::Id id) const
344 {
345  // Assumes mLock being locked by caller
346 
347  if (id < 0) {
348  return false;
349  } else if (mMonitoredCollections.contains(id)) {
350  return true;
351  } else if (mMonitoredCollections.contains(0)) {
352  return true;
353  }
354  return false;
355 }
356 
357 bool NotificationSubscriber::isMimeTypeMonitored(const QString &mimeType) const
358 {
359  // Assumes mLock being locked by caller
360 
361  const QMimeType mt = sMimeDatabase.mimeTypeForName(mimeType);
362  if (mMonitoredMimeTypes.contains(mimeType)) {
363  return true;
364  }
365 
366  const QStringList lst = mt.aliases();
367  return std::any_of(lst.cbegin(), lst.cend(),
368  [this](const auto &mt) {
369  return mMonitoredMimeTypes.contains(mt);
370  });
371 }
372 
373 bool NotificationSubscriber::isMoveDestinationResourceMonitored(const Protocol::ItemChangeNotification &msg) const
374 {
375  // Assumes mLock being locked by caller
376 
377  if (msg.operation() != Protocol::ItemChangeNotification::Move) {
378  return false;
379  }
380  return mMonitoredResources.contains(msg.destinationResource());
381 }
382 
383 bool NotificationSubscriber::isMoveDestinationResourceMonitored(const Protocol::CollectionChangeNotification &msg) const
384 {
385  // Assumes mLock being locked by caller
386 
387  if (msg.operation() != Protocol::CollectionChangeNotification::Move) {
388  return false;
389  }
390  return mMonitoredResources.contains(msg.destinationResource());
391 }
392 
393 bool NotificationSubscriber::acceptsItemNotification(const Protocol::ItemChangeNotification &msg) const
394 {
395  // Assumes mLock being locked by caller
396 
397  if (msg.items().isEmpty()) {
398  return false;
399  }
400 
401  if (mAllMonitored) {
402  TRACE_NTF("ACCEPTS ITEM: all monitored");
403  return true;
404  }
405 
406  if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::ItemChanges)) {
407  TRACE_NTF("ACCEPTS ITEM: REJECTED - Item changes not monitored");
408  return false;
409  }
410 
411  // we have a resource or mimetype filter
412  if (!mMonitoredResources.isEmpty() || !mMonitoredMimeTypes.isEmpty()) {
413  if (mMonitoredResources.contains(msg.resource())) {
414  TRACE_NTF("ACCEPTS ITEM: ACCEPTED - resource monitored");
415  return true;
416  }
417 
418  if (isMoveDestinationResourceMonitored(msg)) {
419  TRACE_NTF("ACCEPTS ITEM: ACCEPTED: move destination monitored");
420  return true;
421  }
422 
423  Q_FOREACH (const auto &item, msg.items()) {
424  if (isMimeTypeMonitored(item.mimeType())) {
425  TRACE_NTF("ACCEPTS ITEM: ACCEPTED - mimetype monitored");
426  return true;
427  }
428  }
429 
430  TRACE_NTF("ACCEPTS ITEM: REJECTED: resource nor mimetype monitored");
431  return false;
432  }
433 
434  // we explicitly monitor that item or the collections it's in
435  Q_FOREACH (const auto &item, msg.items()) {
436  if (mMonitoredItems.contains(item.id())) {
437  TRACE_NTF("ACCEPTS ITEM: ACCEPTED: item explicitly monitored");
438  return true;
439  }
440  }
441 
442  if (isCollectionMonitored(msg.parentCollection())) {
443  TRACE_NTF("ACCEPTS ITEM: ACCEPTED: parent collection monitored");
444  return true;
445  }
446  if (isCollectionMonitored(msg.parentDestCollection())) {
447  TRACE_NTF("ACCEPTS ITEM: ACCEPTED: destination collection monitored");
448  return true;
449  }
450 
451  TRACE_NTF("ACCEPTS ITEM: REJECTED");
452  return false;
453 }
454 
455 bool NotificationSubscriber::acceptsCollectionNotification(const Protocol::CollectionChangeNotification &msg) const
456 {
457  // Assumes mLock being locked by caller
458 
459  const auto &collection = msg.collection();
460  if (collection.id() < 0) {
461  return false;
462  }
463 
464  // HACK: We need to dispatch notifications about disabled collections to SOME
465  // agents (that's what we have the exclusive subscription for) - but because
466  // querying each Collection from database would be expensive, we use the
467  // metadata hack to transfer this information from NotificationCollector
468  if (msg.metadata().contains("DISABLED")
469  && (msg.operation() != Protocol::CollectionChangeNotification::Unsubscribe)
470  && !msg.changedParts().contains("ENABLED")) {
471  // Exclusive subscriber always gets it
472  if (mExclusive) {
473  return true;
474  }
475 
476  // If the subscriber is not exclusive (i.e. if we got here), then the subscriber does
477  // not care about this one, so drop it
478  return false;
479  }
480 
481  if (mAllMonitored) {
482  return true;
483  }
484 
485  if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::CollectionChanges)) {
486  return false;
487  }
488 
489  // we have a resource filter
490  if (!mMonitoredResources.isEmpty()) {
491  const bool resourceMatches = mMonitoredResources.contains(msg.resource())
492  || isMoveDestinationResourceMonitored(msg);
493 
494  // a bit hacky, but match the behaviour from the item case,
495  // if resource is the only thing we are filtering on, stop here, and if the resource filter matched, of course
496  if (mMonitoredMimeTypes.isEmpty() || resourceMatches) {
497  return resourceMatches;
498  }
499  // else continue
500  }
501 
502  // we explicitly monitor that collection, or all of them
503  if (isCollectionMonitored(collection.id())) {
504  return true;
505  }
506 
507  return isCollectionMonitored(msg.parentCollection())
508  || isCollectionMonitored(msg.parentDestCollection());
509 
510 }
511 
512 bool NotificationSubscriber::acceptsTagNotification(const Protocol::TagChangeNotification &msg) const
513 {
514  // Assumes mLock being locked by caller
515 
516  if (msg.tag().id() < 0) {
517  return false;
518  }
519 
520  // Special handling for Tag removal notifications: When a Tag is removed,
521  // a notification is emitted for each Resource that owns the tag (i.e.
522  // each resource that owns a Tag RID - Tag RIDs are resource-specific).
523  // Additionally then we send one more notification without any RID that is
524  // destined for regular applications (which don't know anything about Tag RIDs)
525  if (msg.operation() == Protocol::TagChangeNotification::Remove) {
526  // HACK: Since have no way to determine which resource this NotificationSource
527  // belongs to, we are abusing the fact that each resource ignores it's own
528  // main session, which is called the same name as the resource.
529 
530  // If there are any ignored sessions, but this notification does not have
531  // a specific resource set, then we ignore it, as this notification is
532  // for clients, not resources (does not have tag RID)
533  if (!mIgnoredSessions.isEmpty() && msg.resource().isEmpty()) {
534  return false;
535  }
536 
537  // If this source ignores a session (i.e. we assume it is a resource),
538  // but this notification is for another resource, then we ignore it
539  if (!msg.resource().isEmpty() && !mIgnoredSessions.contains(msg.resource())) {
540  return false;
541  }
542  // Now we got here, which means that this notification either has empty
543  // resource, i.e. it is destined for a client applications, or it's
544  // destined for resource that we *think* (see the hack above) this
545  // NotificationSource belongs too. Which means we approve this notification,
546  // but it can still be discarded in the generic Tag notification filter
547  // below
548  }
549 
550  if (mAllMonitored) {
551  return true;
552  }
553 
554  if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::TagChanges)) {
555  return false;
556  }
557 
558  if (mMonitoredTags.isEmpty()) {
559  return true;
560  }
561 
562  if (mMonitoredTags.contains(msg.tag().id())) {
563  return true;
564  }
565 
566  return true;
567 }
568 
569 bool NotificationSubscriber::acceptsRelationNotification(const Protocol::RelationChangeNotification &msg) const
570 {
571  // Assumes mLock being locked by caller
572 
573  Q_UNUSED(msg);
574 
575  if (mAllMonitored) {
576  return true;
577  }
578 
579  if (!mMonitoredTypes.isEmpty() && !mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::RelationChanges)) {
580  return false;
581  }
582 
583  return true;
584 }
585 
586 bool NotificationSubscriber::acceptsSubscriptionNotification(const Protocol::SubscriptionChangeNotification &msg) const
587 {
588  // Assumes mLock being locked by caller
589 
590  Q_UNUSED(msg);
591 
592  // Unlike other types, subscription notifications must be explicitly enabled
593  // by caller and are excluded from "monitor all" as well
594  return mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::SubscriptionChanges);
595 }
596 
597 bool NotificationSubscriber::acceptsDebugChangeNotification(const Protocol::DebugChangeNotification &msg) const
598 {
599  // Assumes mLock being locked by caller
600 
601  // We should never end up sending debug notification about a debug notification.
602  // This could get very messy very quickly...
603  Q_ASSERT(msg.notification()->type() != Protocol::Command::DebugChangeNotification);
604  if (msg.notification()->type() == Protocol::Command::DebugChangeNotification) {
605  return false;
606  }
607 
608  // Unlike other types, debug change notifications must be explicitly enabled
609  // by caller and are excluded from "monitor all" as well
610  return mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::ChangeNotifications);
611 }
612 
613 bool NotificationSubscriber::acceptsNotification(const Protocol::ChangeNotification &msg) const
614 {
615  // Assumes mLock being locked
616 
617  // Uninitialized subscriber gets nothing
618  if (mSubscriber.isEmpty()) {
619  return false;
620  }
621 
622  // session is ignored
623  // TODO: Should this affect SubscriptionChangeNotification and DebugChangeNotification?
624  if (mIgnoredSessions.contains(msg.sessionId())) {
625  return false;
626  }
627 
628  switch (msg.type()) {
629  case Protocol::Command::ItemChangeNotification:
630  return acceptsItemNotification(static_cast<const Protocol::ItemChangeNotification &>(msg));
631  case Protocol::Command::CollectionChangeNotification:
632  return acceptsCollectionNotification(static_cast<const Protocol::CollectionChangeNotification &>(msg));
633  case Protocol::Command::TagChangeNotification:
634  return acceptsTagNotification(static_cast<const Protocol::TagChangeNotification &>(msg));
635  case Protocol::Command::RelationChangeNotification:
636  return acceptsRelationNotification(static_cast<const Protocol::RelationChangeNotification &>(msg));
637  case Protocol::Command::SubscriptionChangeNotification:
638  return acceptsSubscriptionNotification(static_cast<const Protocol::SubscriptionChangeNotification &>(msg));
639  case Protocol::Command::DebugChangeNotification:
640  return acceptsDebugChangeNotification(static_cast<const Protocol::DebugChangeNotification &>(msg));
641 
642  default:
643  qCWarning(AKONADISERVER_LOG) << "NotificationSubscriber" << mSubscriber << "received an invalid notification type" << msg.type();
644  return false;
645  }
646 }
647 
648 bool NotificationSubscriber::notify(const Protocol::ChangeNotificationPtr &notification)
649 {
650  // Guard against this object being deleted while we are waiting for the lock
652  QMutexLocker locker(&mLock);
653  if (!ptr) {
654  return false;
655  }
656 
657  if (acceptsNotification(*notification)) {
658  QMetaObject::invokeMethod(this, "writeNotification", Qt::QueuedConnection,
659  Q_ARG(Akonadi::Protocol::ChangeNotificationPtr, notification));
660  return true;
661  }
662  return false;
663 }
664 
665 void NotificationSubscriber::writeNotification(const Protocol::ChangeNotificationPtr &notification)
666 {
667  // tag chosen by fair dice roll
668  writeCommand(4, notification);
669 }
670 
671 void NotificationSubscriber::writeCommand(qint64 tag, const Protocol::CommandPtr &cmd)
672 {
673  Q_ASSERT(QThread::currentThread() == thread());
674 
675  Protocol::DataStream stream(mSocket);
676  stream << tag;
677  try {
678  Protocol::serialize(stream, cmd);
679  stream.flush();
680  if (!mSocket->waitForBytesWritten()) {
681  if (mSocket->state() == QLocalSocket::ConnectedState) {
682  qCWarning(AKONADISERVER_LOG) << "NotificationSubscriber for" << mSubscriber << ": timeout writing into stream";
683  } else {
684  // client has disconnected, just discard the message
685  }
686  }
687  } catch (const ProtocolException &e) {
688  qCWarning(AKONADISERVER_LOG) << "ProtocolException while writing into stream for subscriber" << mSubscriber << ":" << e.what();
689  }
690 }
bool invokeMethod(QObject *obj, const char *member, Qt::ConnectionType type, QGenericReturnArgument ret, QGenericArgument val0, QGenericArgument val1, QGenericArgument val2, QGenericArgument val3, QGenericArgument val4, QGenericArgument val5, QGenericArgument val6, QGenericArgument val7, QGenericArgument val8, QGenericArgument val9)
QList::const_iterator cend() const
QList::const_iterator cbegin() const
QThread * currentThread()
Helper integration between Akonadi and Qt.
void disconnected()
void readyRead()
This file is part of the KDE documentation.
Documentation copyright © 1996-2020 The KDE developers.
Generated on Fri Jun 5 2020 23:08:55 by doxygen 1.8.11 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.