• Skip to content
  • Skip to link menu
KDE API Reference
  • KDE API Reference
  • kdepimlibs API Reference
  • KDE Home
  • Contact Us
 

akonadi

  • sources
  • kde-4.12
  • kdepimlibs
  • akonadi
session.cpp
1 /*
2  Copyright (c) 2007 Volker Krause <vkrause@kde.org>
3 
4  This library is free software; you can redistribute it and/or modify it
5  under the terms of the GNU Library General Public License as published by
6  the Free Software Foundation; either version 2 of the License, or (at your
7  option) any later version.
8 
9  This library is distributed in the hope that it will be useful, but WITHOUT
10  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
12  License for more details.
13 
14  You should have received a copy of the GNU Library General Public License
15  along with this library; see the file COPYING.LIB. If not, write to the
16  Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17  02110-1301, USA.
18 */
19 
20 #include "session.h"
21 #include "session_p.h"
22 
23 #include "imapparser_p.h"
24 #include "job.h"
25 #include "job_p.h"
26 #include "servermanager.h"
27 #include "servermanager_p.h"
28 #include "xdgbasedirs_p.h"
29 
30 #include <kdebug.h>
31 #include <klocalizedstring.h>
32 
33 #include <QCoreApplication>
34 #include <QtCore/QDir>
35 #include <QtCore/QQueue>
36 #include <QtCore/QThreadStorage>
37 #include <QtCore/QTimer>
38 #include <QtCore/QThread>
39 #include <QSettings>
40 
41 #include <QtNetwork/QLocalSocket>
42 #include <QtNetwork/QTcpSocket>
43 #include <QtNetwork/QHostAddress>
44 
45 // ### FIXME pipelining got broken by switching result emission in JobPrivate::handleResponse to delayed emission
46 // in order to work around exec() deadlocks. As a result of that Session knows too late about a finished job and still
47 // sends responses for the next one to the already finished one
48 #define PIPELINE_LENGTH 0
49 //#define PIPELINE_LENGTH 2
50 
51 using namespace Akonadi;
52 
53 //@cond PRIVATE
54 
55 void SessionPrivate::startNext()
56 {
57  QTimer::singleShot( 0, mParent, SLOT(doStartNext()) );
58 }
59 
60 void SessionPrivate::reconnect()
61 {
62  QLocalSocket *localSocket = qobject_cast<QLocalSocket*>( socket );
63  if ( localSocket && (localSocket->state() == QLocalSocket::ConnectedState
64  || localSocket->state() == QLocalSocket::ConnectingState ) ) {
65  // nothing to do, we are still/already connected
66  return;
67  }
68 
69  QTcpSocket *tcpSocket = qobject_cast<QTcpSocket*>( socket );
70  if ( tcpSocket && (tcpSocket->state() == QTcpSocket::ConnectedState
71  || tcpSocket->state() == QTcpSocket::ConnectingState ) ) {
72  // same here, but for TCP
73  return;
74  }
75 
76  // try to figure out where to connect to
77  QString serverAddress;
78  quint16 port = 0;
79  bool useTcp = false;
80 
81  // env var has precedence
82  const QByteArray serverAddressEnvVar = qgetenv( "AKONADI_SERVER_ADDRESS" );
83  if ( !serverAddressEnvVar.isEmpty() ) {
84  const int pos = serverAddressEnvVar.indexOf( ':' );
85  const QByteArray protocol = serverAddressEnvVar.left( pos );
86  QMap<QString, QString> options;
87  foreach ( const QString &entry, QString::fromLatin1( serverAddressEnvVar.mid( pos + 1 ) ).split( QLatin1Char(',') ) ) {
88  const QStringList pair = entry.split( QLatin1Char('=') );
89  if ( pair.size() != 2 )
90  continue;
91  options.insert( pair.first(), pair.last() );
92  }
93  kDebug() << protocol << options;
94 
95  if ( protocol == "tcp" ) {
96  serverAddress = options.value( QLatin1String( "host" ) );
97  port = options.value( QLatin1String( "port" ) ).toUInt();
98  useTcp = true;
99  } else if ( protocol == "unix" ) {
100  serverAddress = options.value( QLatin1String( "path" ) );
101  } else if ( protocol == "pipe" ) {
102  serverAddress = options.value( QLatin1String( "name" ) );
103  }
104  }
105 
106  // try config file next, fall back to defaults if that fails as well
107  if ( serverAddress.isEmpty() ) {
108  const QString connectionConfigFile = connectionFile();
109  const QFileInfo fileInfo( connectionConfigFile );
110  if ( !fileInfo.exists() ) {
111  kDebug() << "Akonadi Client Session: connection config file '"
112  "akonadi/akonadiconnectionrc' can not be found in"
113  << XdgBaseDirs::homePath( "config" ) << "nor in any of"
114  << XdgBaseDirs::systemPathList( "config" );
115  }
116  const QSettings connectionSettings( connectionConfigFile, QSettings::IniFormat );
117 
118 #ifdef Q_OS_WIN //krazy:exclude=cpp
119  serverAddress = connectionSettings.value( QLatin1String( "Data/NamedPipe" ), QLatin1String( "Akonadi" ) ).toString();
120 #else
121  const QString defaultSocketDir = Internal::xdgSaveDir( "data" );
122  serverAddress = connectionSettings.value( QLatin1String( "Data/UnixPath" ), QString(defaultSocketDir + QLatin1String( "/akonadiserver.socket" )) ).toString();
123 #endif
124  }
125 #ifdef Q_OS_WINCE
126  useTcp = true;
127 #endif
128 
129  // create sockets if not yet done, note that this does not yet allow changing socket types on the fly
130  // but that's probably not something we need to support anyway
131  if ( !socket ) {
132  if ( !useTcp ) {
133  socket = localSocket = new QLocalSocket( mParent );
134  mParent->connect( localSocket, SIGNAL(error(QLocalSocket::LocalSocketError)), SLOT(socketError(QLocalSocket::LocalSocketError)) );
135  } else {
136  socket = tcpSocket = new QTcpSocket( mParent );
137  mParent->connect( tcpSocket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(socketError(QAbstractSocket::SocketError)) );
138  }
139  mParent->connect( socket, SIGNAL(disconnected()), SLOT(socketDisconnected()) );
140  mParent->connect( socket, SIGNAL(readyRead()), SLOT(dataReceived()) );
141  }
142 
143  // actually do connect
144  kDebug() << "connectToServer" << serverAddress;
145 #ifdef Q_OS_WINCE
146  tcpSocket->connectToHost( QHostAddress::LocalHost, 31414 );
147 #else
148  if ( !useTcp ) {
149  localSocket->connectToServer( serverAddress );
150  } else {
151  tcpSocket->connectToHost( serverAddress, port );
152  }
153 #endif
154 
155  emit mParent->reconnected();
156 }
157 
158 QString SessionPrivate::connectionFile()
159 {
160  return Internal::xdgSaveDir( "config" ) + QLatin1String("/akonadiconnectionrc");
161 }
162 
163 void SessionPrivate::socketError( QLocalSocket::LocalSocketError )
164 {
165  Q_ASSERT( mParent->sender() == socket );
166  kWarning() << "Socket error occurred:" << qobject_cast<QLocalSocket*>( socket )->errorString();
167  socketDisconnected();
168 }
169 
170 void SessionPrivate::socketError( QAbstractSocket::SocketError )
171 {
172  Q_ASSERT( mParent->sender() == socket );
173  kWarning() << "Socket error occurred:" << qobject_cast<QTcpSocket*>( socket )->errorString();
174  socketDisconnected();
175 }
176 
177 void SessionPrivate::socketDisconnected()
178 {
179  if ( currentJob )
180  currentJob->d_ptr->lostConnection();
181  connected = false;
182 }
183 
184 void SessionPrivate::dataReceived()
185 {
186  while ( socket->bytesAvailable() > 0 ) {
187  if ( parser->continuationSize() > 1 ) {
188  const QByteArray data = socket->read( qMin( socket->bytesAvailable(), parser->continuationSize() - 1 ) );
189  parser->parseBlock( data );
190  } else if ( socket->canReadLine() ) {
191  if ( !parser->parseNextLine( socket->readLine() ) )
192  continue; // response not yet completed
193 
194  // handle login response
195  if ( parser->tag() == QByteArray( "0" ) ) {
196  if ( parser->data().startsWith( "OK" ) ) { //krazy:exclude=strings
197  writeData("1 CAPABILITY (NOTIFY 2 NOPAYLOADPATH)");
198  } else {
199  kWarning() << "Unable to login to Akonadi server:" << parser->data();
200  socket->close();
201  QTimer::singleShot( 1000, mParent, SLOT(reconnect()) );
202  }
203  }
204 
205  // handle capability response
206  if ( parser->tag() == QByteArray("1") ) {
207  if ( parser->data().startsWith("OK") ) {
208  connected = true;
209  startNext();
210  } else {
211  kDebug() << "Unhandled server capability response:" << parser->data();
212  }
213  }
214 
215  // send login command
216  if ( parser->tag() == "*" && parser->data().startsWith( "OK Akonadi" ) ) {
217  const int pos = parser->data().indexOf( "[PROTOCOL" );
218  if ( pos > 0 ) {
219  qint64 tmp = 0;
220  ImapParser::parseNumber( parser->data(), tmp, 0, pos + 9 );
221  protocolVersion = tmp;
222  Internal::setServerProtocolVersion( tmp );
223  }
224  kDebug() << "Server protocol version is:" << protocolVersion;
225 
226  writeData( "0 LOGIN " + ImapParser::quote( sessionId ) + '\n' );
227 
228  // work for the current job
229  } else {
230  if ( currentJob )
231  currentJob->d_ptr->handleResponse( parser->tag(), parser->data() );
232  }
233 
234  // reset parser stuff
235  parser->reset();
236  } else {
237  break; // nothing we can do for now
238  }
239  }
240 }
241 
242 bool SessionPrivate::canPipelineNext()
243 {
244  if ( queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH )
245  return false;
246  if ( pipeline.isEmpty() && currentJob )
247  return currentJob->d_ptr->mWriteFinished;
248  if ( !pipeline.isEmpty() )
249  return pipeline.last()->d_ptr->mWriteFinished;
250  return false;
251 }
252 
253 void SessionPrivate::doStartNext()
254 {
255  if ( !connected || (queue.isEmpty() && pipeline.isEmpty()) )
256  return;
257  if ( canPipelineNext() ) {
258  Akonadi::Job *nextJob = queue.dequeue();
259  pipeline.enqueue( nextJob );
260  startJob( nextJob );
261  }
262  if ( jobRunning )
263  return;
264  jobRunning = true;
265  if ( !pipeline.isEmpty() ) {
266  currentJob = pipeline.dequeue();
267  } else {
268  currentJob = queue.dequeue();
269  startJob( currentJob );
270  }
271 }
272 
273 void SessionPrivate::startJob( Job *job )
274 {
275  if ( protocolVersion < minimumProtocolVersion() ) {
276  job->setError( Job::ProtocolVersionMismatch );
277  job->setErrorText( i18n( "Protocol version %1 found, expected at least %2", protocolVersion, minimumProtocolVersion() ) );
278  job->emitResult();
279  } else {
280  job->d_ptr->startQueued();
281  }
282 }
283 
284 void SessionPrivate::endJob( Job *job )
285 {
286  job->emitResult();
287 }
288 
289 void SessionPrivate::jobDone(KJob * job)
290 {
291  // ### careful, this method can be called from the QObject dtor of job (see jobDestroyed() below)
292  // so don't call any methods on job itself
293  if ( job == currentJob ) {
294  if ( pipeline.isEmpty() ) {
295  jobRunning = false;
296  currentJob = 0;
297  } else {
298  currentJob = pipeline.dequeue();
299  }
300  startNext();
301  } else {
302  // non-current job finished, likely canceled while still in the queue
303  queue.removeAll( static_cast<Akonadi::Job*>( job ) );
304  // ### likely not enough to really cancel already running jobs
305  pipeline.removeAll( static_cast<Akonadi::Job*>( job ) );
306  }
307 }
308 
309 void SessionPrivate::jobWriteFinished( Akonadi::Job* job )
310 {
311  Q_ASSERT( (job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()) );
312  Q_UNUSED( job );
313 
314  startNext();
315 }
316 
317 void SessionPrivate::jobDestroyed(QObject * job)
318 {
319  // careful, accessing non-QObject methods of job will fail here already
320  jobDone( static_cast<KJob*>( job ) );
321 }
322 
323 void SessionPrivate::addJob(Job * job)
324 {
325  queue.append( job );
326  QObject::connect( job, SIGNAL(result(KJob*)), mParent, SLOT(jobDone(KJob*)) );
327  QObject::connect( job, SIGNAL(writeFinished(Akonadi::Job*)), mParent, SLOT(jobWriteFinished(Akonadi::Job*)) );
328  QObject::connect( job, SIGNAL(destroyed(QObject*)), mParent, SLOT(jobDestroyed(QObject*)) );
329  startNext();
330 }
331 
332 int SessionPrivate::nextTag()
333 {
334  return theNextTag++;
335 }
336 
337 void SessionPrivate::writeData(const QByteArray & data)
338 {
339  if ( socket )
340  socket->write( data );
341  else
342  kWarning() << "Trying to write while session is disconnected!" << kBacktrace();
343 }
344 
345 void SessionPrivate::serverStateChanged( ServerManager::State state )
346 {
347  if ( state == ServerManager::Running && !connected )
348  reconnect();
349 }
350 
351 void SessionPrivate::itemRevisionChanged( Akonadi::Item::Id itemId, int oldRevision, int newRevision )
352 {
353  // only deal with the queue, for the guys in the pipeline it's too late already anyway
354  // and they shouldn't have gotten there if they depend on a preceding job anyway.
355  foreach ( Job *job, queue )
356  job->d_ptr->updateItemRevision( itemId, oldRevision, newRevision );
357 }
358 
359 //@endcond
360 
361 SessionPrivate::SessionPrivate( Session *parent )
362  : mParent( parent ), socket( 0 ), protocolVersion( 0 ), currentJob( 0 ), parser( 0 )
363 {
364 }
365 
366 void SessionPrivate::init( const QByteArray &id )
367 {
368  kDebug() << id;
369  parser = new ImapParser();
370 
371  if ( !id.isEmpty() ) {
372  sessionId = id;
373  } else {
374  sessionId = QCoreApplication::instance()->applicationName().toUtf8()
375  + '-' + QByteArray::number( qrand() );
376  }
377 
378  connected = false;
379  theNextTag = 2;
380  jobRunning = false;
381 
382  if ( ServerManager::state() == ServerManager::NotRunning )
383  ServerManager::start();
384  mParent->connect( ServerManager::self(), SIGNAL(stateChanged(Akonadi::ServerManager::State)),
385  SLOT(serverStateChanged(Akonadi::ServerManager::State)) );
386 
387  reconnect();
388 }
389 
390 void SessionPrivate::forceReconnect()
391 {
392  jobRunning = false;
393  connected = false;
394  if ( socket ) {
395  socket->disconnect( mParent ); // prevent signal emitted from close() causing mayhem - we might be called from ~QThreadStorage!
396  delete socket;
397  }
398  socket = 0;
399  QMetaObject::invokeMethod( mParent, "reconnect", Qt::QueuedConnection ); // avoids reconnecting in the dtor
400 }
401 
402 Session::Session(const QByteArray & sessionId, QObject * parent) :
403  QObject( parent ),
404  d( new SessionPrivate( this ) )
405 {
406  d->init( sessionId );
407 }
408 
409 Session::Session( SessionPrivate *dd, const QByteArray & sessionId, QObject * parent)
410  : QObject( parent ),
411  d( dd )
412 {
413  d->init( sessionId );
414 }
415 
416 Session::~Session()
417 {
418  clear();
419  delete d;
420 }
421 
422 QByteArray Session::sessionId() const
423 {
424  return d->sessionId;
425 }
426 
427 static QThreadStorage<Session*> instances;
428 
429 void SessionPrivate::createDefaultSession( const QByteArray &sessionId )
430 {
431  Q_ASSERT_X( !sessionId.isEmpty(), "SessionPrivate::createDefaultSession",
432  "You tried to create a default session with empty session id!" );
433  Q_ASSERT_X( !instances.hasLocalData(), "SessionPrivate::createDefaultSession",
434  "You tried to create a default session twice!" );
435 
436  instances.setLocalData( new Session( sessionId ) );
437 }
438 
439 void SessionPrivate::setDefaultSession( Session *session )
440 {
441  instances.setLocalData( session );
442 }
443 
444 Session* Session::defaultSession()
445 {
446  if ( !instances.hasLocalData() )
447  instances.setLocalData( new Session() );
448  return instances.localData();
449 }
450 
451 void Session::clear()
452 {
453  foreach ( Job* job, d->queue )
454  job->kill( KJob::EmitResult ); // safe, not started yet
455  d->queue.clear();
456  foreach ( Job* job, d->pipeline ) {
457  job->d_ptr->mStarted = false; // avoid killing/reconnect loops
458  job->kill( KJob::EmitResult );
459  }
460  d->pipeline.clear();
461  if ( d->currentJob ) {
462  d->currentJob->d_ptr->mStarted = false; // avoid killing/reconnect loops
463  d->currentJob->kill( KJob::EmitResult );
464  }
465  d->forceReconnect();
466 }
467 
468 #include "moc_session.cpp"
Akonadi::Job::ProtocolVersionMismatch
The server protocol version is too old or too new.
Definition: job.h:107
Akonadi::SessionPrivate::forceReconnect
void forceReconnect()
Disconnects a previously existing connection and tries to reconnect.
Definition: session.cpp:390
Akonadi::SessionPrivate::nextTag
int nextTag()
Returns the next IMAP tag.
Akonadi::ServerManager::self
static ServerManager * self()
Returns the singleton instance of this class, for connecting to its signals.
Definition: servermanager.cpp:152
Akonadi::SessionPrivate::createDefaultSession
static void createDefaultSession(const QByteArray &sessionId)
Creates a new default session for this thread with the given sessionId.
Definition: session.cpp:429
Akonadi::Job
Base class for all actions in the Akonadi storage.
Definition: job.h:86
Akonadi::SessionPrivate::setDefaultSession
static void setDefaultSession(Session *session)
Sets the default session.
Definition: session.cpp:439
Akonadi::Session::defaultSession
static Session * defaultSession()
Returns the default session for this thread.
Definition: session.cpp:444
Akonadi::Session::~Session
~Session()
Destroys the session.
Definition: session.cpp:416
Akonadi::Session::sessionId
QByteArray sessionId() const
Returns the session identifier.
Definition: session.cpp:422
Akonadi::Session::clear
void clear()
Stops all jobs queued for execution.
Definition: session.cpp:451
Akonadi::ServerManager::state
static State state()
Returns the state of the server.
Definition: servermanager.cpp:218
Akonadi::Session
A communication session with the Akonadi storage.
Definition: session.h:59
Akonadi::SessionPrivate
Definition: session_p.h:41
Akonadi::ServerManager::NotRunning
Server is not running, could be no one started it yet or it failed to start.
Definition: servermanager.h:51
Akonadi::ServerManager::start
static bool start()
Starts the server.
Definition: servermanager.cpp:157
Akonadi::SessionPrivate::endJob
void endJob(Job *job)
Akonadi::SessionPrivate::addJob
virtual void addJob(Job *job)
Associates the given Job object with this session.
Akonadi::Session::reconnected
void reconnected()
This signal is emitted whenever the session has been reconnected to the server (e.g.
Akonadi::SessionPrivate::reconnect
virtual void reconnect()
Attempts to establish a connection to the Akonadi server.
Akonadi::SessionPrivate::itemRevisionChanged
void itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
Propagate item revision changes to following jobs.
Akonadi::ServerManager::Running
Server is running and operational.
Definition: servermanager.h:53
Akonadi::SessionPrivate::writeData
void writeData(const QByteArray &data)
Sends the given raw data.
Akonadi::SessionPrivate::connectionFile
static QString connectionFile()
Default location for akonadiconnectionrc.
Akonadi::ServerManager::State
State
Enum for the various states the server can be in.
Definition: servermanager.h:50
Akonadi::Session::Session
Session(const QByteArray &sessionId=QByteArray(), QObject *parent=0)
Creates a new session.
Definition: session.cpp:402
This file is part of the KDE documentation.
Documentation copyright © 1996-2014 The KDE developers.
Generated on Tue Oct 14 2014 23:00:27 by doxygen 1.8.7 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.

akonadi

Skip menu "akonadi"
  • Main Page
  • Namespace List
  • Namespace Members
  • Alphabetical List
  • Class List
  • Class Hierarchy
  • Class Members
  • File List
  • Modules
  • Related Pages

kdepimlibs API Reference

Skip menu "kdepimlibs API Reference"
  • akonadi
  •   contact
  •   kmime
  •   socialutils
  • kabc
  • kalarmcal
  • kblog
  • kcal
  • kcalcore
  • kcalutils
  • kholidays
  • kimap
  • kldap
  • kmbox
  • kmime
  • kpimidentities
  • kpimtextedit
  • kresources
  • ktnef
  • kxmlrpcclient
  • microblog

Search



Report problems with this website to our bug tracking system.
Contact the specific authors with questions and comments about the page contents.

KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal