8#include "notificationmanager.h" 
    9#include "aggregatedfetchscope.h" 
   10#include "akonadiserver_debug.h" 
   11#include "handlerhelper.h" 
   12#include "notificationsubscriber.h" 
   13#include "storage/collectionstatistics.h" 
   14#include "storage/notificationcollector.h" 
   17#include "private/scope_p.h" 
   18#include "private/standarddirs_p.h" 
   19#include "shared/akranges.h" 
   27using namespace Akonadi::Server;
 
   28using namespace AkRanges;
 
   30NotificationManager::NotificationManager(StartMode startMode)
 
   31    : AkThread(QStringLiteral(
"NotificationManager"), startMode)
 
   33    , mNotifyThreadPool(nullptr)
 
   34    , mDebugNotifications(0)
 
   38NotificationManager::~NotificationManager()
 
   43void NotificationManager::init()
 
   47    const QString serverConfigFile = StandardDirs::serverConfigFile(StandardDirs::ReadWrite);
 
   50    mTimer = 
new QTimer(
this);
 
   51    mTimer->setInterval(settings.value(QStringLiteral(
"NotificationManager/Interval"), 50).toInt());
 
   52    mTimer->setSingleShot(
true);
 
   55    mNotifyThreadPool = 
new QThreadPool(
this);
 
   56    mNotifyThreadPool->setMaxThreadCount(5);
 
   58    mCollectionFetchScope = 
new AggregatedCollectionFetchScope();
 
   59    mItemFetchScope = 
new AggregatedItemFetchScope();
 
   60    mTagFetchScope = 
new AggregatedTagFetchScope();
 
   63void NotificationManager::quit()
 
   70    mNotifyThreadPool->clear();
 
   71    mNotifyThreadPool->waitForDone();
 
   72    delete mNotifyThreadPool;
 
   74    qDeleteAll(mSubscribers);
 
   76    delete mCollectionFetchScope;
 
   77    delete mItemFetchScope;
 
   78    delete mTagFetchScope;
 
   83void NotificationManager::registerConnection(quintptr socketDescriptor)
 
   87    auto subscriber = 
new NotificationSubscriber(
this, socketDescriptor);
 
   88    qCInfo(AKONADISERVER_LOG) << 
"New notification connection (registered as" << subscriber << 
")";
 
   89    connect(subscriber, &NotificationSubscriber::notificationDebuggingChanged, 
this, [
this](
bool enabled) {
 
   91            ++mDebugNotifications;
 
   93            --mDebugNotifications;
 
   95        Q_ASSERT(mDebugNotifications >= 0);
 
   96        Q_ASSERT(mDebugNotifications <= mSubscribers.count());
 
   99    mSubscribers.push_back(subscriber);
 
  101void NotificationManager::forgetSubscriber(NotificationSubscriber *subscriber)
 
  104    mSubscribers.removeAll(subscriber);
 
  107void NotificationManager::slotNotify(
const Protocol::ChangeNotificationList &msgs)
 
  110    for (
const auto &msg : msgs) {
 
  111        switch (msg->type()) {
 
  112        case Protocol::Command::CollectionChangeNotification:
 
  113            Protocol::CollectionChangeNotification::appendAndCompress(mNotifications, msg);
 
  115        case Protocol::Command::ItemChangeNotification:
 
  116        case Protocol::Command::TagChangeNotification:
 
  117        case Protocol::Command::SubscriptionChangeNotification:
 
  118        case Protocol::Command::DebugChangeNotification:
 
  119            mNotifications.push_back(msg);
 
  123            Q_ASSERT_X(
false, 
"slotNotify", 
"Invalid notification type!");
 
  128    if (!mTimer->isActive()) {
 
  133class NotifyRunnable : 
public QRunnable
 
  136    explicit NotifyRunnable(NotificationSubscriber *subscriber, 
const Protocol::ChangeNotificationList ¬ifications)
 
  137        : mSubscriber(subscriber)
 
  138        , mNotifications(notifications)
 
  142    ~NotifyRunnable() 
override = 
default;
 
  146        for (
const auto &ntf : std::as_const(mNotifications)) {
 
  148                mSubscriber->notify(ntf);
 
  156    Q_DISABLE_COPY_MOVE(NotifyRunnable)
 
  158    QPointer<NotificationSubscriber> mSubscriber;
 
  159    Protocol::ChangeNotificationList mNotifications;
 
  162void NotificationManager::emitPendingNotifications()
 
  166    if (mNotifications.isEmpty()) {
 
  170    if (mDebugNotifications == 0) {
 
  171        mSubscribers | Views::filter(IsNotNull) | Actions::forEach([
this](
const auto &subscriber) {
 
  172            mNotifyThreadPool->start(
new NotifyRunnable(subscriber, mNotifications));
 
  177        for (
const auto ¬ification : std::as_const(mNotifications)) {
 
  178            QList<QByteArray> listeners;
 
  179            for (NotificationSubscriber *subscriber : std::as_const(mSubscribers)) {
 
  180                if (subscriber && subscriber->notify(notification)) {
 
  181                    listeners.
push_back(subscriber->subscriber());
 
  185            emitDebugNotification(notification, listeners);
 
  189    mNotifications.clear();
 
  192void NotificationManager::emitDebugNotification(
const Protocol::ChangeNotificationPtr &ntf, 
const QList<QByteArray> &listeners)
 
  194    auto debugNtf = Protocol::DebugChangeNotificationPtr::create();
 
  195    debugNtf->setNotification(ntf);
 
  196    debugNtf->setListeners(listeners);
 
  198    mSubscribers | Views::filter(IsNotNull) | Actions::forEach([
this, &debugNtf](
const auto &subscriber) {
 
  199        mNotifyThreadPool->start(
new NotifyRunnable(subscriber, {debugNtf}));
 
  203#include "moc_notificationmanager.cpp" 
Helper integration between Akonadi and Qt.
 
qint64 currentMSecsSinceEpoch()
 
void push_back(parameter_type value)
 
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
 
QThread * thread() const const
 
QFuture< T > run(Function function,...)
 
QThread * currentThread()