Akonadi

notificationmanager.cpp
1 /*
2  SPDX-FileCopyrightText: 2006-2007 Volker Krause <[email protected]>
3  SPDX-FileCopyrightText: 2010 Michael Jansen <[email protected]>
4 
5  SPDX-License-Identifier: LGPL-2.0-or-later
6 */
7 
#include "notificationmanager.h"
#include "aggregatedfetchscope.h"
#include "akonadiserver_debug.h"
#include "handlerhelper.h"
#include "notificationsubscriber.h"
#include "storage/collectionstatistics.h"
#include "storage/notificationcollector.h"
#include "tracer.h"

#include <private/scope_p.h>
#include <private/standarddirs_p.h>
#include <shared/akranges.h>

#include <QDateTime>
#include <QPointer>
#include <QSettings>
#include <QThreadPool>
#include <QTimer>
25 
26 using namespace Akonadi;
27 using namespace Akonadi::Server;
28 using namespace AkRanges;
29 
30 NotificationManager::NotificationManager(StartMode startMode)
31  : AkThread(QStringLiteral("NotificationManager"), startMode)
32  , mTimer(nullptr)
33  , mNotifyThreadPool(nullptr)
34  , mDebugNotifications(0)
35 {
36 }
37 
38 NotificationManager::~NotificationManager()
39 {
40  quitThread();
41 }
42 
43 void NotificationManager::init()
44 {
45  AkThread::init();
46 
47  const QString serverConfigFile = StandardDirs::serverConfigFile(StandardDirs::ReadWrite);
48  QSettings settings(serverConfigFile, QSettings::IniFormat);
49 
50  mTimer = new QTimer(this);
51  mTimer->setInterval(settings.value(QStringLiteral("NotificationManager/Interval"), 50).toInt());
52  mTimer->setSingleShot(true);
53  connect(mTimer, &QTimer::timeout, this, &NotificationManager::emitPendingNotifications);
54 
55  mNotifyThreadPool = new QThreadPool(this);
56  mNotifyThreadPool->setMaxThreadCount(5);
57 
58  mCollectionFetchScope = new AggregatedCollectionFetchScope();
59  mItemFetchScope = new AggregatedItemFetchScope();
60  mTagFetchScope = new AggregatedTagFetchScope();
61 }
62 
63 void NotificationManager::quit()
64 {
65  mQuitting = true;
66 
67  mTimer->stop();
68  delete mTimer;
69 
70  mNotifyThreadPool->clear();
71  mNotifyThreadPool->waitForDone();
72  delete mNotifyThreadPool;
73 
74  qDeleteAll(mSubscribers);
75 
76  delete mCollectionFetchScope;
77  delete mItemFetchScope;
78  delete mTagFetchScope;
79 
80  AkThread::quit();
81 }
82 
83 void NotificationManager::registerConnection(quintptr socketDescriptor)
84 {
85  Q_ASSERT(thread() == QThread::currentThread());
86 
87  auto subscriber = new NotificationSubscriber(this, socketDescriptor);
88  qCInfo(AKONADISERVER_LOG) << "New notification connection (registered as" << subscriber << ")";
89  connect(subscriber, &NotificationSubscriber::notificationDebuggingChanged, this, [this](bool enabled) {
90  if (enabled) {
91  ++mDebugNotifications;
92  } else {
93  --mDebugNotifications;
94  }
95  Q_ASSERT(mDebugNotifications >= 0);
96  Q_ASSERT(mDebugNotifications <= mSubscribers.count());
97  });
98 
99  mSubscribers.push_back(subscriber);
100 }
101 void NotificationManager::forgetSubscriber(NotificationSubscriber *subscriber)
102 {
103  Q_ASSERT(QThread::currentThread() == thread());
104  mSubscribers.removeAll(subscriber);
105 }
106 
107 void NotificationManager::slotNotify(const Protocol::ChangeNotificationList &msgs)
108 {
109  Q_ASSERT(QThread::currentThread() == thread());
110  for (const auto &msg : msgs) {
111  switch (msg->type()) {
112  case Protocol::Command::CollectionChangeNotification:
113  Protocol::CollectionChangeNotification::appendAndCompress(mNotifications, msg);
114  continue;
115  case Protocol::Command::ItemChangeNotification:
116  case Protocol::Command::TagChangeNotification:
117  case Protocol::Command::RelationChangeNotification:
118  case Protocol::Command::SubscriptionChangeNotification:
119  case Protocol::Command::DebugChangeNotification:
120  mNotifications.push_back(msg);
121  continue;
122 
123  default:
124  Q_ASSERT_X(false, "slotNotify", "Invalid notification type!");
125  continue;
126  }
127  }
128 
129  if (!mTimer->isActive()) {
130  mTimer->start();
131  }
132 }
133 
134 class NotifyRunnable : public QRunnable
135 {
136 public:
137  explicit NotifyRunnable(NotificationSubscriber *subscriber, const Protocol::ChangeNotificationList &notifications)
138  : mSubscriber(subscriber)
139  , mNotifications(notifications)
140  {
141  }
142 
143  ~NotifyRunnable() override = default;
144 
145  void run() override
146  {
147  for (const auto &ntf : std::as_const(mNotifications)) {
148  if (mSubscriber) {
149  mSubscriber->notify(ntf);
150  } else {
151  break;
152  }
153  }
154  }
155 
156 private:
157  Q_DISABLE_COPY_MOVE(NotifyRunnable)
158 
160  Protocol::ChangeNotificationList mNotifications;
161 };
162 
163 void NotificationManager::emitPendingNotifications()
164 {
165  Q_ASSERT(QThread::currentThread() == thread());
166 
167  if (mNotifications.isEmpty()) {
168  return;
169  }
170 
171  if (mDebugNotifications == 0) {
172  mSubscribers | Views::filter(IsNotNull) | Actions::forEach([this](const auto &subscriber) {
173  mNotifyThreadPool->start(new NotifyRunnable(subscriber, mNotifications));
174  });
175  } else {
176  // When debugging notification we have to use a non-threaded approach
177  // so that we can work with return value of notify()
178  for (const auto &notification : std::as_const(mNotifications)) {
179  QVector<QByteArray> listeners;
180  for (NotificationSubscriber *subscriber : std::as_const(mSubscribers)) {
181  if (subscriber && subscriber->notify(notification)) {
182  listeners.push_back(subscriber->subscriber());
183  }
184  }
185 
186  emitDebugNotification(notification, listeners);
187  }
188  }
189 
190  mNotifications.clear();
191 }
192 
193 void NotificationManager::emitDebugNotification(const Protocol::ChangeNotificationPtr &ntf, const QVector<QByteArray> &listeners)
194 {
195  auto debugNtf = Protocol::DebugChangeNotificationPtr::create();
196  debugNtf->setNotification(ntf);
197  debugNtf->setListeners(listeners);
198  debugNtf->setTimestamp(QDateTime::currentMSecsSinceEpoch());
199  mSubscribers | Views::filter(IsNotNull) | Actions::forEach([this, &debugNtf](const auto &subscriber) {
200  mNotifyThreadPool->start(new NotifyRunnable(subscriber, {debugNtf}));
201  });
202 }
QFuture< T > run(Function function,...)
void push_back(const T &value)
qint64 currentMSecsSinceEpoch()
void timeout()
QThread * currentThread()
Helper integration between Akonadi and Qt.
This file is part of the KDE documentation.
Documentation copyright © 1996-2023 The KDE developers.
Generated on Sun Jun 4 2023 03:52:47 by doxygen 1.8.17 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.