Akonadi

notificationmanager.cpp
1 /*
2  Copyright (c) 2006 - 2007 Volker Krause <[email protected]>
3  Copyright (c) 2010 Michael Jansen <[email protected]>
4 
5  This library is free software; you can redistribute it and/or modify it
6  under the terms of the GNU Library General Public License as published by
7  the Free Software Foundation; either version 2 of the License, or (at your
8  option) any later version.
9 
10  This library is distributed in the hope that it will be useful, but WITHOUT
11  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
13  License for more details.
14 
15  You should have received a copy of the GNU Library General Public License
16  along with this library; see the file COPYING.LIB. If not, write to the
17  Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18  02110-1301, USA.
19 */
20 
21 #include "notificationmanager.h"
22 #include "notificationsubscriber.h"
23 #include "storage/notificationcollector.h"
24 #include "tracer.h"
25 #include "akonadiserver_debug.h"
26 #include "aggregatedfetchscope.h"
27 #include "storage/collectionstatistics.h"
28 #include "handlerhelper.h"
29 
30 #include <shared/akranges.h>
31 #include <private/standarddirs_p.h>
32 #include <private/scope_p.h>
33 
34 #include <QSettings>
35 #include <QThreadPool>
36 #include <QPointer>
37 #include <QDateTime>
38 
39 using namespace Akonadi;
40 using namespace Akonadi::Server;
41 using namespace AkRanges;
42 
43 NotificationManager::NotificationManager(StartMode startMode)
44  : AkThread(QStringLiteral("NotificationManager"), startMode)
45  , mTimer(nullptr)
46  , mNotifyThreadPool(nullptr)
47  , mDebugNotifications(0)
48 {
49 }
50 
51 NotificationManager::~NotificationManager()
52 {
53  quitThread();
54 }
55 
56 void NotificationManager::init()
57 {
58  AkThread::init();
59 
60  const QString serverConfigFile = StandardDirs::serverConfigFile(StandardDirs::ReadWrite);
61  QSettings settings(serverConfigFile, QSettings::IniFormat);
62 
63  mTimer = new QTimer(this);
64  mTimer->setInterval(settings.value(QStringLiteral("NotificationManager/Interval"), 50).toInt());
65  mTimer->setSingleShot(true);
66  connect(mTimer, &QTimer::timeout,
67  this, &NotificationManager::emitPendingNotifications);
68 
69  mNotifyThreadPool = new QThreadPool(this);
70  mNotifyThreadPool->setMaxThreadCount(5);
71 
72  mCollectionFetchScope = new AggregatedCollectionFetchScope();
73  mItemFetchScope = new AggregatedItemFetchScope();
74  mTagFetchScope = new AggregatedTagFetchScope();
75 }
76 
77 void NotificationManager::quit()
78 {
79  mQuitting = true;
80 
81  mTimer->stop();
82  delete mTimer;
83 
84  mNotifyThreadPool->clear();
85  mNotifyThreadPool->waitForDone();
86  delete mNotifyThreadPool;
87 
88  qDeleteAll(mSubscribers);
89 
90  delete mCollectionFetchScope;
91  delete mItemFetchScope;
92  delete mTagFetchScope;
93 
94  AkThread::quit();
95 }
96 
97 void NotificationManager::registerConnection(quintptr socketDescriptor)
98 {
99  Q_ASSERT(thread() == QThread::currentThread());
100 
101  NotificationSubscriber *subscriber = new NotificationSubscriber(this, socketDescriptor);
102  qCInfo(AKONADISERVER_LOG) << "New notification connection (registered as" << subscriber << ")";
103  connect(subscriber, &NotificationSubscriber::notificationDebuggingChanged,
104  this, [this](bool enabled) {
105  if (enabled) {
106  ++mDebugNotifications;
107  } else {
108  --mDebugNotifications;
109  }
110  Q_ASSERT(mDebugNotifications >= 0);
111  Q_ASSERT(mDebugNotifications <= mSubscribers.count());
112  });
113 
114  mSubscribers.push_back(subscriber);
115 }
116 void NotificationManager::forgetSubscriber(NotificationSubscriber *subscriber)
117 {
118  Q_ASSERT(QThread::currentThread() == thread());
119  mSubscribers.removeAll(subscriber);
120 }
121 
122 void NotificationManager::slotNotify(const Protocol::ChangeNotificationList &msgs)
123 {
124  Q_ASSERT(QThread::currentThread() == thread());
125  for (const auto &msg : msgs) {
126  switch (msg->type()) {
127  case Protocol::Command::CollectionChangeNotification:
128  Protocol::CollectionChangeNotification::appendAndCompress(mNotifications, msg);
129  continue;
130  case Protocol::Command::ItemChangeNotification:
131  case Protocol::Command::TagChangeNotification:
132  case Protocol::Command::RelationChangeNotification:
133  case Protocol::Command::SubscriptionChangeNotification:
134  case Protocol::Command::DebugChangeNotification:
135  mNotifications.push_back(msg);
136  continue;
137 
138  default:
139  Q_ASSERT_X(false, "slotNotify", "Invalid notification type!");
140  continue;
141  }
142  }
143 
144  if (!mTimer->isActive()) {
145  mTimer->start();
146  }
147 }
148 
149 class NotifyRunnable : public QRunnable
150 {
151 public:
152  explicit NotifyRunnable(NotificationSubscriber *subscriber,
153  const Protocol::ChangeNotificationList &notifications)
154  : mSubscriber(subscriber)
155  , mNotifications(notifications)
156  {
157  }
158 
159  ~NotifyRunnable() = default;
160 
161  void run() override {
162  for (const auto &ntf : qAsConst(mNotifications))
163  {
164  if (mSubscriber) {
165  mSubscriber->notify(ntf);
166  } else {
167  break;
168  }
169  }
170  }
171 
172 private:
173  Q_DISABLE_COPY_MOVE(NotifyRunnable);
174 
176  Protocol::ChangeNotificationList mNotifications;
177 };
178 
179 void NotificationManager::emitPendingNotifications()
180 {
181  Q_ASSERT(QThread::currentThread() == thread());
182 
183  if (mNotifications.isEmpty()) {
184  return;
185  }
186 
187  if (mDebugNotifications == 0) {
188  mSubscribers
189  | Views::filter(IsNotNull)
190  | Actions::forEach([this](const auto &subscriber) {
191  mNotifyThreadPool->start(new NotifyRunnable(subscriber, mNotifications));
192  });
193  } else {
194  // When debugging notification we have to use a non-threaded approach
195  // so that we can work with return value of notify()
196  for (const auto &notification : qAsConst(mNotifications)) {
197  QVector<QByteArray> listeners;
198  for (NotificationSubscriber *subscriber : qAsConst(mSubscribers)) {
199  if (subscriber && subscriber->notify(notification)) {
200  listeners.push_back(subscriber->subscriber());
201  }
202  }
203 
204  emitDebugNotification(notification, listeners);
205  }
206  }
207 
208  mNotifications.clear();
209 }
210 
211 void NotificationManager::emitDebugNotification(const Protocol::ChangeNotificationPtr &ntf,
212  const QVector<QByteArray> &listeners)
213 {
214  auto debugNtf = Protocol::DebugChangeNotificationPtr::create();
215  debugNtf->setNotification(ntf);
216  debugNtf->setListeners(listeners);
217  debugNtf->setTimestamp(QDateTime::currentMSecsSinceEpoch());
218  mSubscribers
219  | Views::filter(IsNotNull)
220  | Actions::forEach([this, &debugNtf](const auto &subscriber) {
221  mNotifyThreadPool->start(new NotifyRunnable(subscriber, {debugNtf}));
222  });
223 }
void timeout()
qint64 currentMSecsSinceEpoch()
KIOWIDGETS_EXPORT bool run(const QUrl &_url, bool _is_local)
QThread * currentThread()
Helper integration between Akonadi and Qt.
void push_back(const T &value)
This file is part of the KDE documentation.
Documentation copyright © 1996-2020 The KDE developers.
Generated on Fri Jun 5 2020 23:08:55 by doxygen 1.8.11 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.