12#include "private/protocol_p.h"
13#include "protocolhelper_p.h"
14#include "servermanager.h"
15#include "servermanager_p.h"
16#include "sessionthread_p.h"
18#include "akonadicore_debug.h"
20#include <KLocalizedString>
22#include <QCoreApplication>
24#include <QRandomGenerator>
26#include <QThreadStorage>
29#include <QApplication>
30#include <QHostAddress>
35#define PIPELINE_LENGTH 0
39using namespace std::chrono_literals;
42void SessionPrivate::startNext()
49void SessionPrivate::reconnect()
52 connection =
new Connection(Connection::CommandConnection, sessionId, &mCommandBuffer);
53 sessionThread()->addConnection(connection);
54 mParent->connect(connection, &Connection::reconnected, mParent, &Session::reconnected,
Qt::QueuedConnection);
57 &Connection::socketDisconnected,
65 &Connection::socketError,
73 connection->reconnect();
76void SessionPrivate::socketError(
const QString &error)
78 qCWarning(AKONADICORE_LOG) <<
"Socket error occurred:" <<
error;
82void SessionPrivate::socketDisconnected()
85 currentJob->d_ptr->lostConnection();
90bool SessionPrivate::handleCommands()
92 CommandBufferLocker lock(&mCommandBuffer);
93 CommandBufferNotifyBlocker notify(&mCommandBuffer);
94 while (!mCommandBuffer.isEmpty()) {
95 const auto command = mCommandBuffer.dequeue();
97 const auto cmd = command.command;
98 const auto tag = command.tag;
101 if (cmd->type() == Protocol::Command::Hello) {
102 const auto &hello = Protocol::cmdCast<Protocol::HelloResponse>(cmd);
103 if (hello.isError()) {
104 qCWarning(AKONADICORE_LOG) <<
"Error when establishing connection with Akonadi server:" << hello.errorMessage();
105 connection->closeConnection();
110 qCDebug(AKONADICORE_LOG) <<
"Connected to" << hello.serverName() <<
", using protocol version" << hello.protocolVersion();
111 qCDebug(AKONADICORE_LOG) <<
"Server generation:" << hello.generation();
112 qCDebug(AKONADICORE_LOG) <<
"Server says:" << hello.message();
115 protocolVersion = hello.protocolVersion();
116 Internal::setServerProtocolVersion(protocolVersion);
117 Internal::setGeneration(hello.generation());
119 sendCommand(nextTag(), Protocol::LoginCommandPtr::create(sessionId));
120 }
else if (cmd->type() == Protocol::Command::Login) {
121 const auto &login = Protocol::cmdCast<Protocol::LoginResponse>(cmd);
122 if (login.isError()) {
123 qCWarning(AKONADICORE_LOG) <<
"Unable to login to Akonadi server:" << login.errorMessage();
124 connection->closeConnection();
133 }
else if (currentJob) {
134 currentJob->d_ptr->handleResponse(tag, cmd);
143bool SessionPrivate::canPipelineNext()
145 if (queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH) {
148 if (pipeline.isEmpty() && currentJob) {
149 return currentJob->d_ptr->mWriteFinished;
151 if (!pipeline.isEmpty()) {
152 return pipeline.last()->d_ptr->mWriteFinished;
157void SessionPrivate::doStartNext()
159 if (!connected || (queue.isEmpty() && pipeline.isEmpty())) {
162 if (canPipelineNext()) {
164 pipeline.enqueue(nextJob);
171 if (!pipeline.isEmpty()) {
172 currentJob = pipeline.dequeue();
174 currentJob = queue.dequeue();
175 startJob(currentJob);
179void SessionPrivate::startJob(
Job *job)
181 if (protocolVersion != Protocol::version()) {
183 if (protocolVersion < Protocol::version()) {
185 i18n(
"Protocol version mismatch. Server version is older (%1) than ours (%2). "
186 "If you updated your system recently please restart the Akonadi server.",
188 Protocol::version()));
189 qCWarning(AKONADICORE_LOG) <<
"Protocol version mismatch. Server version is older (" << protocolVersion <<
") than ours (" << Protocol::version()
191 "If you updated your system recently please restart the Akonadi server.";
194 i18n(
"Protocol version mismatch. Server version is newer (%1) than ours (%2). "
195 "If you updated your system recently please restart all KDE PIM applications.",
197 Protocol::version()));
198 qCWarning(AKONADICORE_LOG) <<
"Protocol version mismatch. Server version is newer (" << protocolVersion <<
") than ours (" << Protocol::version()
200 "If you updated your system recently please restart all KDE PIM applications.";
204 job->d_ptr->startQueued();
208void SessionPrivate::endJob(
Job *job)
213void SessionPrivate::jobDone(
KJob *job)
217 if (job == currentJob) {
218 if (pipeline.isEmpty()) {
220 currentJob =
nullptr;
222 currentJob = pipeline.dequeue();
235 Q_ASSERT((job == currentJob && pipeline.isEmpty()) || (job == pipeline.last())); // compare with ==; the previous `job = pipeline.last()` assigned to `job` inside the assertion instead of checking it
241void SessionPrivate::jobDestroyed(
QObject *job)
244 jobDone(
static_cast<KJob *
>(job));
247void SessionPrivate::addJob(
Job *job)
254 jobWriteFinished(job);
262void SessionPrivate::publishOtherJobs(
Job *thanThisJob)
265 for (
const auto &job : std::as_const(queue)) {
266 if (job != thanThisJob) {
267 job->d_ptr->publishJob();
272 qCDebug(AKONADICORE_LOG) <<
"published" << count <<
"pending jobs to the job tracker";
274 if (currentJob && currentJob != thanThisJob) {
275 currentJob->d_ptr->signalStartedToJobTracker();
279qint64 SessionPrivate::nextTag()
286 connection->sendCommand(tag, command);
291 if (state == ServerManager::Running && !connected) {
293 }
else if (!connected && state == ServerManager::Broken) {
299 job->
kill(KJob::EmitResult);
301 }
else if (state == ServerManager::Stopping) {
302 sessionThread()->destroyConnection(connection);
303 connection =
nullptr;
307void SessionPrivate::itemRevisionChanged(
Akonadi::Item::Id itemId,
int oldRevision,
int newRevision)
311 for (
Job *job : std::as_const(queue)) {
312 job->d_ptr->updateItemRevision(itemId, oldRevision, newRevision);
318SessionPrivate::SessionPrivate(
Session *parent)
320 , mSessionThread(new SessionThread)
321 , connection(nullptr)
323 , mCommandBuffer(parent,
"handleCommands")
324 , currentJob(nullptr)
330 socketDisconnected();
331 connection =
nullptr;
333 delete mSessionThread;
334 mSessionThread =
nullptr;
338SessionPrivate::~SessionPrivate()
341 delete mSessionThread;
344void SessionPrivate::init(
const QByteArray &
id)
352 qCDebug(AKONADICORE_LOG) <<
"Initializing session with ID" << id;
358 if (ServerManager::state() == ServerManager::NotRunning) {
359 ServerManager::start();
362 serverStateChanged(state);
367void SessionPrivate::forceReconnect()
372 connection->forceReconnect();
384 , d(new SessionPrivate(this))
409void SessionPrivate::createDefaultSession(
const QByteArray &sessionId)
411 Q_ASSERT_X(!sessionId.
isEmpty(),
"SessionPrivate::createDefaultSession",
"You tried to create a default session with empty session id!");
412 Q_ASSERT_X(!instances()->hasLocalData(),
"SessionPrivate::createDefaultSession",
"You tried to create a default session twice!");
414 auto session =
new Session(sessionId);
415 setDefaultSession(session);
418void SessionPrivate::setDefaultSession(
Session *session)
420 instances()->setLocalData({session});
422 instances()->setLocalData({});
428 if (!instances()->hasLocalData()) {
430 SessionPrivate::setDefaultSession(session);
432 return instances()->localData().data();
440void SessionPrivate::clear(
bool forceReconnect)
444 job->
kill(KJob::EmitResult);
449 job->d_ptr->mStarted =
false;
450 job->
kill(KJob::EmitResult);
454 currentJob->d_ptr->mStarted =
false;
455 currentJob->kill(KJob::EmitResult);
458 if (forceReconnect) {
459 this->forceReconnect();
463#include "moc_session.cpp"
qint64 Id
Describes the unique id type.
Base class for all actions in the Akonadi storage.
@ ProtocolVersionMismatch
The server protocol version is too old or too new.
@ ConnectionFailed
The connection to the Akonadi server failed.
void writeFinished(Akonadi::Job *job)
This signal is emitted if the job has finished all write operations.
State
Enum for the various states the server can be in.
A communication session with the Akonadi storage.
~Session() override
Destroys the session.
void clear()
Stops all jobs queued for execution.
static Session * defaultSession()
Returns the default session for this thread.
Session(const QByteArray &sessionId=QByteArray(), QObject *parent=nullptr)
Creates a new session.
QByteArray sessionId() const
Returns the session identifier.
void setErrorText(const QString &errorText)
void setError(int errorCode)
bool kill(KJob::KillVerbosity verbosity=KJob::Quietly)
QString i18n(const char *text, const TYPE &arg...)
Helper integration between Akonadi and Qt.
void error(QWidget *parent, const QString &text, const QString &title, const KGuiItem &buttonOk, Options options=Notify)
KLEO_EXPORT std::unique_ptr< GpgME::DefaultAssuanTransaction > sendCommand(std::shared_ptr< GpgME::Context > &assuanContext, const std::string &command, GpgME::Error &err)
bool isEmpty() const
QByteArray number(double n, char format, int precision)
QCoreApplication * instance()
QMetaObject::Connection connect(const QObject *sender, PointerToMemberFunction signal, Functor functor)
void destroyed(QObject *obj)
bool disconnect(const QMetaObject::Connection &connection)
QRandomGenerator * global()