12#include "private/protocol_p.h" 
   13#include "protocolhelper_p.h" 
   14#include "servermanager.h" 
   15#include "servermanager_p.h" 
   16#include "sessionthread_p.h" 
   18#include "akonadicore_debug.h" 
   20#include <KLocalizedString> 
   22#include <QCoreApplication> 
   24#include <QRandomGenerator> 
   26#include <QThreadStorage> 
   29#include <QCoreApplication> 
   30#include <QHostAddress> 
   35#define PIPELINE_LENGTH 0 
   39using namespace std::chrono_literals;
 
   42void SessionPrivate::startNext()
 
   49void SessionPrivate::reconnect()
 
   52        connection = 
new Connection(Connection::CommandConnection, sessionId, &mCommandBuffer);
 
   53        sessionThread()->addConnection(connection);
 
   57            &Connection::socketDisconnected,
 
   65            &Connection::socketError,
 
   73    connection->reconnect();
 
   76void SessionPrivate::socketError(
const QString &error)
 
   78    qCWarning(AKONADICORE_LOG) << 
"Socket error occurred:" << 
error;
 
   82void SessionPrivate::socketDisconnected()
 
   85        currentJob->d_ptr->lostConnection();
 
   90bool SessionPrivate::handleCommands()
 
   92    CommandBufferLocker lock(&mCommandBuffer);
 
   93    CommandBufferNotifyBlocker notify(&mCommandBuffer);
 
   94    while (!mCommandBuffer.isEmpty()) {
 
   95        const auto command = mCommandBuffer.dequeue();
 
   97        const auto cmd = command.command;
 
   98        const auto tag = command.tag;
 
  101        if (cmd->type() == Protocol::Command::Hello) {
 
  102            const auto &hello = Protocol::cmdCast<Protocol::HelloResponse>(cmd);
 
  103            if (hello.isError()) {
 
  104                qCWarning(AKONADICORE_LOG) << 
"Error when establishing connection with Akonadi server:" << hello.errorMessage();
 
  105                connection->closeConnection();
 
  110            qCDebug(AKONADICORE_LOG) << 
"Connected to" << hello.serverName() << 
", using protocol version" << hello.protocolVersion();
 
  111            qCDebug(AKONADICORE_LOG) << 
"Server generation:" << hello.generation();
 
  112            qCDebug(AKONADICORE_LOG) << 
"Server says:" << hello.message();
 
  115            protocolVersion = hello.protocolVersion();
 
  116            Internal::setServerProtocolVersion(protocolVersion);
 
  117            Internal::setGeneration(hello.generation());
 
  119            sendCommand(nextTag(), Protocol::LoginCommandPtr::create(sessionId));
 
  120        } 
else if (cmd->type() == Protocol::Command::Login) {
 
  121            const auto &login = Protocol::cmdCast<Protocol::LoginResponse>(cmd);
 
  122            if (login.isError()) {
 
  123                qCWarning(AKONADICORE_LOG) << 
"Unable to login to Akonadi server:" << login.errorMessage();
 
  124                connection->closeConnection();
 
  133        } 
else if (currentJob) {
 
  134            currentJob->d_ptr->handleResponse(tag, cmd);
 
  143bool SessionPrivate::canPipelineNext()
 
  145    if (queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH) {
 
  148    if (pipeline.isEmpty() && currentJob) {
 
  149        return currentJob->d_ptr->mWriteFinished;
 
  151    if (!pipeline.isEmpty()) {
 
  152        return pipeline.last()->d_ptr->mWriteFinished;
 
  157void SessionPrivate::doStartNext()
 
  159    if (!connected || (queue.isEmpty() && pipeline.isEmpty())) {
 
  162    if (canPipelineNext()) {
 
  164        pipeline.enqueue(nextJob);
 
  171    if (!pipeline.isEmpty()) {
 
  172        currentJob = pipeline.dequeue();
 
  174        currentJob = queue.dequeue();
 
  175        startJob(currentJob);
 
  179void SessionPrivate::startJob(
Job *job)
 
  181    if (protocolVersion != Protocol::version()) {
 
  183        if (protocolVersion < Protocol::version()) {
 
  185                i18n(
"Protocol version mismatch. Server version is older (%1) than ours (%2). " 
  186                     "If you updated your system recently please restart the Akonadi server.",
 
  188                     Protocol::version()));
 
  189            qCWarning(AKONADICORE_LOG) << 
"Protocol version mismatch. Server version is older (" << protocolVersion << 
") than ours (" << Protocol::version()
 
  191                                          "If you updated your system recently please restart the Akonadi server.";
 
  194                i18n(
"Protocol version mismatch. Server version is newer (%1) than ours (%2). " 
  195                     "If you updated your system recently please restart all KDE PIM applications.",
 
  197                     Protocol::version()));
 
  198            qCWarning(AKONADICORE_LOG) << 
"Protocol version mismatch. Server version is newer (" << protocolVersion << 
") than ours (" << Protocol::version()
 
  200                                          "If you updated your system recently please restart all KDE PIM applications.";
 
  204        job->d_ptr->startQueued();
 
  208void SessionPrivate::endJob(
Job *job)
 
  213void SessionPrivate::jobDone(
KJob *job)
 
  217    if (job == currentJob) {
 
  218        if (pipeline.isEmpty()) {
 
  220            currentJob = 
nullptr;
 
  222            currentJob = pipeline.dequeue();
 
  235    Q_ASSERT((job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()));
 
  241void SessionPrivate::jobDestroyed(
QObject *job)
 
  244    jobDone(
static_cast<KJob *
>(job));
 
  247void SessionPrivate::addJob(
Job *job)
 
  254        jobWriteFinished(job);
 
  262void SessionPrivate::publishOtherJobs(
Job *thanThisJob)
 
  265    for (
const auto &job : std::as_const(queue)) {
 
  266        if (job != thanThisJob) {
 
  267            job->d_ptr->publishJob();
 
  272        qCDebug(AKONADICORE_LOG) << 
"published" << count << 
"pending jobs to the job tracker";
 
  274    if (currentJob && currentJob != thanThisJob) {
 
  275        currentJob->d_ptr->signalStartedToJobTracker();
 
  279qint64 SessionPrivate::nextTag()
 
  284void SessionPrivate::sendCommand(qint64 tag, 
const Protocol::CommandPtr &command)
 
  286    connection->sendCommand(tag, command);
 
  299            job->
kill(KJob::EmitResult);
 
  302        sessionThread()->destroyConnection(connection);
 
  303        connection = 
nullptr;
 
  307void SessionPrivate::itemRevisionChanged(
Akonadi::Item::Id itemId, 
int oldRevision, 
int newRevision)
 
  311    for (
Job *job : std::as_const(queue)) {
 
  312        job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision);
 
  318SessionPrivate::SessionPrivate(
Session *parent)
 
  320    , mSessionThread(new SessionThread)
 
  321    , connection(nullptr)
 
  323    , mCommandBuffer(parent, 
"handleCommands")
 
  324    , currentJob(nullptr)
 
  330        socketDisconnected();
 
  331        connection = 
nullptr;
 
  333        delete mSessionThread;
 
  334        mSessionThread = 
nullptr;
 
  338SessionPrivate::~SessionPrivate()
 
  341    delete mSessionThread;
 
  344void SessionPrivate::init(
const QByteArray &
id)
 
  352    qCDebug(AKONADICORE_LOG) << 
"Initializing session with ID" << id;
 
  362        serverStateChanged(state);
 
  367void SessionPrivate::forceReconnect()
 
  372        connection->forceReconnect();
 
  384    , d(new SessionPrivate(this))
 
 
  409void SessionPrivate::createDefaultSession(
const QByteArray &sessionId)
 
  411    Q_ASSERT_X(!sessionId.
isEmpty(), 
"SessionPrivate::createDefaultSession", 
"You tried to create a default session with empty session id!");
 
  412    Q_ASSERT_X(!instances()->hasLocalData(), 
"SessionPrivate::createDefaultSession", 
"You tried to create a default session twice!");
 
  414    auto session = 
new Session(sessionId);
 
  415    setDefaultSession(session);
 
  418void SessionPrivate::setDefaultSession(
Session *session)
 
  420    instances()->setLocalData({session});
 
  422        instances()->setLocalData({});
 
  428    if (!instances()->hasLocalData()) {
 
  430        SessionPrivate::setDefaultSession(session);
 
  432    return instances()->localData().data();
 
 
  440void SessionPrivate::clear(
bool forceReconnect)
 
  444        job->
kill(KJob::EmitResult); 
 
  449        job->d_ptr->mStarted = 
false; 
 
  450        job->
kill(KJob::EmitResult);
 
  454        currentJob->d_ptr->mStarted = 
false; 
 
  455        currentJob->kill(KJob::EmitResult);
 
  458    if (forceReconnect) {
 
  459        this->forceReconnect();
 
  463#include "moc_session.cpp" 
qint64 Id
Describes the unique id type.
 
Base class for all actions in the Akonadi storage.
 
@ ProtocolVersionMismatch
The server protocol version is too old or too new.
 
@ ConnectionFailed
The connection to the Akonadi server failed.
 
void writeFinished(Akonadi::Job *job)
This signal is emitted once the job has finished all write operations.
 
static State state()
Returns the state of the server.
 
static bool start()
Starts the server.
 
State
Enum for the various states the server can be in.
 
@ Running
Server is running and operational.
 
@ Broken
Server is not operational and an error has been detected.
 
@ NotRunning
Server is not running; either no one has started it yet or it failed to start.
 
@ Stopping
Server is shutting down.
 
void stateChanged(Akonadi::ServerManager::State state)
Emitted whenever the server state changes.
 
static ServerManager * self()
Returns the singleton instance of this class, for connecting to its signals.
 
A communication session with the Akonadi storage.
 
~Session() override
Destroys the session.
 
void clear()
Stops all jobs queued for execution.
 
static Session * defaultSession()
Returns the default session for this thread.
 
Session(const QByteArray &sessionId=QByteArray(), QObject *parent=nullptr)
Creates a new session.
 
QByteArray sessionId() const
Returns the session identifier.
 
void reconnected()
This signal is emitted whenever the session has been reconnected to the server.
 
void setErrorText(const QString &errorText)
 
void setError(int errorCode)
 
bool kill(KJob::KillVerbosity verbosity=KJob::Quietly)
 
QString i18n(const char *text, const TYPE &arg...)
 
Helper integration between Akonadi and Qt.
 
void error(QWidget *parent, const QString &text, const QString &title, const KGuiItem &buttonOk, Options options=Notify)
 
KLEO_EXPORT std::unique_ptr< GpgME::DefaultAssuanTransaction > sendCommand(std::shared_ptr< GpgME::Context > &assuanContext, const std::string &command, GpgME::Error &err)
 
bool isEmpty() const
 
QByteArray number(double n, char format, int precision)
 
QCoreApplication * instance()
 
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
 
void destroyed(QObject *obj)
 
bool disconnect(const QMetaObject::Connection &connection)
 
QObject * parent() const
 
QRandomGenerator * global()