• Skip to content
  • Skip to link menu
KDE 4.2 API Reference
  • KDE API Reference
  • kdepim
  • Sitemap
  • Contact Us
 

kleopatra

kdpipeiodevice.cpp

Go to the documentation of this file.
00001 /*
00002   Copyright (C) 2007 Klarälvdalens Datakonsult AB
00003 
00004   KDPipeIODevice is free software; you can redistribute it and/or
00005   modify it under the terms of the GNU Library General Public
00006   License as published by the Free Software Foundation; either
00007   version 2 of the License, or (at your option) any later version.
00008 
00009   KDPipeIODevice is distributed in the hope that it will be useful,
00010   but WITHOUT ANY WARRANTY; without even the implied warranty of
00011   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012   GNU Library General Public License for more details.
00013 
00014   You should have received a copy of the GNU Library General Public License
00015   along with KDPipeIODevice; see the file COPYING.LIB.  If not, write to the
00016   Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
00017   Boston, MA 02110-1301, USA.
00018  */
00019 
00020 #include <config-kleopatra.h>
00021 
00022 #include "kdpipeiodevice.h"
00023 
00024 #include <QtCore>
00025 
00026 #include <cassert>
00027 #include <cstring>
00028 #include <memory>
00029 #include <algorithm>
00030 
00031 #ifdef Q_OS_WIN32
00032 # ifndef NOMINMAX
00033 #  define NOMINMAX
00034 # endif
00035 # include <windows.h>
00036 # include <io.h>
00037 #else
00038 # include <unistd.h>
00039 # include <errno.h>
00040 #endif
00041 
// Fallback for the invariant-check hooks: unless KDAB_CHECK_THIS was already
// defined elsewhere, all three hooks expand to the no-op expression (void)1.
#ifndef KDAB_CHECK_THIS
# define KDAB_CHECK_CTOR (void)1
# define KDAB_CHECK_DTOR KDAB_CHECK_CTOR
# define KDAB_CHECK_THIS KDAB_CHECK_CTOR
#endif

// LOCKED( d ): declares a QMutexLocker named 'locker' on d->mutex, so the
// mutex stays locked until the end of the enclosing scope.
#define LOCKED( d ) const QMutexLocker locker( &d->mutex )
// synchronized( d ) <statement>: executes the following statement once while
// d->mutex is locked. Note that it introduces locals named 'i' and 'locker'.
#define synchronized( d ) if ( int i = 0 ) {} else for ( const QMutexLocker locker( &d->mutex ) ; !i ; ++i )

// Size of the Writer buffer; the Reader buffer is one byte larger.
const unsigned int BUFFER_SIZE = 4096;
// When true, readData()/waitForReadyRead() consult bytesAvailable() first.
const bool ALLOW_QIODEVICE_BUFFERING = true;

namespace {
    // File-wide debug level, read and written through
    // KDPipeIODevice::debugLevel() / setDebugLevel().
    KDPipeIODevice::DebugLevel s_debugLevel = KDPipeIODevice::NoDebug;
}

