00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include <config-kleopatra.h>
00021
00022 #include "kdpipeiodevice.h"
00023
00024 #include <QtCore>
00025
00026 #include <cassert>
00027 #include <cstring>
00028 #include <memory>
00029 #include <algorithm>
00030
00031 #ifdef Q_OS_WIN32
00032 # ifndef NOMINMAX
00033 # define NOMINMAX
00034 # endif
00035 # include <windows.h>
00036 # include <io.h>
00037 #else
00038 # include <unistd.h>
00039 # include <errno.h>
00040 #endif
00041
00042 #ifndef KDAB_CHECK_THIS
00043 # define KDAB_CHECK_CTOR (void)1
00044 # define KDAB_CHECK_DTOR KDAB_CHECK_CTOR
00045 # define KDAB_CHECK_THIS KDAB_CHECK_CTOR
00046 #endif
00047
00048 #define LOCKED( d ) const QMutexLocker locker( &d->mutex )
00049 #define synchronized( d ) if ( int i = 0 ) {} else for ( const QMutexLocker locker( &d->mutex ) ; !i ; ++i )
00050
00051 const unsigned int BUFFER_SIZE = 4096;
00052 const bool ALLOW_QIODEVICE_BUFFERING = true;
00053
00054 namespace {
00055 KDPipeIODevice::DebugLevel s_debugLevel = KDPipeIODevice::NoDebug;
00056 }
00057
00058 #define qDebug if( s_debugLevel == KDPipeIODevice::NoDebug ){}else qDebug
00059
00060 namespace {
00061
00062 class Reader : public QThread {
00063 Q_OBJECT
00064 public:
00065 Reader( int fd, Qt::HANDLE handle );
00066 ~Reader();
00067
00068 qint64 readData( char * data, qint64 maxSize );
00069
00070 unsigned int bytesInBuffer() const {
00071 return ( wptr + sizeof buffer - rptr ) % sizeof buffer ;
00072 }
00073
00074 bool bufferFull() const {
00075 return bytesInBuffer() == sizeof buffer - 1;
00076 }
00077
00078 bool bufferEmpty() const {
00079 return bytesInBuffer() == 0;
00080 }
00081
00082 bool bufferContains( char ch ) {
00083 const unsigned int bib = bytesInBuffer();
00084 for ( unsigned int i = rptr ; i < rptr + bib ; ++i )
00085 if ( buffer[i%sizeof buffer] == ch )
00086 return true;
00087 return false;
00088 }
00089
00090 void notifyReadyRead();
00091
00092 Q_SIGNALS:
00093 void readyRead();
00094
00095 protected:
00096 void run();
00097
00098 private:
00099 int fd;
00100 Qt::HANDLE handle;
00101 public:
00102 QMutex mutex;
00103 QWaitCondition waitForCancelCondition;
00104 QWaitCondition bufferNotFullCondition;
00105 QWaitCondition bufferNotEmptyCondition;
00106 QWaitCondition hasStarted;
00107 QWaitCondition readyReadSentCondition;
00108 QWaitCondition blockedConsumerIsDoneCondition;
00109 bool cancel;
00110 bool eof;
00111 bool error;
00112 bool eofShortCut;
00113 int errorCode;
00114 bool isReading;
00115 bool consumerBlocksOnUs;
00116
00117 private:
00118 unsigned int rptr, wptr;
00119 char buffer[BUFFER_SIZE+1];
00120 };
00121
00122
00123 Reader::Reader( int fd_, Qt::HANDLE handle_ ) : QThread(),
00124 fd( fd_ ),
00125 handle( handle_ ),
00126 mutex(),
00127 bufferNotFullCondition(),
00128 bufferNotEmptyCondition(),
00129 hasStarted(),
00130 cancel( false ),
00131 eof( false ),
00132 error( false ),
00133 eofShortCut( false ),
00134 errorCode( 0 ),
00135 isReading( false ),
00136 consumerBlocksOnUs( false ),
00137 rptr( 0 ),
00138 wptr( 0 )
00139 {
00140
00141 }
00142
00143 Reader::~Reader() {}
00144
00145
00146 class Writer : public QThread {
00147 Q_OBJECT
00148 public:
00149 Writer( int fd, Qt::HANDLE handle );
00150 ~Writer();
00151
00152 qint64 writeData( const char * data, qint64 size );
00153
00154 unsigned int bytesInBuffer() const { return numBytesInBuffer; }
00155
00156 bool bufferFull() const {
00157 return numBytesInBuffer == sizeof buffer;
00158 }
00159
00160 bool bufferEmpty() const {
00161 return numBytesInBuffer == 0;
00162 }
00163
00164 Q_SIGNALS:
00165 void bytesWritten( qint64 );
00166
00167 protected:
00168 void run();
00169
00170 private:
00171 int fd;
00172 Qt::HANDLE handle;
00173 public:
00174 QMutex mutex;
00175 QWaitCondition bufferEmptyCondition;
00176 QWaitCondition bufferNotEmptyCondition;
00177 QWaitCondition hasStarted;
00178 bool cancel;
00179 bool error;
00180 int errorCode;
00181 private:
00182 unsigned int numBytesInBuffer;
00183 char buffer[BUFFER_SIZE];
00184 };
00185 }
00186
00187 Writer::Writer( int fd_, Qt::HANDLE handle_ ) : QThread(),
00188 fd( fd_ ),
00189 handle( handle_ ),
00190 mutex(),
00191 bufferEmptyCondition(),
00192 bufferNotEmptyCondition(),
00193 hasStarted(),
00194 cancel( false ),
00195 error( false ),
00196 errorCode( 0 ),
00197 numBytesInBuffer( 0 )
00198 {
00199
00200 }
00201
00202 Writer::~Writer() {}
00203
00204
00205 class KDPipeIODevice::Private : public QObject {
00206 Q_OBJECT
00207 friend class ::KDPipeIODevice;
00208 KDPipeIODevice * const q;
00209 public:
00210
00211 explicit Private( KDPipeIODevice * qq );
00212 ~Private();
00213
00214 bool doOpen( int, Qt::HANDLE, OpenMode );
00215 bool startReaderThread();
00216 bool startWriterThread();
00217 void stopThreads();
00218
00219 public Q_SLOTS:
00220 void emitReadyRead();
00221
00222 private:
00223 int fd;
00224 Qt::HANDLE handle;
00225 Reader * reader;
00226 Writer * writer;
00227 bool triedToStartReader;
00228 bool triedToStartWriter;
00229 };
00230
00231 KDPipeIODevice::DebugLevel KDPipeIODevice::debugLevel()
00232 {
00233 return s_debugLevel;
00234 }
00235
00236 void KDPipeIODevice::setDebugLevel( KDPipeIODevice::DebugLevel level )
00237 {
00238 s_debugLevel = level;
00239 }
00240
00241 KDPipeIODevice::Private::Private( KDPipeIODevice * qq ) : QObject( qq ), q( qq ),
00242 fd( -1 ),
00243 handle( 0 ),
00244 reader( 0 ),
00245 writer( 0 ),
00246 triedToStartReader( false ),
00247 triedToStartWriter( false )
00248 {
00249
00250 }
00251
00252 KDPipeIODevice::Private::~Private() {
00253 qDebug( "KDPipeIODevice::~Private(): Destroying %p", q );
00254 }
00255
00256 KDPipeIODevice::KDPipeIODevice( QObject * p )
00257 : QIODevice( p ), d( new Private( this ) )
00258 {
00259 KDAB_CHECK_CTOR;
00260 }
00261
00262 KDPipeIODevice::KDPipeIODevice( int fd, OpenMode mode, QObject * p )
00263 : QIODevice( p ), d( new Private( this ) )
00264 {
00265 KDAB_CHECK_CTOR;
00266 open( fd, mode );
00267 }
00268
00269 KDPipeIODevice::KDPipeIODevice( Qt::HANDLE handle, OpenMode mode, QObject * p )
00270 : QIODevice( p ), d( new Private( this ) )
00271 {
00272 KDAB_CHECK_CTOR;
00273 open( handle, mode );
00274 }
00275
00276 KDPipeIODevice::~KDPipeIODevice() { KDAB_CHECK_DTOR;
00277 if ( isOpen() )
00278 close();
00279 delete d; d = 0;
00280 }
00281
00282
00283 bool KDPipeIODevice::open( int fd, OpenMode mode ) { KDAB_CHECK_THIS;
00284 #ifdef Q_OS_WIN32
00285 return d->doOpen( fd, (HANDLE)_get_osfhandle( fd ), mode );
00286 #else
00287 return d->doOpen( fd, 0, mode );
00288 #endif
00289 }
00290
00291 bool KDPipeIODevice::open( Qt::HANDLE h, OpenMode mode ) { KDAB_CHECK_THIS;
00292 #ifdef Q_OS_WIN32
00293 return d->doOpen( -1, h, mode );
00294 #else
00295 Q_UNUSED( h );
00296 Q_UNUSED( mode );
00297 assert( !"KDPipeIODevice::open( Qt::HANDLE, OpenMode ) should never be called except on Windows." );
00298 return false;
00299 #endif
00300 }
00301
00302 bool KDPipeIODevice::Private::startReaderThread()
00303 {
00304 if ( triedToStartReader )
00305 return true;
00306 triedToStartReader = true;
00307 if ( reader && !reader->isRunning() && !reader->isFinished() ) {
00308 qDebug("KDPipeIODevice::Private::startReaderThread(): locking reader (CONSUMER THREAD)" );
00309 LOCKED( reader );
00310 qDebug("KDPipeIODevice::Private::startReaderThread(): locked reader (CONSUMER THREAD)" );
00311 reader->start( QThread::HighestPriority );
00312 qDebug("KDPipeIODevice::Private::startReaderThread(): waiting for hasStarted (CONSUMER THREAD)" );
00313 const bool hasStarted = reader->hasStarted.wait( &reader->mutex, 1000 );
00314 qDebug("KDPipeIODevice::Private::startReaderThread(): returned from hasStarted (CONSUMER THREAD)" );
00315
00316 return hasStarted;
00317 }
00318 return true;
00319 }
00320
00321 bool KDPipeIODevice::Private::startWriterThread()
00322 {
00323 if ( triedToStartWriter )
00324 return true;
00325 triedToStartWriter = true;
00326 if ( writer && !writer->isRunning() && !writer->isFinished() ) {
00327 LOCKED( writer );
00328
00329 writer->start( QThread::HighestPriority );
00330 if ( !writer->hasStarted.wait( &writer->mutex, 1000 ) )
00331 return false;
00332 }
00333 return true;
00334 }
00335
00336 void KDPipeIODevice::Private::emitReadyRead()
00337 {
00338 QPointer<Private> thisPointer( this );
00339 qDebug( "KDPipeIODevice::Private::emitReadyRead %p", this );
00340
00341 emit q->readyRead();
00342
00343 if ( !thisPointer )
00344 return;
00345 bool mustNotify = false;
00346 if ( reader ) {
00347 qDebug( "KDPipeIODevice::Private::emitReadyRead %p: locking reader (CONSUMER THREAD)", this );
00348 synchronized( reader ) {
00349 qDebug( "KDPipeIODevice::Private::emitReadyRead %p: locked reader (CONSUMER THREAD)", this );
00350 reader->readyReadSentCondition.wakeAll();
00351 mustNotify = !reader->bufferEmpty() && reader->isReading;
00352 qDebug( "KDPipeIODevice::Private::emitReadyRead %p: buffer empty: %d reader in ReadFile: %d", this, reader->bufferEmpty(), reader->isReading );
00353 }
00354 }
00355 qDebug( "KDPipeIODevice::Private::emitReadyRead %p leaving", this );
00356
00357 }
00358
00359 bool KDPipeIODevice::Private::doOpen( int fd_, Qt::HANDLE handle_, OpenMode mode_ ) {
00360
00361 if ( q->isOpen() )
00362 return false;
00363
00364 #ifdef Q_OS_WIN32
00365 if ( !handle_ )
00366 return false;
00367 #else
00368 if ( fd_ < 0 )
00369 return false;
00370 #endif
00371
00372 if ( !(mode_ & ReadWrite) )
00373 return false;
00374
00375
00376 std::auto_ptr<Reader> reader_;
00377 std::auto_ptr<Writer> writer_;
00378
00379 if ( mode_ & ReadOnly ) {
00380 reader_.reset( new Reader( fd_, handle_ ) );
00381 qDebug( "KDPipeIODevice::doOpen (%p): created reader (%p) for fd %d", this, reader_.get(), fd_ );
00382 connect( reader_.get(), SIGNAL(readyRead()), this, SLOT(emitReadyRead()),
00383 Qt::QueuedConnection );
00384 }
00385 if ( mode_ & WriteOnly ) {
00386 writer_.reset( new Writer( fd_, handle_ ) );
00387 qDebug( "KDPipeIODevice::doOpen (%p): created writer (%p) for fd %d", this, writer_.get(), fd_ );
00388 connect( writer_.get(), SIGNAL(bytesWritten(qint64)), q, SIGNAL(bytesWritten(qint64)),
00389 Qt::QueuedConnection );
00390 }
00391
00392
00393 fd = fd_;
00394 handle = handle_;
00395 reader = reader_.release();
00396 writer = writer_.release();
00397
00398 q->setOpenMode( mode_|Unbuffered );
00399 return true;
00400 }
00401
00402 int KDPipeIODevice::descriptor() const { KDAB_CHECK_THIS;
00403 return d->fd;
00404 }
00405
00406
00407 Qt::HANDLE KDPipeIODevice::handle() const { KDAB_CHECK_THIS;
00408 return d->handle;
00409 }
00410
00411 qint64 KDPipeIODevice::bytesAvailable() const { KDAB_CHECK_THIS;
00412 const qint64 base = QIODevice::bytesAvailable();
00413 if ( !d->triedToStartReader ) {
00414 d->startReaderThread();
00415 return base;
00416 }
00417 if ( d->reader )
00418 synchronized( d->reader ) {
00419 const qint64 inBuffer = d->reader->bytesInBuffer();
00420 return base + inBuffer;
00421 }
00422 return base;
00423 }
00424
00425 qint64 KDPipeIODevice::bytesToWrite() const { KDAB_CHECK_THIS;
00426 d->startWriterThread();
00427 const qint64 base = QIODevice::bytesToWrite();
00428 if ( d->writer )
00429 synchronized( d->writer ) return base + d->writer->bytesInBuffer();
00430 return base;
00431 }
00432
00433 bool KDPipeIODevice::canReadLine() const { KDAB_CHECK_THIS;
00434 d->startReaderThread();
00435 if ( QIODevice::canReadLine() )
00436 return true;
00437 if ( d->reader )
00438 synchronized( d->reader ) return d->reader->bufferContains( '\n' );
00439 return true;
00440 }
00441
00442 bool KDPipeIODevice::isSequential() const {
00443 return true;
00444 }
00445
00446 bool KDPipeIODevice::atEnd() const { KDAB_CHECK_THIS;
00447 d->startReaderThread();
00448 if ( !QIODevice::atEnd() ) {
00449 qDebug( "%p: KDPipeIODevice::atEnd returns false since QIODevice::atEnd does (with bytesAvailable=%ld)", this, static_cast<long>(bytesAvailable()) );
00450 return false;
00451 }
00452 if ( !isOpen() )
00453 return true;
00454 if ( d->reader->eofShortCut )
00455 return true;
00456 LOCKED( d->reader );
00457 const bool eof = ( d->reader->error || d->reader->eof ) && d->reader->bufferEmpty();
00458 if ( !eof ) {
00459 if ( !d->reader->error && !d->reader->eof )
00460 qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->error && !reader->eof", this );
00461 if ( !d->reader->bufferEmpty() )
00462 qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->bufferEmpty()", this );
00463 }
00464 return eof;
00465 }
00466
00467 bool KDPipeIODevice::waitForBytesWritten( int msecs ) { KDAB_CHECK_THIS;
00468 d->startWriterThread();
00469 Writer * const w = d->writer;
00470 if ( !w )
00471 return true;
00472 LOCKED( w );
00473 qDebug( "KDPipeIODevice::waitForBytesWritten (%p,w=%p): entered locked area", this, w
00474 );
00475 return w->bufferEmpty() || w->error || w->bufferEmptyCondition.wait( &w->mutex, msecs ) ;
00476 }
00477
00478 bool KDPipeIODevice::waitForReadyRead( int msecs ) { KDAB_CHECK_THIS;
00479 qDebug( "KDPipeIODEvice::waitForReadyRead()(%p)", this);
00480 d->startReaderThread();
00481 if ( ALLOW_QIODEVICE_BUFFERING ) {
00482 if ( bytesAvailable() > 0 )
00483 return true;
00484 }
00485 Reader * const r = d->reader;
00486 if ( !r || r->eofShortCut )
00487 return true;
00488 LOCKED( r );
00489 if ( r->bytesInBuffer() != 0 || r->eof || r->error )
00490 return true;
00491 assert( false );
00492 return r->bufferNotEmptyCondition.wait( &r->mutex, msecs ) ;
00493 }
00494
00495 template <typename T>
00496 class TemporaryValue {
00497 public:
00498 TemporaryValue( T& var_, const T& tv ) : var( var_ ), oldValue( var_ ) { var = tv; }
00499 ~TemporaryValue() { var = oldValue; }
00500 private:
00501 T& var;
00502 const T oldValue;
00503 };
00504
00505
00506 bool KDPipeIODevice::readWouldBlock() const
00507 {
00508 d->startReaderThread();
00509 LOCKED( d->reader );
00510 return d->reader->bufferEmpty() && !d->reader->eof && !d->reader->error;
00511 }
00512
00513 bool KDPipeIODevice::writeWouldBlock() const
00514 {
00515 d->startWriterThread();
00516 LOCKED( d->writer );
00517 return !d->writer->bufferEmpty() && !d->writer->error;
00518 }
00519
00520
00521 qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS;
00522 qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld", this, data, maxSize );
00523 d->startReaderThread();
00524 Reader * const r = d->reader;
00525
00526 assert( r );
00527
00528
00529
00530 assert( data || maxSize == 0 );
00531 assert( maxSize >= 0 );
00532
00533 if ( r->eofShortCut ) {
00534 qDebug( "%p: KDPipeIODevice::readData: hit eofShortCut, returning 0", this );
00535 return 0;
00536 }
00537
00538 if ( maxSize < 0 )
00539 maxSize = 0;
00540
00541 if ( ALLOW_QIODEVICE_BUFFERING ) {
00542 if ( bytesAvailable() > 0 )
00543 maxSize = std::min( maxSize, bytesAvailable() );
00544 }
00545 qDebug( "%p: KDPipeIODevice::readData: try to lock reader (CONSUMER THREAD)", this );
00546 LOCKED( r );
00547 qDebug( "%p: KDPipeIODevice::readData: locked reader (CONSUMER THREAD)", this );
00548
00549 r->readyReadSentCondition.wakeAll();
00550 if ( r->bufferEmpty() && !r->error && !r->eof ) {
00551 qDebug( "%p: KDPipeIODevice::readData: waiting for bufferNotEmptyCondition (CONSUMER THREAD)", this );
00552 const TemporaryValue<bool> tmp( d->reader->consumerBlocksOnUs, true );
00553 r->bufferNotEmptyCondition.wait( &r->mutex );
00554 r->blockedConsumerIsDoneCondition.wakeAll();
00555 qDebug( "%p: KDPipeIODevice::readData: woke up from bufferNotEmptyCondition (CONSUMER THREAD)", this );
00556 }
00557
00558 if ( r->bufferEmpty() ) {
00559 qDebug( "%p: KDPipeIODevice::readData: got empty buffer, signal eof", this );
00560
00561 assert( r->eof || r->error );
00562 r->eofShortCut = true;
00563 return r->eof ? 0 : -1 ;
00564 }
00565
00566 qDebug( "%p: KDPipeIODevice::readData: got bufferNotEmptyCondition, trying to read %lld bytes", this, maxSize );
00567 const qint64 bytesRead = r->readData( data, maxSize );
00568 qDebug( "%p: KDPipeIODevice::readData: read %lld bytes", this, bytesRead );
00569 qDebug( "%p (fd=%d): KDPipeIODevice::readData: %s", this, d->fd, data );
00570
00571 return bytesRead;
00572 }
00573
00574 qint64 Reader::readData( char * data, qint64 maxSize ) {
00575 qint64 numRead = rptr < wptr ? wptr - rptr : sizeof buffer - rptr ;
00576 if ( numRead > maxSize )
00577 numRead = maxSize;
00578
00579 qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld; rptr=%u, wptr=%u (bytesInBuffer=%u); -> numRead=%lld", this,
00580 data, maxSize, rptr, wptr, bytesInBuffer(), numRead );
00581
00582 memcpy( data, buffer + rptr, numRead );
00583
00584 rptr = ( rptr + numRead ) % sizeof buffer ;
00585
00586 if ( !bufferFull() ) {
00587 qDebug( "%p: KDPipeIODevice::readData: signal bufferNotFullCondition", this );
00588 bufferNotFullCondition.wakeAll();
00589 }
00590
00591 return numRead;
00592 }
00593
00594 qint64 KDPipeIODevice::writeData( const char * data, qint64 size ) { KDAB_CHECK_THIS;
00595 d->startWriterThread();
00596 Writer * const w = d->writer;
00597
00598 assert( w );
00599 assert( w->error || w->isRunning() );
00600 assert( data || size == 0 );
00601 assert( size >= 0 );
00602
00603 LOCKED( w );
00604
00605 while ( !w->error && !w->bufferEmpty() ) {
00606 qDebug( "%p: KDPipeIODevice::writeData: wait for empty buffer", this );
00607 w->bufferEmptyCondition.wait( &w->mutex );
00608 qDebug( "%p: KDPipeIODevice::writeData: empty buffer signaled", this );
00609
00610 }
00611 if ( w->error )
00612 return -1;
00613
00614 assert( w->bufferEmpty() );
00615
00616 return w->writeData( data, size );
00617 }
00618
00619 qint64 Writer::writeData( const char * data, qint64 size ) {
00620 assert( bufferEmpty() );
00621
00622 if ( size > static_cast<qint64>( sizeof buffer ) )
00623 size = sizeof buffer;
00624
00625 memcpy( buffer, data, size );
00626
00627 numBytesInBuffer = size;
00628
00629 if ( !bufferEmpty() ) {
00630 bufferNotEmptyCondition.wakeAll();
00631 }
00632 return size;
00633 }
00634
00635 void KDPipeIODevice::Private::stopThreads()
00636 {
00637 if ( triedToStartWriter )
00638 {
00639 if ( writer && q->bytesToWrite() > 0 )
00640 q->waitForBytesWritten( -1 );
00641
00642 assert( q->bytesToWrite() == 0 );
00643 }
00644 if ( Reader * & r = reader ) {
00645 disconnect( r, SIGNAL( readyRead() ), this, SLOT( emitReadyRead() ) );
00646 synchronized( r ) {
00647
00648 r->cancel = true;
00649
00650 r->waitForCancelCondition.wakeAll();
00651 r->bufferNotFullCondition.wakeAll();
00652 r->readyReadSentCondition.wakeAll();
00653 }
00654 }
00655 if ( Writer * & w = writer ) {
00656 synchronized( w ) {
00657
00658 w->cancel = true;
00659
00660 w->bufferNotEmptyCondition.wakeAll();
00661 }
00662 }
00663 }
00664
00665 void KDPipeIODevice::close() { KDAB_CHECK_THIS;
00666 qDebug( "KDPipeIODevice::close(%p)", this );
00667 if ( !isOpen() )
00668 return;
00669
00670
00671 emit aboutToClose();
00672 d->stopThreads();
00673
00674 #define waitAndDelete( t ) if ( t ) { t->wait(); QThread* const t2 = t; t = 0; delete t2; }
00675 qDebug( "KPipeIODevice::close(%p): wait and closing writer %p", this, d->writer );
00676 waitAndDelete( d->writer );
00677 qDebug( "KPipeIODevice::close(%p): wait and closing reader %p", this, d->reader );
00678 if ( d->reader ) {
00679 LOCKED( d->reader );
00680 d->reader->readyReadSentCondition.wakeAll();
00681 }
00682 waitAndDelete( d->reader );
00683 #undef waitAndDelete
00684 #ifdef Q_OS_WIN32
00685 if ( d->fd != -1 )
00686 _close( d->fd );
00687 else
00688 CloseHandle( d->handle );
00689 #else
00690 ::close( d->fd );
00691 #endif
00692
00693 setOpenMode( NotOpen );
00694 d->fd = -1;
00695 d->handle = 0;
00696 }
00697
00698 void Reader::run() {
00699
00700 LOCKED( this );
00701
00702
00703 hasStarted.wakeAll();
00704
00705 qDebug( "%p: Reader::run: started", this );
00706
00707 while ( true ) {
00708 if ( !cancel && ( eof || error ) ) {
00709
00710
00711
00712 const bool wasEmpty = bufferEmpty();
00713 qDebug( "%p: Reader::run: received eof(%d) or error(%d), waking everyone", this, eof, error );
00714 notifyReadyRead();
00715 if ( !cancel && wasEmpty )
00716 waitForCancelCondition.wait( &mutex );
00717 } else if ( !cancel && !bufferFull() && !bufferEmpty() ) {
00718 qDebug( "%p: Reader::run: buffer no longer empty, waking everyone", this );
00719 notifyReadyRead();
00720 }
00721
00722 while ( !cancel && !error && bufferFull() ) {
00723 notifyReadyRead();
00724 if ( !cancel && bufferFull() ) {
00725 qDebug( "%p: Reader::run: buffer is full, going to sleep", this );
00726 bufferNotFullCondition.wait( &mutex );
00727 }
00728 }
00729
00730 if ( cancel ) {
00731 qDebug( "%p: Reader::run: detected cancel", this );
00732 goto leave;
00733 }
00734
00735 if ( !eof && !error ) {
00736 if ( rptr == wptr )
00737 rptr = wptr = 0;
00738
00739 unsigned int numBytes = ( rptr + sizeof buffer - wptr - 1 ) % sizeof buffer;
00740 if ( numBytes > sizeof buffer - wptr )
00741 numBytes = sizeof buffer - wptr;
00742
00743 qDebug( "%p: Reader::run: rptr=%d, wptr=%d -> numBytes=%d", this, rptr, wptr, numBytes );
00744
00745 assert( numBytes > 0 );
00746
00747 qDebug( "%p: Reader::run: trying to read %d bytes", this, numBytes );
00748 #ifdef Q_OS_WIN32
00749 isReading = true;
00750 mutex.unlock();
00751 DWORD numRead;
00752 const bool ok = ReadFile( handle, buffer + wptr, numBytes, &numRead, 0 );
00753 mutex.lock();
00754 isReading = false;
00755 if ( ok ) {
00756 if ( numRead == 0 ) {
00757 qDebug( "%p: Reader::run: got eof (numRead==0)", this );
00758 eof = true;
00759 }
00760 } else {
00761 errorCode = static_cast<int>( GetLastError() );
00762 if ( errorCode == ERROR_BROKEN_PIPE ) {
00763 assert( numRead == 0 );
00764 qDebug( "%p: Reader::run: got eof (broken pipe)", this );
00765 eof = true;
00766 } else {
00767 assert( numRead == 0 );
00768 qDebug( "%p: Reader::run: got error: %s (%d)", this, strerror( errorCode ), errorCode );
00769 error = true;
00770 }
00771 }
00772 #else
00773 qint64 numRead;
00774 mutex.unlock();
00775 do {
00776 numRead = ::read( fd, buffer + wptr, numBytes );
00777 } while ( numRead == -1 && errno == EINTR );
00778 mutex.lock();
00779
00780 if ( numRead < 0 ) {
00781 errorCode = errno;
00782 error = true;
00783 qDebug( "%p: Reader::run: got error: %d", this, errorCode );
00784 } else if ( numRead == 0 ) {
00785 qDebug( "%p: Reader::run: eof detected", this );
00786 eof = true;
00787 }
00788 #endif
00789 qDebug( "%p (fd=%d): Reader::run: read %ld bytes", this, fd, static_cast<long>(numRead) );
00790 qDebug( "%p (fd=%d): Reader::run: %s", this, fd, buffer );
00791
00792 if ( numRead > 0 ) {
00793 qDebug( "%p: Reader::run: buffer before: rptr=%4d, wptr=%4d", this, rptr, wptr );
00794 wptr = ( wptr + numRead ) % sizeof buffer;
00795 qDebug( "%p: Reader::run: buffer after: rptr=%4d, wptr=%4d", this, rptr, wptr );
00796 }
00797 }
00798 }
00799 leave:
00800 qDebug( "%p: Reader::run: terminated", this );
00801 }
00802
00803 void Reader::notifyReadyRead()
00804 {
00805 qDebug( "notifyReadyRead: %d bytes available", bytesInBuffer() );
00806 assert( !cancel );
00807
00808 if ( consumerBlocksOnUs ) {
00809 bufferNotEmptyCondition.wakeAll();
00810 blockedConsumerIsDoneCondition.wait( &mutex );
00811 return;
00812 }
00813 qDebug( "notifyReadyRead: emit signal" );
00814 emit readyRead();
00815 readyReadSentCondition.wait( &mutex );
00816 qDebug( "notifyReadyRead: returning from waiting, leave" );
00817 }
00818
00819 void Writer::run() {
00820
00821 LOCKED( this );
00822
00823
00824 hasStarted.wakeAll();
00825
00826 qDebug( "%p: Writer::run: started", this );
00827
00828 while ( true ) {
00829
00830 while ( !cancel && bufferEmpty() ) {
00831 qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this );
00832 bufferEmptyCondition.wakeAll();
00833 emit bytesWritten( 0 );
00834 qDebug( "%p: Writer::run: buffer is empty, going to sleep", this );
00835 bufferNotEmptyCondition.wait( &mutex );
00836 qDebug( "%p: Writer::run: woke up", this );
00837 }
00838
00839 if ( cancel ) {
00840 qDebug( "%p: Writer::run: detected cancel", this );
00841 goto leave;
00842 }
00843
00844 assert( numBytesInBuffer > 0 );
00845
00846 qDebug( "%p: Writer::run: Trying to write %u bytes", this, numBytesInBuffer );
00847 qint64 totalWritten = 0;
00848 do {
00849 mutex.unlock();
00850 #ifdef Q_OS_WIN32
00851 DWORD numWritten;
00852 qDebug( "%p (fd=%d): Writer::run: buffer before WriteFile (numBytes=%lld): %s:", this, fd, numBytesInBuffer, buffer );
00853 qDebug( "%p (fd=%d): Writer::run: Going into WriteFile", this, fd );
00854 if ( !WriteFile( handle, buffer + totalWritten, numBytesInBuffer - totalWritten, &numWritten, 0 ) ) {
00855 mutex.lock();
00856 errorCode = static_cast<int>( GetLastError() );
00857 qDebug( "%p: Writer::run: got error code: %d", this, errorCode );
00858 error = true;
00859 goto leave;
00860 }
00861 #else
00862 qint64 numWritten;
00863 do {
00864 numWritten = ::write( fd, buffer + totalWritten, numBytesInBuffer - totalWritten );
00865 } while ( numWritten == -1 && errno == EINTR );
00866
00867 if ( numWritten < 0 ) {
00868 mutex.lock();
00869 errorCode = errno;
00870 qDebug( "%p: Writer::run: got error code: %s (%d)", this, strerror( errorCode ), errorCode );
00871 error = true;
00872 goto leave;
00873 }
00874 #endif
00875 qDebug( "%p (fd=%d): Writer::run: buffer after WriteFile (numBytes=%u): %s:", this, fd, numBytesInBuffer, buffer );
00876 totalWritten += numWritten;
00877 mutex.lock();
00878 } while ( totalWritten < numBytesInBuffer );
00879
00880 qDebug( "%p: Writer::run: wrote %lld bytes", this, totalWritten );
00881
00882 numBytesInBuffer = 0;
00883
00884 qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this );
00885 bufferEmptyCondition.wakeAll();
00886 emit bytesWritten( totalWritten );
00887 }
00888 leave:
00889 qDebug( "%p: Writer::run: terminating", this );
00890 numBytesInBuffer = 0;
00891 qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this );
00892 bufferEmptyCondition.wakeAll();
00893 emit bytesWritten( 0 );
00894 }
00895
00896
00897 std::pair<KDPipeIODevice*,KDPipeIODevice*> KDPipeIODevice::makePairOfConnectedPipes() {
00898 KDPipeIODevice * read = 0;
00899 KDPipeIODevice * write = 0;
00900 #ifdef Q_OS_WIN32
00901 HANDLE rh;
00902 HANDLE wh;
00903 SECURITY_ATTRIBUTES sa;
00904 memset( &sa, 0, sizeof(sa) );
00905 sa.nLength = sizeof(sa);
00906 sa.bInheritHandle = TRUE;
00907 if ( CreatePipe( &rh, &wh, &sa, BUFFER_SIZE ) ) {
00908 read = new KDPipeIODevice;
00909 read->open( rh, ReadOnly );
00910 write = new KDPipeIODevice;
00911 write->open( wh, WriteOnly );
00912 }
00913 #else
00914 int fds[2];
00915 if ( pipe( fds ) == 0 ) {
00916 read = new KDPipeIODevice;
00917 read->open( fds[0], ReadOnly );
00918 write = new KDPipeIODevice;
00919 write->open( fds[1], WriteOnly );
00920 }
00921 #endif
00922 return std::make_pair( read, write );
00923 }
00924
00925 #ifdef KDAB_DEFINE_CHECKS
00926 KDAB_DEFINE_CHECKS( KDPipeIODevice ) {
00927 if ( !isOpen() ) {
00928 assert( openMode() == NotOpen );
00929 assert( !d->reader );
00930 assert( !d->writer );
00931 #ifdef Q_OS_WIN32
00932 assert( !d->handle );
00933 #else
00934 assert( d->fd < 0 );
00935 #endif
00936 } else {
00937 assert( openMode() != NotOpen );
00938 assert( openMode() & ReadWrite );
00939 if ( openMode() & ReadOnly ) {
00940 assert( d->reader );
00941 synchronized( d->reader )
00942 assert( d->reader->eof || d->reader->error || d->reader->isRunning() );
00943 }
00944 if ( openMode() & WriteOnly ) {
00945 assert( d->writer );
00946 synchronized( d->writer )
00947 assert( d->writer->error || d->writer->isRunning() );
00948 }
00949 #ifdef Q_OS_WIN32
00950 assert( d->handle );
00951 #else
00952 assert( d->fd >= 0 );
00953 #endif
00954 }
00955 }
00956 #endif // KDAB_DEFINE_CHECKS
00957
00958 #include "moc_kdpipeiodevice.cpp"
00959 #include "kdpipeiodevice.moc"