Akonadi

notificationmanager.cpp
1/*
2 SPDX-FileCopyrightText: 2006-2007 Volker Krause <vkrause@kde.org>
3 SPDX-FileCopyrightText: 2010 Michael Jansen <kde@michael-jansen>
4
5 SPDX-License-Identifier: LGPL-2.0-or-later
6*/
7
8#include "notificationmanager.h"
9#include "aggregatedfetchscope.h"
10#include "akonadiserver_debug.h"
11#include "handlerhelper.h"
12#include "notificationsubscriber.h"
13#include "storage/collectionstatistics.h"
14#include "storage/notificationcollector.h"
15#include "tracer.h"
16
17#include "private/scope_p.h"
18#include "private/standarddirs_p.h"
19#include "shared/akranges.h"
20
21#include <QDateTime>
22#include <QSettings>
23#include <QThreadPool>
24#include <QTimer>
25
26using namespace Akonadi;
27using namespace Akonadi::Server;
28using namespace AkRanges;
29
30NotificationManager::NotificationManager(StartMode startMode)
31 : AkThread(QStringLiteral("NotificationManager"), startMode)
32 , mTimer(nullptr)
33 , mNotifyThreadPool(nullptr)
34 , mDebugNotifications(0)
35{
36}
37
38NotificationManager::~NotificationManager()
39{
40 quitThread();
41}
42
43void NotificationManager::init()
44{
45 AkThread::init();
46
47 const QString serverConfigFile = StandardDirs::serverConfigFile(StandardDirs::ReadWrite);
49
50 mTimer = new QTimer(this);
51 mTimer->setInterval(settings.value(QStringLiteral("NotificationManager/Interval"), 50).toInt());
52 mTimer->setSingleShot(true);
53 connect(mTimer, &QTimer::timeout, this, &NotificationManager::emitPendingNotifications);
54
55 mNotifyThreadPool = new QThreadPool(this);
56 mNotifyThreadPool->setMaxThreadCount(5);
57
58 mCollectionFetchScope = new AggregatedCollectionFetchScope();
59 mItemFetchScope = new AggregatedItemFetchScope();
60 mTagFetchScope = new AggregatedTagFetchScope();
61}
62
63void NotificationManager::quit()
64{
65 mQuitting = true;
66
67 mTimer->stop();
68 delete mTimer;
69
70 mNotifyThreadPool->clear();
71 mNotifyThreadPool->waitForDone();
72 delete mNotifyThreadPool;
73
74 qDeleteAll(mSubscribers);
75
76 delete mCollectionFetchScope;
77 delete mItemFetchScope;
78 delete mTagFetchScope;
79
80 AkThread::quit();
81}
82
83void NotificationManager::registerConnection(quintptr socketDescriptor)
84{
86
87 auto subscriber = new NotificationSubscriber(this, socketDescriptor);
88 qCInfo(AKONADISERVER_LOG) << "New notification connection (registered as" << subscriber << ")";
89 connect(subscriber, &NotificationSubscriber::notificationDebuggingChanged, this, [this](bool enabled) {
90 if (enabled) {
91 ++mDebugNotifications;
92 } else {
93 --mDebugNotifications;
94 }
95 Q_ASSERT(mDebugNotifications >= 0);
96 Q_ASSERT(mDebugNotifications <= mSubscribers.count());
97 });
98
99 mSubscribers.push_back(subscriber);
100}
101void NotificationManager::forgetSubscriber(NotificationSubscriber *subscriber)
102{
104 mSubscribers.removeAll(subscriber);
105}
106
107void NotificationManager::slotNotify(const Protocol::ChangeNotificationList &msgs)
108{
110 for (const auto &msg : msgs) {
111 switch (msg->type()) {
112 case Protocol::Command::CollectionChangeNotification:
113 Protocol::CollectionChangeNotification::appendAndCompress(mNotifications, msg);
114 continue;
115 case Protocol::Command::ItemChangeNotification:
116 case Protocol::Command::TagChangeNotification:
117 case Protocol::Command::RelationChangeNotification:
118 case Protocol::Command::SubscriptionChangeNotification:
119 case Protocol::Command::DebugChangeNotification:
120 mNotifications.push_back(msg);
121 continue;
122
123 default:
124 Q_ASSERT_X(false, "slotNotify", "Invalid notification type!");
125 continue;
126 }
127 }
128
129 if (!mTimer->isActive()) {
130 mTimer->start();
131 }
132}
133
134class NotifyRunnable : public QRunnable
135{
136public:
137 explicit NotifyRunnable(NotificationSubscriber *subscriber, const Protocol::ChangeNotificationList &notifications)
138 : mSubscriber(subscriber)
139 , mNotifications(notifications)
140 {
141 }
142
143 ~NotifyRunnable() override = default;
144
145 void run() override
146 {
147 for (const auto &ntf : std::as_const(mNotifications)) {
148 if (mSubscriber) {
149 mSubscriber->notify(ntf);
150 } else {
151 break;
152 }
153 }
154 }
155
156private:
157 Q_DISABLE_COPY_MOVE(NotifyRunnable)
158
160 Protocol::ChangeNotificationList mNotifications;
161};
162
163void NotificationManager::emitPendingNotifications()
164{
166
167 if (mNotifications.isEmpty()) {
168 return;
169 }
170
171 if (mDebugNotifications == 0) {
172 mSubscribers | Views::filter(IsNotNull) | Actions::forEach([this](const auto &subscriber) {
173 mNotifyThreadPool->start(new NotifyRunnable(subscriber, mNotifications));
174 });
175 } else {
176 // When debugging notification we have to use a non-threaded approach
177 // so that we can work with return value of notify()
178 for (const auto &notification : std::as_const(mNotifications)) {
179 QList<QByteArray> listeners;
180 for (NotificationSubscriber *subscriber : std::as_const(mSubscribers)) {
181 if (subscriber && subscriber->notify(notification)) {
182 listeners.push_back(subscriber->subscriber());
183 }
184 }
185
186 emitDebugNotification(notification, listeners);
187 }
188 }
189
190 mNotifications.clear();
191}
192
193void NotificationManager::emitDebugNotification(const Protocol::ChangeNotificationPtr &ntf, const QList<QByteArray> &listeners)
194{
195 auto debugNtf = Protocol::DebugChangeNotificationPtr::create();
196 debugNtf->setNotification(ntf);
197 debugNtf->setListeners(listeners);
199 mSubscribers | Views::filter(IsNotNull) | Actions::forEach([this, &debugNtf](const auto &subscriber) {
200 mNotifyThreadPool->start(new NotifyRunnable(subscriber, {debugNtf}));
201 });
202}
203
204#include "moc_notificationmanager.cpp"
Helper integration between Akonadi and Qt.
A glue between Qt and the standard library.
qint64 currentMSecsSinceEpoch()
qsizetype count() const const
void push_back(parameter_type value)
qsizetype removeAll(const AT &t)
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
T qobject_cast(QObject *object)
QThread * thread() const const
QFuture< T > run(Function function,...)
QThread * currentThread()
void clear()
void setMaxThreadCount(int maxThreadCount)
void start(Callable &&callableToRun, int priority)
bool waitForDone(int msecs)
void setInterval(int msec)
bool isActive() const const
void setSingleShot(bool singleShot)
void start()
void stop()
void timeout()
This file is part of the KDE documentation.
Documentation copyright © 1996-2024 The KDE developers.
Generated on Tue Mar 26 2024 11:13:38 by doxygen 1.10.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.