// Turns every qDebug(...) call below into a no-op while the debug level is
// NoDebug. Because it expands to an if/else, keep qDebug calls out of
// unbraced if statements that are followed by an else.
#define qDebug if( s_debugLevel == KDPipeIODevice::NoDebug ){}else qDebug
00059 
00060 namespace {
00061 
00062 class Reader : public QThread {
00063     Q_OBJECT
00064 public:
00065     Reader( int fd, Qt::HANDLE handle );
00066     ~Reader();
00067 
00068     qint64 readData( char * data, qint64 maxSize );
00069 
00070     unsigned int bytesInBuffer() const {
00071         return ( wptr + sizeof buffer - rptr ) % sizeof buffer ;
00072     }
00073 
00074     bool bufferFull() const {
00075         return bytesInBuffer() == sizeof buffer - 1;
00076     }
00077 
00078     bool bufferEmpty() const {
00079         return bytesInBuffer() == 0;
00080     }
00081 
00082     bool bufferContains( char ch ) {
00083         const unsigned int bib = bytesInBuffer();
00084         for ( unsigned int i = rptr ; i < rptr + bib ; ++i )
00085             if ( buffer[i%sizeof buffer] == ch )
00086                 return true;
00087         return false;
00088     }
00089 
00090     void notifyReadyRead();
00091 
00092     Q_SIGNALS:
00093     void readyRead();
00094 
00095 protected:
00096     /* reimp */ void run();
00097 
00098 private:
00099     int fd;
00100     Qt::HANDLE handle;
00101 public:
00102     QMutex mutex;
00103     QWaitCondition waitForCancelCondition;
00104     QWaitCondition bufferNotFullCondition;
00105     QWaitCondition bufferNotEmptyCondition;
00106     QWaitCondition hasStarted;
00107     QWaitCondition readyReadSentCondition;
00108     QWaitCondition blockedConsumerIsDoneCondition;
00109     bool cancel;
00110     bool eof;
00111     bool error;
00112     bool eofShortCut;
00113     int errorCode;
00114     bool isReading;
00115     bool consumerBlocksOnUs;
00116 
00117 private:
00118     unsigned int rptr, wptr;
00119     char buffer[BUFFER_SIZE+1]; // need to keep one byte free to detect empty state
00120 };
00121 
00122 
00123 Reader::Reader( int fd_, Qt::HANDLE handle_ ) : QThread(),
00124     fd( fd_ ),
00125     handle( handle_ ),
00126     mutex(),
00127     bufferNotFullCondition(),
00128     bufferNotEmptyCondition(),
00129     hasStarted(),
00130     cancel( false ),
00131     eof( false ),
00132     error( false ),
00133     eofShortCut( false ),
00134     errorCode( 0 ),
00135     isReading( false ),
00136     consumerBlocksOnUs( false ),
00137     rptr( 0 ),
00138     wptr( 0 )
00139 {
00140 
00141 }
00142 
00143 Reader::~Reader() {}
00144 
00145 
00146 class Writer : public QThread {
00147     Q_OBJECT
00148 public:
00149     Writer( int fd, Qt::HANDLE handle );
00150     ~Writer();
00151 
00152     qint64 writeData( const char * data, qint64 size );
00153 
00154     unsigned int bytesInBuffer() const { return numBytesInBuffer; }
00155 
00156     bool bufferFull() const {
00157         return numBytesInBuffer == sizeof buffer;
00158     }
00159 
00160     bool bufferEmpty() const {
00161         return numBytesInBuffer == 0;
00162     }
00163 
00164     Q_SIGNALS:
00165     void bytesWritten( qint64 );
00166 
00167 protected:
00168     /* reimp */ void run();
00169 
00170 private:
00171     int fd;
00172     Qt::HANDLE handle;
00173 public:
00174     QMutex mutex;
00175     QWaitCondition bufferEmptyCondition;
00176     QWaitCondition bufferNotEmptyCondition;
00177     QWaitCondition hasStarted;
00178     bool cancel;
00179     bool error;
00180     int errorCode;
00181 private:
00182     unsigned int numBytesInBuffer;
00183     char buffer[BUFFER_SIZE];
00184 };
00185 }
00186 
00187 Writer::Writer( int fd_, Qt::HANDLE handle_ ) : QThread(),
00188     fd( fd_ ),
00189     handle( handle_ ),
00190     mutex(),
00191     bufferEmptyCondition(),
00192     bufferNotEmptyCondition(),
00193     hasStarted(),
00194     cancel( false ),
00195     error( false ),
00196     errorCode( 0 ),
00197     numBytesInBuffer( 0 )
00198 {
00199 
00200 }
00201 
00202 Writer::~Writer() {}
00203 
00204 
00205 class KDPipeIODevice::Private : public QObject {
00206     Q_OBJECT
00207     friend class ::KDPipeIODevice;
00208     KDPipeIODevice * const q;
00209 public:
00210     
00211     explicit Private( KDPipeIODevice * qq );
00212     ~Private();
00213 
00214     bool doOpen( int, Qt::HANDLE, OpenMode );
00215     bool startReaderThread(); 
00216     bool startWriterThread(); 
00217     void stopThreads();
00218 
00219 public Q_SLOTS:
00220     void emitReadyRead();
00221 
00222 private:
00223     int fd;
00224     Qt::HANDLE handle;
00225     Reader * reader;
00226     Writer * writer;
00227     bool triedToStartReader;
00228     bool triedToStartWriter;
00229 };
00230 
00231 KDPipeIODevice::DebugLevel KDPipeIODevice::debugLevel()
00232 {
00233     return s_debugLevel;
00234 }
00235 
00236 void KDPipeIODevice::setDebugLevel( KDPipeIODevice::DebugLevel level )
00237 {
00238     s_debugLevel = level;
00239 }
00240 
00241 KDPipeIODevice::Private::Private( KDPipeIODevice * qq ) : QObject( qq ), q( qq ),
00242     fd( -1 ),
00243     handle( 0 ),
00244     reader( 0 ),
00245     writer( 0 ),
00246     triedToStartReader( false ),
00247     triedToStartWriter( false ) 
00248 {
00249 
00250 }
00251 
00252 KDPipeIODevice::Private::~Private() {
00253     qDebug( "KDPipeIODevice::~Private(): Destroying %p", q );
00254 }
00255 
00256 KDPipeIODevice::KDPipeIODevice( QObject * p )
00257 : QIODevice( p ), d( new Private( this ) )
00258 {
00259     KDAB_CHECK_CTOR;
00260 }
00261 
00262 KDPipeIODevice::KDPipeIODevice( int fd, OpenMode mode, QObject * p )
00263 : QIODevice( p ), d( new Private( this ) )
00264 {
00265     KDAB_CHECK_CTOR;
00266     open( fd, mode );
00267 }
00268 
00269 KDPipeIODevice::KDPipeIODevice( Qt::HANDLE handle, OpenMode mode, QObject * p )
00270 : QIODevice( p ), d( new Private( this ) )
00271 {
00272     KDAB_CHECK_CTOR;
00273     open( handle, mode );
00274 }
00275 
00276 KDPipeIODevice::~KDPipeIODevice() { KDAB_CHECK_DTOR;
00277     if ( isOpen() )
00278         close();
00279     delete d; d = 0;
00280 }
00281 
00282 
00283 bool KDPipeIODevice::open( int fd, OpenMode mode ) { KDAB_CHECK_THIS;
00284     #ifdef Q_OS_WIN32
00285     return d->doOpen( fd, (HANDLE)_get_osfhandle( fd ), mode );
00286     #else
00287     return d->doOpen( fd, 0, mode );
00288     #endif
00289 }
00290 
00291 bool KDPipeIODevice::open( Qt::HANDLE h, OpenMode mode ) { KDAB_CHECK_THIS;
00292     #ifdef Q_OS_WIN32
00293     return d->doOpen( -1, h, mode );
00294     #else
00295     Q_UNUSED( h );
00296     Q_UNUSED( mode );
00297     assert( !"KDPipeIODevice::open( Qt::HANDLE, OpenMode ) should never be called except on Windows." );
00298     return false;
00299     #endif
00300 }
00301 
00302 bool KDPipeIODevice::Private::startReaderThread()
00303 {
00304     if ( triedToStartReader )
00305         return true;
00306     triedToStartReader = true;    
00307     if ( reader && !reader->isRunning() && !reader->isFinished() ) {
00308         qDebug("KDPipeIODevice::Private::startReaderThread(): locking reader (CONSUMER THREAD)" );
00309         LOCKED( reader );
00310         qDebug("KDPipeIODevice::Private::startReaderThread(): locked reader (CONSUMER THREAD)" );
00311         reader->start( QThread::HighestPriority );
00312         qDebug("KDPipeIODevice::Private::startReaderThread(): waiting for hasStarted (CONSUMER THREAD)" );
00313         const bool hasStarted = reader->hasStarted.wait( &reader->mutex, 1000 );
00314         qDebug("KDPipeIODevice::Private::startReaderThread(): returned from hasStarted (CONSUMER THREAD)" );
00315 
00316         return hasStarted;
00317     }
00318     return true;
00319 }
00320 
00321 bool KDPipeIODevice::Private::startWriterThread()
00322 {
00323     if ( triedToStartWriter )
00324         return true;
00325     triedToStartWriter = true;    
00326     if ( writer && !writer->isRunning() && !writer->isFinished() ) {
00327         LOCKED( writer );
00328 
00329         writer->start( QThread::HighestPriority );
00330         if ( !writer->hasStarted.wait( &writer->mutex, 1000 ) )
00331             return false;
00332     }
00333     return true;
00334 }
00335 
00336 void KDPipeIODevice::Private::emitReadyRead()
00337 {
00338     QPointer<Private> thisPointer( this );
00339     qDebug( "KDPipeIODevice::Private::emitReadyRead %p", this );
00340 
00341     emit q->readyRead();
00342 
00343     if ( !thisPointer )
00344         return;
00345     bool mustNotify = false;
00346     if ( reader ) {
00347         qDebug( "KDPipeIODevice::Private::emitReadyRead %p: locking reader (CONSUMER THREAD)", this );
00348         synchronized( reader ) {
00349             qDebug( "KDPipeIODevice::Private::emitReadyRead %p: locked reader (CONSUMER THREAD)", this );
00350             reader->readyReadSentCondition.wakeAll();
00351             mustNotify = !reader->bufferEmpty() && reader->isReading;
00352             qDebug( "KDPipeIODevice::Private::emitReadyRead %p: buffer empty: %d reader in ReadFile: %d", this, reader->bufferEmpty(), reader->isReading );            
00353         }
00354     }
00355     qDebug( "KDPipeIODevice::Private::emitReadyRead %p leaving", this );
00356 
00357 }
00358 
00359 bool KDPipeIODevice::Private::doOpen( int fd_, Qt::HANDLE handle_, OpenMode mode_ ) {
00360 
00361     if ( q->isOpen() )
00362         return false;
00363 
00364 #ifdef Q_OS_WIN32
00365     if ( !handle_ )
00366         return false;
00367 #else
00368     if ( fd_ < 0 )
00369         return false;
00370 #endif
00371 
00372     if ( !(mode_ & ReadWrite) )
00373         return false; // need to have at least read -or- write
00374 
00375 
00376     std::auto_ptr<Reader> reader_;
00377     std::auto_ptr<Writer> writer_;
00378 
00379     if ( mode_ & ReadOnly ) {
00380         reader_.reset( new Reader( fd_, handle_ ) );
00381         qDebug( "KDPipeIODevice::doOpen (%p): created reader (%p) for fd %d", this, reader_.get(), fd_ ); 
00382         connect( reader_.get(), SIGNAL(readyRead()), this, SLOT(emitReadyRead()), 
00383                 Qt::QueuedConnection );
00384     }
00385     if ( mode_ & WriteOnly ) {
00386         writer_.reset( new Writer( fd_, handle_ ) );
00387         qDebug( "KDPipeIODevice::doOpen (%p): created writer (%p) for fd %d", this, writer_.get(), fd_ ); 
00388         connect( writer_.get(), SIGNAL(bytesWritten(qint64)), q, SIGNAL(bytesWritten(qint64)), 
00389                 Qt::QueuedConnection );
00390     }
00391 
00392     // commit to *this:
00393     fd = fd_;
00394     handle = handle_;
00395     reader = reader_.release();
00396     writer = writer_.release();
00397 
00398     q->setOpenMode( mode_|Unbuffered );
00399     return true;
00400 }
00401 
00402 int KDPipeIODevice::descriptor() const { KDAB_CHECK_THIS;
00403     return d->fd;
00404 }
00405 
00406 
00407 Qt::HANDLE KDPipeIODevice::handle() const { KDAB_CHECK_THIS;
00408     return d->handle;
00409 }
00410 
00411 qint64 KDPipeIODevice::bytesAvailable() const { KDAB_CHECK_THIS;
00412     const qint64 base = QIODevice::bytesAvailable();
00413     if ( !d->triedToStartReader ) {
00414         d->startReaderThread();
00415         return base;
00416     }
00417     if ( d->reader )
00418         synchronized( d->reader ) {
00419         const qint64 inBuffer = d->reader->bytesInBuffer();     
00420         return base + inBuffer;
00421     }
00422     return base;
00423 }
00424 
00425 qint64 KDPipeIODevice::bytesToWrite() const { KDAB_CHECK_THIS;
00426     d->startWriterThread();
00427     const qint64 base = QIODevice::bytesToWrite();
00428     if ( d->writer )
00429         synchronized( d->writer ) return base + d->writer->bytesInBuffer();
00430     return base;
00431 }
00432 
00433 bool KDPipeIODevice::canReadLine() const { KDAB_CHECK_THIS;
00434     d->startReaderThread();
00435     if ( QIODevice::canReadLine() )
00436         return true;
00437     if ( d->reader )
00438         synchronized( d->reader ) return d->reader->bufferContains( '\n' );
00439     return true;
00440 }
00441 
00442 bool KDPipeIODevice::isSequential() const {
00443     return true;
00444 }
00445 
00446 bool KDPipeIODevice::atEnd() const { KDAB_CHECK_THIS;
00447     d->startReaderThread();
00448     if ( !QIODevice::atEnd() ) {
00449         qDebug( "%p: KDPipeIODevice::atEnd returns false since QIODevice::atEnd does (with bytesAvailable=%ld)", this, static_cast<long>(bytesAvailable()) );
00450         return false;
00451     }
00452     if ( !isOpen() )
00453         return true;
00454     if ( d->reader->eofShortCut )
00455         return true;
00456     LOCKED( d->reader );
00457     const bool eof = ( d->reader->error || d->reader->eof ) && d->reader->bufferEmpty();
00458     if ( !eof ) {
00459         if ( !d->reader->error && !d->reader->eof )
00460             qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->error && !reader->eof", this );
00461         if ( !d->reader->bufferEmpty() )
00462             qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->bufferEmpty()", this );
00463     }
00464     return eof;
00465 }
00466 
00467 bool KDPipeIODevice::waitForBytesWritten( int msecs ) { KDAB_CHECK_THIS;
00468     d->startWriterThread();
00469     Writer * const w = d->writer;
00470     if ( !w )
00471         return true;
00472     LOCKED( w );
00473     qDebug( "KDPipeIODevice::waitForBytesWritten (%p,w=%p): entered locked area", this, w 
00474     ); 
00475     return w->bufferEmpty() || w->error || w->bufferEmptyCondition.wait( &w->mutex, msecs ) ;
00476     }
00477     
00478     bool KDPipeIODevice::waitForReadyRead( int msecs ) { KDAB_CHECK_THIS;
00479     qDebug( "KDPipeIODEvice::waitForReadyRead()(%p)", this);
00480     d->startReaderThread();
00481     if ( ALLOW_QIODEVICE_BUFFERING ) {
00482         if ( bytesAvailable() > 0 )
00483             return true;
00484     }
00485     Reader * const r = d->reader;
00486     if ( !r || r->eofShortCut )
00487         return true;
00488     LOCKED( r );
00489     if ( r->bytesInBuffer() != 0 || r->eof || r->error )
00490         return true;
00491     assert( false );
00492     return r->bufferNotEmptyCondition.wait( &r->mutex, msecs ) ;
00493 }
00494 
// Scope guard: assigns a temporary value to a variable on construction
// and puts the previous value back on destruction.
template <typename T>
class TemporaryValue {
public:
    TemporaryValue( T& var_, const T& tv )
        : var( var_ ),
          oldValue( var_ )
    {
        var = tv;
    }

