8#include "notificationmanager.h"
9#include "aggregatedfetchscope.h"
10#include "akonadiserver_debug.h"
11#include "handlerhelper.h"
12#include "notificationsubscriber.h"
13#include "storage/collectionstatistics.h"
14#include "storage/notificationcollector.h"
17#include "private/scope_p.h"
18#include "private/standarddirs_p.h"
19#include "shared/akranges.h"
27using namespace Akonadi::Server;
28using namespace AkRanges;
// Constructs the manager: forwards the thread name "NotificationManager" and
// startMode to the AkThread base, and starts with no notify thread pool and a
// zero count in mDebugNotifications.
// NOTE(review): the constructor body and any further member initializers are
// not present in this listing.
30NotificationManager::NotificationManager(StartMode startMode)
31 : AkThread(QStringLiteral(
"NotificationManager"), startMode)
33 , mNotifyThreadPool(nullptr)
34 , mDebugNotifications(0)
// Destructor.
// NOTE(review): its body is not present in this listing.
38NotificationManager::~NotificationManager()
// Sets up the manager's runtime state.
// Visible steps: resolve the read-write server config file path, configure the
// interval of mTimer from the "NotificationManager/Interval" setting (falling
// back to 50 when the key is absent), and allocate the three aggregated fetch
// scopes (collection, item, tag).
// NOTE(review): the declaration of `settings`, the creation of mTimer and the
// surrounding braces are not present in this listing.
43void NotificationManager::init()
47 const QString serverConfigFile = StandardDirs::serverConfigFile(StandardDirs::ReadWrite);
// The interval is passed straight to setInterval(); presumably milliseconds
// if mTimer is a QTimer -- TODO confirm.
51 mTimer->
setInterval(settings.value(QStringLiteral(
"NotificationManager/Interval"), 50).toInt());
// Raw-owned objects; quit() deletes all three.
58 mCollectionFetchScope =
new AggregatedCollectionFetchScope();
59 mItemFetchScope =
new AggregatedItemFetchScope();
60 mTagFetchScope =
new AggregatedTagFetchScope();
// Tears down what init() and registerConnection() built up.
// Visible steps, in order: clear the notify thread pool, delete the pool,
// delete every subscriber in mSubscribers, then delete the three aggregated
// fetch scopes allocated in init().
// NOTE(review): statements between the numbered lines are not present in this
// listing.
63void NotificationManager::quit()
// Presumably drops queued-but-not-started runnables (QThreadPool::clear
// semantics) -- confirm mNotifyThreadPool's type.
70 mNotifyThreadPool->
clear();
72 delete mNotifyThreadPool;
74 qDeleteAll(mSubscribers);
76 delete mCollectionFetchScope;
77 delete mItemFetchScope;
78 delete mTagFetchScope;
// Creates a NotificationSubscriber for a new client connection identified by
// socketDescriptor, logs the registration, and tracks how many subscribers
// currently have notification debugging enabled.
// NOTE(review): the statement that stores the subscriber and the branch that
// selects between the increment and the decrement are not present in this
// listing.
83void NotificationManager::registerConnection(quintptr socketDescriptor)
87 auto subscriber =
new NotificationSubscriber(
this, socketDescriptor);
88 qCInfo(AKONADISERVER_LOG) <<
"New notification connection (registered as" << subscriber <<
")";
// Keep mDebugNotifications in step with the subscriber's debugging state;
// presumably ++ when `enabled` is true and -- otherwise -- TODO confirm.
89 connect(subscriber, &NotificationSubscriber::notificationDebuggingChanged,
this, [
this](
bool enabled) {
91 ++mDebugNotifications;
93 --mDebugNotifications;
// Invariant: the counter stays within [0, number of subscribers].
95 Q_ASSERT(mDebugNotifications >= 0);
96 Q_ASSERT(mDebugNotifications <= mSubscribers.
count());
// Takes the subscriber that should no longer be tracked by the manager.
// NOTE(review): the body is not present in this listing; presumably it removes
// `subscriber` from mSubscribers -- confirm against the full source.
101void NotificationManager::forgetSubscriber(NotificationSubscriber *subscriber)
// Queues incoming change notifications in mNotifications.
// Collection notifications go through appendAndCompress(); item, tag,
// subscription and debug notifications are appended unchanged. Reaching the
// Q_ASSERT_X means the message type was none of those handled here.
// NOTE(review): `break` statements, the label guarding the assertion
// (presumably `default:`) and anything after the loop are not present in this
// listing.
107void NotificationManager::slotNotify(
const Protocol::ChangeNotificationList &msgs)
110 for (
const auto &msg : msgs) {
111 switch (msg->type()) {
112 case Protocol::Command::CollectionChangeNotification:
113 Protocol::CollectionChangeNotification::appendAndCompress(mNotifications, msg);
// These four types share one handler: plain append.
115 case Protocol::Command::ItemChangeNotification:
116 case Protocol::Command::TagChangeNotification:
117 case Protocol::Command::SubscriptionChangeNotification:
118 case Protocol::Command::DebugChangeNotification:
119 mNotifications.push_back(msg);
123 Q_ASSERT_X(
false,
"slotNotify",
"Invalid notification type!");
// Members of the NotifyRunnable task that is handed to the notify thread pool.
// NOTE(review): the class header, the signature of the method that contains
// the loop below, and the mSubscriber declaration are not present in this
// listing.
//
// Constructor: remembers the target subscriber and takes its own copy of the
// notification list (mNotifications is held by value), so the caller may
// modify or clear its list after scheduling the task.
// Fix: the parameter name was mis-encoded as a NOT SIGN followed by
// "ifications"; restored to "&notifications", which is the name the member
// initializer uses.
136 explicit NotifyRunnable(NotificationSubscriber *subscriber,
const Protocol::ChangeNotificationList &notifications)
137 : mSubscriber(subscriber)
138 , mNotifications(notifications)
142 ~NotifyRunnable()
override =
default;
// Delivers every stored notification to the subscriber, in list order.
// Original line 147 (between the loop header and the call) is not shown.
146 for (
const auto &ntf : std::as_const(mNotifications)) {
148 mSubscriber->notify(ntf);
156 Q_DISABLE_COPY_MOVE(NotifyRunnable)
// Private copy of the batch this task delivers.
159 Protocol::ChangeNotificationList mNotifications;
// Delivers everything queued in mNotifications to the subscribers, then
// empties the queue.
//
// Two delivery paths are visible:
//  * mDebugNotifications == 0: one NotifyRunnable per non-null subscriber is
//    started on the notify thread pool, each with the whole batch.
//  * otherwise: each notification is offered to every subscriber in this
//    function, the subscribers whose notify() returned true are collected as
//    listeners, and a debug notification describing them is emitted.
//
// NOTE(review): the body of the isEmpty() guard (presumably an early return),
// the `else` joining the two paths, the declaration of `listeners` and the
// closing braces are not present in this listing.
//
// Fix: the loop variable was mis-encoded as a NOT SIGN followed by
// "ification"; restored to "&notification", which is the name the loop body
// uses.
162void NotificationManager::emitPendingNotifications()
166 if (mNotifications.isEmpty()) {
170 if (mDebugNotifications == 0) {
// The runnable copies mNotifications, so the clear() at the end does not
// affect tasks that were already scheduled.
171 mSubscribers | Views::filter(IsNotNull) | Actions::forEach([
this](
const auto &subscriber) {
172 mNotifyThreadPool->
start(
new NotifyRunnable(subscriber, mNotifications));
177 for (
const auto &notification : std::as_const(mNotifications)) {
179 for (NotificationSubscriber *subscriber : std::as_const(mSubscribers)) {
// Null entries are skipped; only subscribers that accepted the
// notification are recorded.
180 if (subscriber && subscriber->notify(notification)) {
181 listeners.
push_back(subscriber->subscriber());
185 emitDebugNotification(notification, listeners);
189 mNotifications.clear();
// Body fragment; the enclosing function's signature is not present in this
// listing (emitPendingNotifications() calls emitDebugNotification(notification,
// listeners), so this is presumably that function -- confirm).
// Wraps `ntf` and its `listeners` in a new DebugChangeNotification and starts
// one NotifyRunnable per non-null subscriber carrying just that notification.
194 auto debugNtf = Protocol::DebugChangeNotificationPtr::create();
195 debugNtf->setNotification(ntf);
196 debugNtf->setListeners(listeners);
// debugNtf is captured by reference and copied into each runnable's list.
198 mSubscribers | Views::filter(IsNotNull) | Actions::forEach([
this, &debugNtf](
const auto &subscriber) {
199 mNotifyThreadPool->
start(
new NotifyRunnable(subscriber, {debugNtf}));
203#include "moc_notificationmanager.cpp"
Helper integration between Akonadi and Qt.
qint64 currentMSecsSinceEpoch()
qsizetype count() const
void push_back(parameter_type value)
qsizetype removeAll(const AT &t)
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
QThread * thread() const
QFuture< T > run(Function function,...)
QThread * currentThread()
void setMaxThreadCount(int maxThreadCount)
void start(Callable &&callableToRun, int priority)
bool waitForDone(int msecs)
void setInterval(int msec)
bool isActive() const
void setSingleShot(bool singleShot)