8 #include "notificationmanager.h"
9 #include "aggregatedfetchscope.h"
10 #include "akonadiserver_debug.h"
11 #include "handlerhelper.h"
12 #include "notificationsubscriber.h"
13 #include "storage/collectionstatistics.h"
14 #include "storage/notificationcollector.h"
17 #include <private/scope_p.h>
18 #include <private/standarddirs_p.h>
19 #include <shared/akranges.h>
23 #include <QThreadPool>
27 using namespace Akonadi::Server;
28 using namespace AkRanges;
// Constructs the manager. The string "NotificationManager" and startMode are
// passed to the AkThread base constructor; the notification thread pool
// pointer starts as nullptr and the debug-notification counter starts at 0.
30 NotificationManager::NotificationManager(StartMode startMode)
31 : AkThread(QStringLiteral(
"NotificationManager"), startMode)
33 , mNotifyThreadPool(nullptr)
34 , mDebugNotifications(0)
// Destroys the manager.
// NOTE(review): confirm how the teardown done here relates to quit(), which
// releases the thread pool, the subscribers and the fetch scopes.
38 NotificationManager::~NotificationManager()
// Sets up the manager's runtime state:
//  - resolves the server configuration file path (requested with
//    StandardDirs::ReadWrite),
//  - configures mTimer as a single-shot timer whose timeout triggers
//    emitPendingNotifications(),
//  - limits the notification thread pool to 5 threads,
//  - allocates the aggregated collection, item and tag fetch scopes.
43 void NotificationManager::init()
47 const QString serverConfigFile = StandardDirs::serverConfigFile(StandardDirs::ReadWrite);
// The batching interval is read from the "NotificationManager/Interval"
// setting and falls back to 50 when the key is absent (QTimer intervals are
// expressed in milliseconds).
51 mTimer->setInterval(settings.value(QStringLiteral(
"NotificationManager/Interval"), 50).toInt());
// Single-shot: the timer fires once per start instead of repeating.
52 mTimer->setSingleShot(
true);
53 connect(mTimer, &
QTimer::timeout,
this, &NotificationManager::emitPendingNotifications);
56 mNotifyThreadPool->setMaxThreadCount(5);
// These scopes are heap-allocated here and released with delete in quit().
58 mCollectionFetchScope =
new AggregatedCollectionFetchScope();
59 mItemFetchScope =
new AggregatedItemFetchScope();
60 mTagFetchScope =
new AggregatedTagFetchScope();
// Shuts the manager down: empties and drains the notification thread pool,
// then deletes the pool, every subscriber in mSubscribers and the three
// aggregated fetch scopes.
63 void NotificationManager::quit()
// Drop runnables that have not started yet, then wait for the running ones.
// Presumably this must happen before the subscribers are deleted because
// NotifyRunnable keeps a subscriber pointer and calls notify() on it.
70 mNotifyThreadPool->clear();
71 mNotifyThreadPool->waitForDone();
72 delete mNotifyThreadPool;
74 qDeleteAll(mSubscribers);
76 delete mCollectionFetchScope;
77 delete mItemFetchScope;
78 delete mTagFetchScope;
// Creates a NotificationSubscriber for a new connection, logs it, hooks up
// tracking of its notification-debugging state and appends it to
// mSubscribers.
//
// socketDescriptor: descriptor handed to the new NotificationSubscriber.
83 void NotificationManager::registerConnection(quintptr socketDescriptor)
87 auto subscriber =
new NotificationSubscriber(
this, socketDescriptor);
88 qCInfo(AKONADISERVER_LOG) <<
"New notification connection (registered as" << subscriber <<
")";
// mDebugNotifications counts subscribers with debugging switched on.
// Presumably it is incremented when `enabled` is true and decremented
// otherwise — TODO confirm against the branch lines around the two updates.
89 connect(subscriber, &NotificationSubscriber::notificationDebuggingChanged,
this, [
this](
bool enabled) {
91 ++mDebugNotifications;
93 --mDebugNotifications;
// Sanity checks: the counter can never be negative nor exceed the number of
// registered subscribers.
95 Q_ASSERT(mDebugNotifications >= 0);
96 Q_ASSERT(mDebugNotifications <= mSubscribers.count());
99 mSubscribers.push_back(subscriber);
// Removes every occurrence of `subscriber` from mSubscribers.
101 void NotificationManager::forgetSubscriber(NotificationSubscriber *subscriber)
104 mSubscribers.removeAll(subscriber);
// Queues incoming change notifications into mNotifications, dispatching on
// each message's type.
//
// msgs: the notifications to enqueue.
107 void NotificationManager::slotNotify(
const Protocol::ChangeNotificationList &msgs)
110 for (
const auto &msg : msgs) {
111 switch (msg->type()) {
// Collection changes are added via appendAndCompress() rather than a plain
// push_back.
112 case Protocol::Command::CollectionChangeNotification:
113 Protocol::CollectionChangeNotification::appendAndCompress(mNotifications, msg);
// Item, tag, relation, subscription and debug notifications are appended
// unchanged.
115 case Protocol::Command::ItemChangeNotification:
116 case Protocol::Command::TagChangeNotification:
117 case Protocol::Command::RelationChangeNotification:
118 case Protocol::Command::SubscriptionChangeNotification:
119 case Protocol::Command::DebugChangeNotification:
120 mNotifications.push_back(msg);
// Always-failing assertion for an unexpected notification type.
// NOTE(review): presumably this sits in the switch's default branch — confirm.
124 Q_ASSERT_X(
false,
"slotNotify",
"Invalid notification type!");
// NOTE(review): presumably the timer is started here when it is not already
// running, so queued notifications are flushed by emitPendingNotifications()
// — confirm.
129 if (!mTimer->isActive()) {
// Task object handed to the notification thread pool: it delivers a list of
// change notifications to one subscriber.
//
// The constructor stores the subscriber pointer and keeps its own copy of the
// notification list (mNotifications is held by value).
// NOTE(review): "¬ifications" below looks like a mis-decoded "&notifications"
// (the initializer refers to `notifications`) — confirm against the original
// file.
137 explicit NotifyRunnable(NotificationSubscriber *subscriber,
const Protocol::ChangeNotificationList ¬ifications)
138 : mSubscriber(subscriber)
139 , mNotifications(notifications)
143 ~NotifyRunnable()
override =
default;
// Deliver the stored notifications to the subscriber one by one, in list
// order.
147 for (
const auto &ntf : std::as_const(mNotifications)) {
149 mSubscriber->notify(ntf);
// Instances are neither copyable nor movable.
157 Q_DISABLE_COPY_MOVE(NotifyRunnable)
// Private copy of the notifications to deliver.
160 Protocol::ChangeNotificationList mNotifications;
// Flushes the queued notifications to the subscribers and then clears the
// queue.
//
// Two delivery paths are visible:
//  - no subscriber has debugging enabled (mDebugNotifications == 0): every
//    non-null subscriber gets a NotifyRunnable on the thread pool carrying
//    the whole batch;
//  - otherwise (presumably the else branch — confirm): each notification is
//    delivered by calling subscriber->notify() directly, the subscribers for
//    which notify() returned true are recorded as listeners, and a debug
//    notification is emitted for that notification.
163 void NotificationManager::emitPendingNotifications()
// NOTE(review): presumably returns early when nothing is queued — confirm.
167 if (mNotifications.isEmpty()) {
171 if (mDebugNotifications == 0) {
172 mSubscribers | Views::filter(IsNotNull) | Actions::forEach([
this](
const auto &subscriber) {
173 mNotifyThreadPool->start(
new NotifyRunnable(subscriber, mNotifications));
// NOTE(review): "¬ification" below looks like a mis-decoded "&notification"
// (the loop body refers to `notification`) — confirm against the original
// file.
178 for (
const auto ¬ification : std::as_const(mNotifications)) {
180 for (NotificationSubscriber *subscriber : std::as_const(mSubscribers)) {
// Only subscribers whose notify() returned true are reported as listeners.
181 if (subscriber && subscriber->notify(notification)) {
182 listeners.
push_back(subscriber->subscriber());
186 emitDebugNotification(notification, listeners);
190 mNotifications.clear();
// Wrap the notification (`ntf`) and the list of listeners into a freshly
// created Protocol::DebugChangeNotification.
195 auto debugNtf = Protocol::DebugChangeNotificationPtr::create();
196 debugNtf->setNotification(ntf);
197 debugNtf->setListeners(listeners);
// Schedule one NotifyRunnable per non-null subscriber on the thread pool,
// each carrying a one-element list that holds the debug notification.
199 mSubscribers | Views::filter(IsNotNull) | Actions::forEach([
this, &debugNtf](
const auto &subscriber) {
200 mNotifyThreadPool->start(
new NotifyRunnable(subscriber, {debugNtf}));