    ~TemporaryValue()
    {
        var = oldValue;
    }

private:
    T& var;           // the variable being overridden
    const T oldValue; // its value before the override
};
00504 
00505 
00506 bool KDPipeIODevice::readWouldBlock() const
00507 {
00508     d->startReaderThread();
00509     LOCKED( d->reader );
00510     return d->reader->bufferEmpty() && !d->reader->eof && !d->reader->error;
00511 }  
00512 
00513 bool KDPipeIODevice::writeWouldBlock() const
00514 {
00515     d->startWriterThread();
00516     LOCKED( d->writer );
00517     return !d->writer->bufferEmpty() && !d->writer->error;
00518 }  
00519 
00520 
00521 qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS;
00522     qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld", this, data, maxSize );
00523     d->startReaderThread();
00524     Reader * const r = d->reader;
00525     
00526     assert( r );
00527     
00528     
00529     //assert( r->isRunning() ); // wrong (might be eof, error)
00530     assert( data || maxSize == 0 );
00531     assert( maxSize >= 0 );
00532     
00533     if ( r->eofShortCut ) {
00534         qDebug( "%p: KDPipeIODevice::readData: hit eofShortCut, returning 0", this );
00535         return 0;
00536     }
00537     
00538     if ( maxSize < 0 )
00539         maxSize = 0;
00540     
00541     if ( ALLOW_QIODEVICE_BUFFERING ) {
00542         if ( bytesAvailable() > 0 )
00543             maxSize = std::min( maxSize, bytesAvailable() ); // don't block
00544     }
00545     qDebug( "%p: KDPipeIODevice::readData: try to lock reader (CONSUMER THREAD)", this );
00546     LOCKED( r );
00547     qDebug( "%p: KDPipeIODevice::readData: locked reader (CONSUMER THREAD)", this );
00548     
00549     r->readyReadSentCondition.wakeAll();
00550     if ( /* maxSize > 0 && */ r->bufferEmpty() &&  !r->error && !r->eof ) { // ### block on maxSize == 0?
00551         qDebug( "%p: KDPipeIODevice::readData: waiting for bufferNotEmptyCondition (CONSUMER THREAD)", this );
00552         const TemporaryValue<bool> tmp( d->reader->consumerBlocksOnUs, true );
00553         r->bufferNotEmptyCondition.wait( &r->mutex );
00554         r->blockedConsumerIsDoneCondition.wakeAll();
00555         qDebug( "%p: KDPipeIODevice::readData: woke up from bufferNotEmptyCondition (CONSUMER THREAD)", this ); 
00556     }
00557     
00558     if ( r->bufferEmpty() ) {
00559         qDebug( "%p: KDPipeIODevice::readData: got empty buffer, signal eof", this );
00560         // woken with an empty buffer must mean either EOF or error:
00561         assert( r->eof || r->error );
00562         r->eofShortCut = true;
00563         return r->eof ? 0 : -1 ;
00564     }
00565     
00566     qDebug( "%p: KDPipeIODevice::readData: got bufferNotEmptyCondition, trying to read %lld bytes", this, maxSize );
00567     const qint64 bytesRead = r->readData( data, maxSize );
00568     qDebug( "%p: KDPipeIODevice::readData: read %lld bytes", this, bytesRead );
00569     qDebug( "%p (fd=%d): KDPipeIODevice::readData: %s", this, d->fd, data );
00570     
00571     return bytesRead;
00572 }
00573 
00574 qint64 Reader::readData( char * data, qint64 maxSize ) {
00575     qint64 numRead = rptr < wptr ? wptr - rptr : sizeof buffer - rptr ;
00576     if ( numRead > maxSize )
00577         numRead = maxSize;
00578 
00579     qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld; rptr=%u, wptr=%u (bytesInBuffer=%u); -> numRead=%lld", this,
00580             data, maxSize, rptr, wptr, bytesInBuffer(), numRead );
00581 
00582     memcpy( data, buffer + rptr, numRead );
00583 
00584     rptr = ( rptr + numRead ) % sizeof buffer ;
00585 
00586     if ( !bufferFull() ) {
00587         qDebug( "%p: KDPipeIODevice::readData: signal bufferNotFullCondition", this );
00588         bufferNotFullCondition.wakeAll();
00589     }
00590 
00591     return numRead;
00592 }
00593 
00594 qint64 KDPipeIODevice::writeData( const char * data, qint64 size ) { KDAB_CHECK_THIS;
00595     d->startWriterThread();
00596     Writer * const w = d->writer;
00597     
00598     assert( w );
00599     assert( w->error || w->isRunning() );
00600     assert( data || size == 0 );
00601     assert( size >= 0 );
00602     
00603     LOCKED( w );
00604     
00605     while ( !w->error && !w->bufferEmpty() ) { 
00606         qDebug( "%p: KDPipeIODevice::writeData: wait for empty buffer", this );
00607         w->bufferEmptyCondition.wait( &w->mutex );
00608         qDebug( "%p: KDPipeIODevice::writeData: empty buffer signaled", this );
00609     
00610     }
00611     if ( w->error )
00612         return -1;
00613     
00614     assert( w->bufferEmpty() );
00615     
00616     return w->writeData( data, size );
00617 }
00618 
00619 qint64 Writer::writeData( const char * data, qint64 size ) {
00620     assert( bufferEmpty() );
00621 
00622     if ( size > static_cast<qint64>( sizeof buffer ) )
00623         size = sizeof buffer;
00624 
00625     memcpy( buffer, data, size );
00626 
00627     numBytesInBuffer = size;
00628 
00629     if ( !bufferEmpty() ) {
00630         bufferNotEmptyCondition.wakeAll();
00631     }
00632     return size;
00633 }
00634 
00635 void KDPipeIODevice::Private::stopThreads()
00636 {
00637     if ( triedToStartWriter )
00638     {
00639         if ( writer && q->bytesToWrite() > 0 )
00640             q->waitForBytesWritten( -1 );
00641 
00642         assert( q->bytesToWrite() == 0 );
00643     }
00644     if ( Reader * & r = reader ) {
00645         disconnect( r, SIGNAL( readyRead() ), this, SLOT( emitReadyRead() ) ); 
00646         synchronized( r ) {
00647             // tell thread to cancel:
00648             r->cancel = true;
00649             // and wake it, so it can terminate:
00650             r->waitForCancelCondition.wakeAll();
00651             r->bufferNotFullCondition.wakeAll();
00652             r->readyReadSentCondition.wakeAll();
00653         }
00654     }
00655     if ( Writer * & w = writer ) {
00656         synchronized( w ) {
00657             // tell thread to cancel:
00658             w->cancel = true;
00659             // and wake it, so it can terminate:
00660             w->bufferNotEmptyCondition.wakeAll();
00661         }
00662     }
00663 }
00664 
00665 void KDPipeIODevice::close() { KDAB_CHECK_THIS;
00666     qDebug( "KDPipeIODevice::close(%p)", this );
00667     if ( !isOpen() )
00668         return;
00669     
00670     // tell clients we're about to close:
00671     emit aboutToClose();
00672     d->stopThreads();
00673     
00674     #define waitAndDelete( t ) if ( t ) { t->wait(); QThread* const t2 = t; t = 0; delete t2; }
00675     qDebug( "KPipeIODevice::close(%p): wait and closing writer %p", this, d->writer );
00676     waitAndDelete( d->writer );
00677     qDebug( "KPipeIODevice::close(%p): wait and closing reader %p", this, d->reader );
00678     if ( d->reader ) {
00679         LOCKED( d->reader );
00680         d->reader->readyReadSentCondition.wakeAll();
00681     }
00682     waitAndDelete( d->reader );
00683 #undef waitAndDelete
00684 #ifdef Q_OS_WIN32
00685     if ( d->fd != -1 )
00686         _close( d->fd );
00687 else
00688     CloseHandle( d->handle );
00689 #else
00690     ::close( d->fd );
00691 #endif
00692 
00693     setOpenMode( NotOpen );
00694     d->fd = -1;
00695     d->handle = 0;
00696 }
00697 
00698 void Reader::run() {
00699 
00700     LOCKED( this );
00701 
00702     // too bad QThread doesn't have that itself; a signal isn't enough
00703     hasStarted.wakeAll();
00704 
00705     qDebug( "%p: Reader::run: started", this );
00706 
00707     while ( true ) {
00708         if ( !cancel && ( eof || error ) ) {
00709             //notify the client until the buffer is empty and then once 
00710             //again so he receives eof/error. After that, wait for him 
00711             //to cancel 
00712             const bool wasEmpty = bufferEmpty();
00713             qDebug( "%p: Reader::run: received eof(%d) or error(%d), waking everyone", this, eof, error );
00714             notifyReadyRead();
00715             if ( !cancel && wasEmpty ) 
00716                 waitForCancelCondition.wait( &mutex );
00717         } else if ( !cancel && !bufferFull() && !bufferEmpty() ) {
00718             qDebug( "%p: Reader::run: buffer no longer empty, waking everyone", this );
00719             notifyReadyRead();
00720         } 
00721 
00722         while ( !cancel && !error && bufferFull() ) {
00723             notifyReadyRead();
00724             if ( !cancel && bufferFull() ) {
00725                 qDebug( "%p: Reader::run: buffer is full, going to sleep", this );
00726                 bufferNotFullCondition.wait( &mutex );
00727             }
00728         }
00729 
00730         if ( cancel ) {
00731             qDebug( "%p: Reader::run: detected cancel", this );
00732             goto leave;
00733         }
00734 
00735         if ( !eof && !error ) {
00736             if ( rptr == wptr ) // optimize for larger chunks in case the buffer is empty
00737                 rptr = wptr = 0;
00738 
00739             unsigned int numBytes = ( rptr + sizeof buffer - wptr - 1 ) % sizeof buffer;
00740             if ( numBytes > sizeof buffer - wptr )
00741                 numBytes = sizeof buffer - wptr;
00742 
00743             qDebug( "%p: Reader::run: rptr=%d, wptr=%d -> numBytes=%d", this, rptr, wptr, numBytes );
00744 
00745             assert( numBytes > 0 );
00746 
00747             qDebug( "%p: Reader::run: trying to read %d bytes", this, numBytes );
00748 #ifdef Q_OS_WIN32
00749             isReading = true;
00750             mutex.unlock();
00751             DWORD numRead;
00752             const bool ok = ReadFile( handle, buffer + wptr, numBytes, &numRead, 0 );
00753             mutex.lock();
00754             isReading = false;
00755             if ( ok ) {
00756                 if ( numRead == 0 ) {
00757                     qDebug( "%p: Reader::run: got eof (numRead==0)", this );
00758                     eof = true;
00759                 } 
00760             } else { // !ok
00761                 errorCode = static_cast<int>( GetLastError() );
00762                 if ( errorCode == ERROR_BROKEN_PIPE ) {
00763                     assert( numRead == 0 );
00764                     qDebug( "%p: Reader::run: got eof (broken pipe)", this );
00765                     eof = true;
00766                 } else {
00767                     assert( numRead == 0 );
00768                     qDebug( "%p: Reader::run: got error: %s (%d)", this, strerror( errorCode ), errorCode );
00769                     error = true;
00770                 }
00771             }
00772 #else
00773             qint64 numRead;
00774             mutex.unlock();
00775             do {
00776                 numRead = ::read( fd, buffer + wptr, numBytes );
00777             } while ( numRead == -1 && errno == EINTR );
00778             mutex.lock();
00779 
00780             if ( numRead < 0 ) {
00781                 errorCode = errno;
00782                 error = true;
00783                 qDebug( "%p: Reader::run: got error: %d", this, errorCode );
00784             } else if ( numRead == 0 ) {
00785                 qDebug( "%p: Reader::run: eof detected", this );  
00786                 eof = true;
00787             }
00788 #endif
00789             qDebug( "%p (fd=%d): Reader::run: read %ld bytes", this, fd, static_cast<long>(numRead) );
00790             qDebug( "%p (fd=%d): Reader::run: %s", this, fd, buffer );
00791 
00792             if ( numRead > 0 ) {
00793                 qDebug( "%p: Reader::run: buffer before: rptr=%4d, wptr=%4d", this, rptr, wptr );
00794                 wptr = ( wptr + numRead ) % sizeof buffer;
00795                 qDebug( "%p: Reader::run: buffer after:  rptr=%4d, wptr=%4d", this, rptr, wptr );
00796             }
00797         }
00798     }
00799     leave:
00800     qDebug( "%p: Reader::run: terminated", this );
00801 }
00802 
00803 void Reader::notifyReadyRead()
00804 {
00805     qDebug( "notifyReadyRead: %d bytes available", bytesInBuffer() );
00806     assert( !cancel );
00807 
00808     if ( consumerBlocksOnUs ) {
00809         bufferNotEmptyCondition.wakeAll();
00810         blockedConsumerIsDoneCondition.wait( &mutex );
00811         return;
00812     }
00813     qDebug( "notifyReadyRead: emit signal" );
00814     emit readyRead();
00815     readyReadSentCondition.wait( &mutex );
00816     qDebug( "notifyReadyRead: returning from waiting, leave" );
00817 }
00818 
00819 void Writer::run() {
00820 
00821     LOCKED( this );
00822 
00823     // too bad QThread doesn't have that itself; a signal isn't enough
00824     hasStarted.wakeAll();
00825 
00826     qDebug( "%p: Writer::run: started", this );
00827 
00828     while ( true ) {
00829 
00830         while ( !cancel && bufferEmpty() ) {
00831             qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this );
00832             bufferEmptyCondition.wakeAll();
00833             emit bytesWritten( 0 );
00834             qDebug( "%p: Writer::run: buffer is empty, going to sleep", this );
00835             bufferNotEmptyCondition.wait( &mutex );
00836             qDebug( "%p: Writer::run: woke up", this );
00837         }
00838 
00839         if ( cancel ) {
00840             qDebug( "%p: Writer::run: detected cancel", this );
00841             goto leave;
00842         }
00843 
00844         assert( numBytesInBuffer > 0 );
00845 
00846         qDebug( "%p: Writer::run: Trying to write %u bytes", this, numBytesInBuffer );
00847         qint64 totalWritten = 0;
00848         do {  
00849             mutex.unlock();
00850 #ifdef Q_OS_WIN32
00851             DWORD numWritten;
00852             qDebug( "%p (fd=%d): Writer::run: buffer before WriteFile (numBytes=%lld): %s:", this, fd, numBytesInBuffer, buffer ); 
00853             qDebug( "%p (fd=%d): Writer::run: Going into WriteFile", this, fd );
00854             if ( !WriteFile( handle, buffer + totalWritten, numBytesInBuffer - totalWritten, &numWritten, 0 ) ) {
00855                 mutex.lock();
00856                 errorCode = static_cast<int>( GetLastError() );
00857                 qDebug( "%p: Writer::run: got error code: %d", this, errorCode );
00858                 error = true;
00859                 goto leave;
00860             }
00861 #else
00862             qint64 numWritten;
00863             do {
00864                 numWritten = ::write( fd, buffer + totalWritten, numBytesInBuffer - totalWritten );
00865             } while ( numWritten == -1 && errno == EINTR );
00866 
00867             if ( numWritten < 0 ) {
00868                 mutex.lock();
00869                 errorCode = errno;
00870                 qDebug( "%p: Writer::run: got error code: %s (%d)", this, strerror( errorCode ), errorCode );
00871                 error = true;
00872                 goto leave;
00873             }
00874 #endif
00875             qDebug( "%p (fd=%d): Writer::run: buffer after WriteFile (numBytes=%u): %s:", this, fd, numBytesInBuffer, buffer );
00876             totalWritten += numWritten;
00877             mutex.lock();
00878         } while ( totalWritten < numBytesInBuffer );
00879 
00880         qDebug( "%p: Writer::run: wrote %lld bytes", this, totalWritten );
00881 
00882         numBytesInBuffer = 0;
00883 
00884         qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this );
00885         bufferEmptyCondition.wakeAll();
00886         emit bytesWritten( totalWritten );
00887     }
00888     leave:
00889     qDebug( "%p: Writer::run: terminating", this );
00890     numBytesInBuffer = 0;
00891     qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this );
00892     bufferEmptyCondition.wakeAll();
00893     emit bytesWritten( 0 );
00894 }
00895 
00896 // static 
00897 std::pair<KDPipeIODevice*,KDPipeIODevice*> KDPipeIODevice::makePairOfConnectedPipes() {
00898     KDPipeIODevice * read = 0;
00899     KDPipeIODevice * write = 0;
00900 #ifdef Q_OS_WIN32
00901     HANDLE rh;
00902     HANDLE wh;
00903     SECURITY_ATTRIBUTES sa;
00904     memset( &sa, 0, sizeof(sa) );
00905     sa.nLength = sizeof(sa);
00906     sa.bInheritHandle = TRUE;
00907     if ( CreatePipe( &rh, &wh, &sa, BUFFER_SIZE ) ) {
00908         read = new KDPipeIODevice;
00909         read->open( rh, ReadOnly );
00910         write = new KDPipeIODevice;
00911         write->open( wh, WriteOnly );
00912     }
00913 #else
00914     int fds[2];
00915     if ( pipe( fds ) == 0 ) {
00916         read = new KDPipeIODevice;
00917         read->open( fds[0], ReadOnly );
00918         write = new KDPipeIODevice;
00919         write->open( fds[1], WriteOnly );
00920     }
00921 #endif
00922     return std::make_pair( read, write );
00923 }
00924 
#ifdef KDAB_DEFINE_CHECKS
// Class invariants, compiled only when KDAB_DEFINE_CHECKS is available.
KDAB_DEFINE_CHECKS( KDPipeIODevice ) {
    if ( isOpen() ) {
        // Open: a thread object exists for each requested direction, and
        // a valid handle/descriptor is held.
        assert( openMode() != NotOpen );
        assert( openMode() & ReadWrite );
        if ( openMode() & ReadOnly ) {
            assert( d->reader );
            synchronized( d->reader ) {
                assert( d->reader->eof || d->reader->error || d->reader->isRunning() );
            }
        }
        if ( openMode() & WriteOnly ) {
            assert( d->writer );
            synchronized( d->writer ) {
                assert( d->writer->error || d->writer->isRunning() );
            }
        }
#ifdef Q_OS_WIN32
        assert( d->handle );
#else
        assert( d->fd >= 0 );
#endif
    } else {
        // Closed: no threads and no handle/descriptor.
        assert( openMode() == NotOpen );
        assert( !d->reader );
        assert( !d->writer );
#ifdef Q_OS_WIN32
        assert( !d->handle );
#else
        assert( d->fd < 0 );
#endif
    }
}
#endif // KDAB_DEFINE_CHECKS
00957 
00958 #include "moc_kdpipeiodevice.cpp"
00959 #include "kdpipeiodevice.moc"

kleopatra

Skip menu "kleopatra"
  • Main Page
  • Namespace List
  • Class Hierarchy
  • Alphabetical List
  • Class List
  • File List
  • Namespace Members
  • Class Members