8#include "notificationmanager.h"
9#include "aggregatedfetchscope.h"
10#include "akonadiserver_debug.h"
11#include "handlerhelper.h"
12#include "notificationsubscriber.h"
13#include "storage/collectionstatistics.h"
14#include "storage/notificationcollector.h"
17#include "private/scope_p.h"
18#include "private/standarddirs_p.h"
19#include "shared/akranges.h"
27using namespace Akonadi::Server;
28using namespace AkRanges;
30NotificationManager::NotificationManager(StartMode startMode)
31 : AkThread(QStringLiteral(
"NotificationManager"), startMode)
33 , mNotifyThreadPool(nullptr)
34 , mDebugNotifications(0)
38NotificationManager::~NotificationManager()
43void NotificationManager::init()
51 mTimer->
setInterval(settings.value(QStringLiteral(
"NotificationManager/Interval"), 50).toInt());
58 mCollectionFetchScope =
new AggregatedCollectionFetchScope();
59 mItemFetchScope =
new AggregatedItemFetchScope();
60 mTagFetchScope =
new AggregatedTagFetchScope();
63void NotificationManager::quit()
70 mNotifyThreadPool->
clear();
72 delete mNotifyThreadPool;
76 delete mCollectionFetchScope;
77 delete mItemFetchScope;
78 delete mTagFetchScope;
83void NotificationManager::registerConnection(quintptr socketDescriptor)
87 auto subscriber =
new NotificationSubscriber(
this, socketDescriptor);
88 qCInfo(
AKONADISERVER_LOG) <<
"New notification connection (registered as" << subscriber <<
")";
89 connect(subscriber, &NotificationSubscriber::notificationDebuggingChanged,
this, [
this](
bool enabled) {
91 ++mDebugNotifications;
93 --mDebugNotifications;
101void NotificationManager::forgetSubscriber(NotificationSubscriber *subscriber)
107void NotificationManager::slotNotify(
const Protocol::ChangeNotificationList &msgs)
110 for (
const auto &msg :
msgs) {
111 switch (msg->type()) {
112 case Protocol::Command::CollectionChangeNotification:
113 Protocol::CollectionChangeNotification::appendAndCompress(mNotifications, msg);
115 case Protocol::Command::ItemChangeNotification:
116 case Protocol::Command::TagChangeNotification:
117 case Protocol::Command::RelationChangeNotification:
118 case Protocol::Command::SubscriptionChangeNotification:
119 case Protocol::Command::DebugChangeNotification:
120 mNotifications.push_back(msg);
124 Q_ASSERT_X(
false,
"slotNotify",
"Invalid notification type!");
137 explicit NotifyRunnable(NotificationSubscriber *subscriber,
const Protocol::ChangeNotificationList ¬ifications)
138 : mSubscriber(subscriber)
139 , mNotifications(notifications)
143 ~NotifyRunnable()
override =
default;
147 for (
const auto &ntf :
std::as_const(mNotifications)) {
149 mSubscriber->notify(ntf);
157 Q_DISABLE_COPY_MOVE(NotifyRunnable)
160 Protocol::ChangeNotificationList mNotifications;
163void NotificationManager::emitPendingNotifications()
167 if (mNotifications.isEmpty()) {
171 if (mDebugNotifications == 0) {
172 mSubscribers | Views::filter(IsNotNull) | Actions::forEach([
this](
const auto &subscriber) {
173 mNotifyThreadPool->
start(
new NotifyRunnable(subscriber, mNotifications));
178 for (
const auto ¬ification :
std::
as_const(mNotifications)) {
180 for (NotificationSubscriber *subscriber :
std::
as_const(mSubscribers)) {
181 if (subscriber && subscriber->notify(notification)) {
182 listeners.
push_back(subscriber->subscriber());
186 emitDebugNotification(notification, listeners);
190 mNotifications.clear();
195 auto debugNtf = Protocol::DebugChangeNotificationPtr::create();
199 mSubscribers | Views::filter(IsNotNull) | Actions::forEach([
this, &
debugNtf](
const auto &subscriber) {
200 mNotifyThreadPool->
start(
new NotifyRunnable(subscriber, {
debugNtf}));
204#include "moc_notificationmanager.cpp"
Helper integration between Akonadi and Qt.
A glue between Qt and the standard library.
qint64 currentMSecsSinceEpoch()
qsizetype count() const const
void push_back(parameter_type value)
qsizetype removeAll(const AT &t)
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
T qobject_cast(QObject *object)
QThread * thread() const const
QFuture< T > run(Function function,...)
QThread * currentThread()
void setMaxThreadCount(int maxThreadCount)
void start(Callable &&callableToRun, int priority)
bool waitForDone(int msecs)
void setInterval(int msec)
bool isActive() const const
void setSingleShot(bool singleShot)