24 #include "dbusconnectionpool.h"
26 #include "imapparser_p.h"
28 #include "session_p.h"
31 #include <klocalizedstring.h>
33 #include <QtCore/QEventLoop>
34 #include <QtCore/QTimer>
35 #include <QtCore/QTextStream>
36 #include <QtNetwork/QHostAddress>
37 #include <QtNetwork/QTcpSocket>
38 #include <QtDBus/QDBusInterface>
39 #include <QtDBus/QDBusConnectionInterface>
41 using namespace Akonadi;
43 static QDBusAbstractInterface *s_jobtracker = 0;
46 void JobPrivate::handleResponse(
const QByteArray & tag,
const QByteArray & data )
50 if ( mCurrentSubJob ) {
51 mCurrentSubJob->d_ptr->handleResponse( tag, data );
56 if ( data.startsWith(
"NO " ) || data.startsWith(
"BAD " ) ) {
57 QString msg = QString::fromUtf8( data );
59 msg.remove( 0, msg.startsWith( QLatin1String(
"NO " ) ) ? 3 : 4 );
61 if ( msg.endsWith( QLatin1String(
"\r\n" ) ) )
65 q->setErrorText( msg );
68 }
else if ( data.startsWith(
"OK" ) ) {
75 QTimer::singleShot( 0, q, SLOT(delayedEmitResult()) );
80 q->doHandleResponse( tag, data );
83 void JobPrivate::init( QObject *parent )
87 mParentJob =
dynamic_cast<Job*
>( parent );
88 mSession =
dynamic_cast<Session*
>( parent );
94 mSession = mParentJob->d_ptr->mSession;
98 mSession->d->addJob( q );
103 if ( !s_jobtracker ) {
106 static QTime s_lastTime;
107 if ( s_lastTime.isNull() || s_lastTime.elapsed() > 3000 ) {
108 if ( s_lastTime.isNull() )
110 if ( DBusConnectionPool::threadConnection().interface()->isServiceRegistered(QLatin1String(
"org.kde.akonadiconsole" ) ) ) {
111 s_jobtracker =
new QDBusInterface( QLatin1String(
"org.kde.akonadiconsole" ),
112 QLatin1String(
"/jobtracker" ),
113 QLatin1String(
"org.freedesktop.Akonadi.JobTracker" ),
114 DBusConnectionPool::threadConnection(), 0 );
116 s_lastTime.restart();
122 QMetaObject::invokeMethod( q,
"signalCreationToJobTracker", Qt::QueuedConnection );
125 void JobPrivate::signalCreationToJobTracker()
128 if ( s_jobtracker ) {
133 QList<QVariant> argumentList;
134 argumentList << QLatin1String( mSession->
sessionId() )
135 << QString::number(reinterpret_cast<quintptr>( q ), 16)
136 << ( mParentJob ? QString::number( reinterpret_cast<quintptr>( mParentJob ), 16) : QString() )
137 << QString::fromLatin1( q->metaObject()->className() )
138 << jobDebuggingString();
139 s_jobtracker->callWithArgumentList(QDBus::NoBlock, QLatin1String(
"jobCreated" ), argumentList);
143 void JobPrivate::signalStartedToJobTracker()
146 if ( s_jobtracker ) {
148 QList<QVariant> argumentList;
149 argumentList << QString::number(reinterpret_cast<quintptr>( q ), 16);
150 s_jobtracker->callWithArgumentList(QDBus::NoBlock, QLatin1String(
"jobStarted" ), argumentList);
154 void JobPrivate::delayedEmitResult()
160 void JobPrivate::startQueued()
165 emit q->aboutToStart( q );
167 QTimer::singleShot( 0, q, SLOT(startNext()) );
168 QMetaObject::invokeMethod( q,
"signalStartedToJobTracker", Qt::QueuedConnection );
171 void JobPrivate::lostConnection()
175 if ( mCurrentSubJob ) {
176 mCurrentSubJob->d_ptr->lostConnection();
183 void JobPrivate::slotSubJobAboutToStart(
Job * job )
185 Q_ASSERT( mCurrentSubJob == 0 );
186 mCurrentSubJob = job;
189 void JobPrivate::startNext()
193 if ( mStarted && !mCurrentSubJob && q->hasSubjobs() ) {
196 job->d_ptr->startQueued();
203 mTag = mParentJob->d_ptr->newTag();
205 mTag = QByteArray::number( mSession->d->nextTag() );
216 Q_ASSERT_X( !mWriteFinished,
"Job::writeData()",
"Calling writeData() after emitting writeFinished()" );
217 mSession->d->writeData( data );
222 mSession->d->itemRevisionChanged( itemId, oldRevision, newRevision );
228 foreach ( KJob *j, q->subjobs() ) {
231 job->d_ptr->updateItemRevision( itemId, oldRevision, newRevision );
239 Q_UNUSED( oldRevision );
240 Q_UNUSED( newRevision );
243 int JobPrivate::protocolVersion()
const
245 return mSession->d->protocolVersion;
250 : KCompositeJob( parent ),
253 d_ptr->init( parent );
257 : KCompositeJob( parent ),
260 d_ptr->init( parent );
268 if ( s_jobtracker ) {
269 QList<QVariant> argumentList;
270 argumentList << QString::number(reinterpret_cast<quintptr>( this ), 16)
272 s_jobtracker->callWithArgumentList(QDBus::NoBlock, QLatin1String(
"jobEnded" ), argumentList);
285 d->mSession->d->forceReconnect();
298 str = i18n(
"Cannot connect to the Akonadi service." );
301 str = i18n(
"The protocol version of the Akonadi server is incompatible. Make sure you have a compatible version installed." );
304 str = i18n(
"User canceled operation." );
309 str = i18n(
"Unknown error." );
312 if ( !errorText().isEmpty() ) {
313 str += QString::fromLatin1(
" (%1)" ).arg( errorText() );
320 bool rv = KCompositeJob::addSubjob( job );
323 QTimer::singleShot( 0,
this, SLOT(startNext()) );
330 bool rv = KCompositeJob::removeSubjob( job );
331 if ( job == d_ptr->mCurrentSubJob ) {
332 d_ptr->mCurrentSubJob = 0;
333 QTimer::singleShot( 0,
this, SLOT(startNext()) );
340 kDebug() <<
"Unhandled response: " << tag << data;
343 void Job::slotResult(KJob * job)
345 if ( d_ptr->mCurrentSubJob == job ) {
347 d_ptr->mCurrentSubJob = 0;
348 KCompositeJob::slotResult( job );
350 QTimer::singleShot( 0,
this, SLOT(startNext()) );
355 KCompositeJob::removeSubjob( job );
361 d_ptr->mWriteFinished =
true;
365 #include "moc_job.cpp"
virtual void doUpdateItemRevision(Akonadi::Item::Id, int oldRevision, int newRevision)
Overwrite this if your job does operations with conflict detection and update the item revisions if y...
The server protocol version is too old or too new.
Base class for all actions in the Akonadi storage.
void updateItemRevision(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
Propagate item revision changes to this job and its sub-jobs.
void itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
Notify following jobs about item revision changes.
The connection to the Akonadi server failed.
static Session * defaultSession()
Returns the default session for this thread.
void start()
Jobs are started automatically once entering the event loop again, no need to explicitly call this...
void writeFinished(Akonadi::Job *job)
This signal is emitted if the job has finished all write operations, i.e. the job will not call writeData() again.
QByteArray sessionId() const
Returns the session identifier.
QByteArray tag() const
Return the tag used for the request.
virtual bool addSubjob(KJob *job)
Adds the given job as a subjob to this job.
virtual bool doKill()
Kills the execution of the job.
void emitWriteFinished()
Call this method to indicate that this job will not call writeData() again.
A communication session with the Akonadi storage.
QByteArray newTag()
Returns a new unique command tag for communication with the backend.
virtual void doHandleResponse(const QByteArray &tag, const QByteArray &data)
This method should be reimplemented in the concrete jobs in case you want to handle incoming data...
virtual ~Job()
Destroys the job.
void writeData(const QByteArray &data)
Sends raw data to the backend.
virtual bool removeSubjob(KJob *job)
Removes the given subjob of this job.
void aboutToStart(Akonadi::Job *job)
This signal is emitted directly before the job will be started.
The user canceled this job.
virtual QString errorString() const
Returns the error string, if there has been an error, an empty string otherwise.
Job(QObject *parent=0)
Creates a new job.