21 #include "session_p.h"
23 #include "imapparser_p.h"
26 #include "servermanager.h"
27 #include "servermanager_p.h"
28 #include "xdgbasedirs_p.h"
31 #include <klocalizedstring.h>
33 #include <QCoreApplication>
34 #include <QtCore/QDir>
35 #include <QtCore/QQueue>
36 #include <QtCore/QThreadStorage>
37 #include <QtCore/QTimer>
38 #include <QtCore/QThread>
41 #include <QtNetwork/QLocalSocket>
42 #include <QtNetwork/QTcpSocket>
43 #include <QtNetwork/QHostAddress>
48 #define PIPELINE_LENGTH 0
51 using namespace Akonadi;
55 void SessionPrivate::startNext()
57 QTimer::singleShot( 0, mParent, SLOT(doStartNext()) );
62 QLocalSocket *localSocket = qobject_cast<QLocalSocket*>( socket );
63 if ( localSocket && (localSocket->state() == QLocalSocket::ConnectedState
64 || localSocket->state() == QLocalSocket::ConnectingState ) ) {
69 QTcpSocket *tcpSocket = qobject_cast<QTcpSocket*>( socket );
70 if ( tcpSocket && (tcpSocket->state() == QTcpSocket::ConnectedState
71 || tcpSocket->state() == QTcpSocket::ConnectingState ) ) {
77 QString serverAddress;
82 const QByteArray serverAddressEnvVar = qgetenv(
"AKONADI_SERVER_ADDRESS" );
83 if ( !serverAddressEnvVar.isEmpty() ) {
84 const int pos = serverAddressEnvVar.indexOf(
':' );
85 const QByteArray protocol = serverAddressEnvVar.left( pos );
86 QMap<QString, QString> options;
87 foreach (
const QString &entry, QString::fromLatin1( serverAddressEnvVar.mid( pos + 1 ) ).split( QLatin1Char(
',') ) ) {
88 const QStringList pair = entry.split( QLatin1Char(
'=') );
89 if ( pair.size() != 2 )
91 options.insert( pair.first(), pair.last() );
93 kDebug() << protocol << options;
95 if ( protocol ==
"tcp" ) {
96 serverAddress = options.value( QLatin1String(
"host" ) );
97 port = options.value( QLatin1String(
"port" ) ).toUInt();
99 }
else if ( protocol ==
"unix" ) {
100 serverAddress = options.value( QLatin1String(
"path" ) );
101 }
else if ( protocol ==
"pipe" ) {
102 serverAddress = options.value( QLatin1String(
"name" ) );
107 if ( serverAddress.isEmpty() ) {
109 const QFileInfo fileInfo( connectionConfigFile );
110 if ( !fileInfo.exists() ) {
111 kDebug() <<
"Akonadi Client Session: connection config file '"
112 "akonadi/akonadiconnectionrc' can not be found in"
113 << XdgBaseDirs::homePath(
"config" ) <<
"nor in any of"
114 << XdgBaseDirs::systemPathList(
"config" );
116 const QSettings connectionSettings( connectionConfigFile, QSettings::IniFormat );
118 #ifdef Q_OS_WIN //krazy:exclude=cpp
119 serverAddress = connectionSettings.value( QLatin1String(
"Data/NamedPipe" ), QLatin1String(
"Akonadi" ) ).toString();
121 const QString defaultSocketDir = Internal::xdgSaveDir(
"data" );
122 serverAddress = connectionSettings.value( QLatin1String(
"Data/UnixPath" ), QString(defaultSocketDir + QLatin1String(
"/akonadiserver.socket" )) ).toString();
133 socket = localSocket =
new QLocalSocket( mParent );
134 mParent->connect( localSocket, SIGNAL(error(QLocalSocket::LocalSocketError)), SLOT(socketError(QLocalSocket::LocalSocketError)) );
136 socket = tcpSocket =
new QTcpSocket( mParent );
137 mParent->connect( tcpSocket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(socketError(QAbstractSocket::SocketError)) );
139 mParent->connect( socket, SIGNAL(disconnected()), SLOT(socketDisconnected()) );
140 mParent->connect( socket, SIGNAL(readyRead()), SLOT(dataReceived()) );
144 kDebug() <<
"connectToServer" << serverAddress;
146 tcpSocket->connectToHost( QHostAddress::LocalHost, 31414 );
149 localSocket->connectToServer( serverAddress );
151 tcpSocket->connectToHost( serverAddress, port );
160 return Internal::xdgSaveDir(
"config" ) + QLatin1String(
"/akonadiconnectionrc");
163 void SessionPrivate::socketError( QLocalSocket::LocalSocketError )
165 Q_ASSERT( mParent->sender() == socket );
166 kWarning() <<
"Socket error occurred:" << qobject_cast<QLocalSocket*>( socket )->errorString();
167 socketDisconnected();
170 void SessionPrivate::socketError( QAbstractSocket::SocketError )
172 Q_ASSERT( mParent->sender() == socket );
173 kWarning() <<
"Socket error occurred:" << qobject_cast<QTcpSocket*>( socket )->errorString();
174 socketDisconnected();
177 void SessionPrivate::socketDisconnected()
180 currentJob->d_ptr->lostConnection();
184 void SessionPrivate::dataReceived()
186 while ( socket->bytesAvailable() > 0 ) {
187 if ( parser->continuationSize() > 1 ) {
188 const QByteArray data = socket->read( qMin( socket->bytesAvailable(), parser->continuationSize() - 1 ) );
189 parser->parseBlock( data );
190 }
else if ( socket->canReadLine() ) {
191 if ( !parser->parseNextLine( socket->readLine() ) )
195 if ( parser->tag() == QByteArray(
"0" ) ) {
196 if ( parser->data().startsWith(
"OK" ) ) {
197 writeData(
"1 CAPABILITY (NOTIFY 2 NOPAYLOADPATH)");
199 kWarning() <<
"Unable to login to Akonadi server:" << parser->data();
201 QTimer::singleShot( 1000, mParent, SLOT(
reconnect()) );
206 if ( parser->tag() == QByteArray(
"1") ) {
207 if ( parser->data().startsWith(
"OK") ) {
211 kDebug() <<
"Unhandled server capability response:" << parser->data();
216 if ( parser->tag() ==
"*" && parser->data().startsWith(
"OK Akonadi" ) ) {
217 const int pos = parser->data().indexOf(
"[PROTOCOL" );
220 ImapParser::parseNumber( parser->data(), tmp, 0, pos + 9 );
221 protocolVersion = tmp;
222 Internal::setServerProtocolVersion( tmp );
224 kDebug() <<
"Server protocol version is:" << protocolVersion;
226 writeData(
"0 LOGIN " + ImapParser::quote( sessionId ) +
'\n' );
231 currentJob->d_ptr->handleResponse( parser->tag(), parser->data() );
242 bool SessionPrivate::canPipelineNext()
244 if ( queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH )
246 if ( pipeline.isEmpty() && currentJob )
247 return currentJob->d_ptr->mWriteFinished;
248 if ( !pipeline.isEmpty() )
249 return pipeline.last()->d_ptr->mWriteFinished;
253 void SessionPrivate::doStartNext()
255 if ( !connected || (queue.isEmpty() && pipeline.isEmpty()) )
257 if ( canPipelineNext() ) {
259 pipeline.enqueue( nextJob );
265 if ( !pipeline.isEmpty() ) {
266 currentJob = pipeline.dequeue();
268 currentJob = queue.dequeue();
269 startJob( currentJob );
273 void SessionPrivate::startJob(
Job *job )
275 if ( protocolVersion < minimumProtocolVersion() ) {
277 job->setErrorText( i18n(
"Protocol version %1 found, expected at least %2", protocolVersion, minimumProtocolVersion() ) );
280 job->d_ptr->startQueued();
289 void SessionPrivate::jobDone(KJob * job)
293 if ( job == currentJob ) {
294 if ( pipeline.isEmpty() ) {
298 currentJob = pipeline.dequeue();
303 queue.removeAll( static_cast<Akonadi::Job*>( job ) );
305 pipeline.removeAll( static_cast<Akonadi::Job*>( job ) );
// Sanity check (active only when Q_ASSERT is enabled): the job passed in must be
// either the current job with nothing pipelined, or the most recently pipelined
// job. The comparison uses '==' (not '=') so the parameter is not overwritten,
// and pipeline.last() is consulted only when the pipeline is non-empty.
309 void SessionPrivate::jobWriteFinished(
Akonadi::Job* job )
311 Q_ASSERT( (job == currentJob && pipeline.isEmpty()) || (!pipeline.isEmpty() && job == pipeline.last()) );
317 void SessionPrivate::jobDestroyed(QObject * job)
320 jobDone( static_cast<KJob*>( job ) );
326 QObject::connect( job, SIGNAL(result(KJob*)), mParent, SLOT(jobDone(KJob*)) );
328 QObject::connect( job, SIGNAL(destroyed(QObject*)), mParent, SLOT(jobDestroyed(QObject*)) );
340 socket->write( data );
342 kWarning() <<
"Trying to write while session is disconnected!" << kBacktrace();
355 foreach (
Job *job, queue )
356 job->d_ptr->updateItemRevision( itemId, oldRevision, newRevision );
361 SessionPrivate::SessionPrivate(
Session *parent )
362 : mParent( parent ), socket( 0 ), protocolVersion( 0 ), currentJob( 0 ), parser( 0 )
366 void SessionPrivate::init(
const QByteArray &
id )
369 parser =
new ImapParser();
371 if ( !
id.isEmpty() ) {
374 sessionId = QCoreApplication::instance()->applicationName().toUtf8()
375 +
'-' + QByteArray::number( qrand() );
395 socket->disconnect( mParent );
399 QMetaObject::invokeMethod( mParent,
"reconnect", Qt::QueuedConnection );
406 d->init( sessionId );
413 d->init( sessionId );
427 static QThreadStorage<Session*> instances;
431 Q_ASSERT_X( !sessionId.isEmpty(),
"SessionPrivate::createDefaultSession",
432 "You tried to create a default session with empty session id!" );
433 Q_ASSERT_X( !instances.hasLocalData(),
"SessionPrivate::createDefaultSession",
434 "You tried to create a default session twice!" );
436 instances.setLocalData(
new Session( sessionId ) );
441 instances.setLocalData( session );
446 if ( !instances.hasLocalData() )
447 instances.setLocalData(
new Session() );
448 return instances.localData();
453 foreach (
Job* job, d->queue )
454 job->kill( KJob::EmitResult );
456 foreach (
Job* job, d->pipeline ) {
457 job->d_ptr->mStarted =
false;
458 job->kill( KJob::EmitResult );
461 if ( d->currentJob ) {
462 d->currentJob->d_ptr->mStarted =
false;
463 d->currentJob->kill( KJob::EmitResult );
468 #include "moc_session.cpp"
The server protocol version is too old or too new.
void forceReconnect()
Disconnects a previously existing connection and tries to reconnect.
int nextTag()
Returns the next IMAP tag.
static ServerManager * self()
Returns the singleton instance of this class, for connecting to its signals.
static void createDefaultSession(const QByteArray &sessionId)
Creates a new default session for this thread with the given sessionId.
Base class for all actions in the Akonadi storage.
static void setDefaultSession(Session *session)
Sets the default session.
static Session * defaultSession()
Returns the default session for this thread.
~Session()
Destroys the session.
QByteArray sessionId() const
Returns the session identifier.
void clear()
Stops all jobs queued for execution.
static State state()
Returns the state of the server.
A communication session with the Akonadi storage.
Server is not running, could be no one started it yet or it failed to start.
static bool start()
Starts the server.
virtual void addJob(Job *job)
Associates the given Job object with this session.
void reconnected()
This signal is emitted whenever the session has been reconnected to the server (e.g. after a server crash).
virtual void reconnect()
Attempts to establish a connection to the Akonadi server.
void itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
Propagate item revision changes to following jobs.
Server is running and operational.
void writeData(const QByteArray &data)
Sends the given raw data.
static QString connectionFile()
Default location for akonadiconnectionrc.
State
Enum for the various states the server can be in.
Session(const QByteArray &sessionId=QByteArray(), QObject *parent=0)
Creates a new session.