20 #include <config-kleopatra.h>
24 #include <QtCore/QDebug>
25 #include <QtCore/QMutex>
26 #include <QtCore/QPointer>
27 #include <QtCore/QThread>
28 #include <QtCore/QWaitCondition>
46 #ifndef KDAB_CHECK_THIS
47 # define KDAB_CHECK_CTOR (void)1
48 # define KDAB_CHECK_DTOR KDAB_CHECK_CTOR
49 # define KDAB_CHECK_THIS KDAB_CHECK_CTOR
52 #define LOCKED( d ) const QMutexLocker locker( &d->mutex )
53 #define synchronized( d ) if ( int i = 0 ) {} else for ( const QMutexLocker locker( &d->mutex ) ; !i ; ++i )
62 #define qDebug if( s_debugLevel == KDPipeIODevice::NoDebug ){}else qDebug
66 class Reader :
public QThread {
69 Reader(
int fd, Qt::HANDLE handle );
72 qint64 readData(
char * data, qint64 maxSize );
74 unsigned int bytesInBuffer()
const {
75 return ( wptr +
sizeof buffer - rptr ) %
sizeof buffer ;
78 bool bufferFull()
const {
79 return bytesInBuffer() ==
sizeof buffer - 1;
82 bool bufferEmpty()
const {
83 return bytesInBuffer() == 0;
86 bool bufferContains(
char ch ) {
87 const unsigned int bib = bytesInBuffer();
88 for (
unsigned int i = rptr ; i < rptr + bib ; ++i )
89 if ( buffer[i%
sizeof buffer] == ch )
94 void notifyReadyRead();
107 QWaitCondition waitForCancelCondition;
108 QWaitCondition bufferNotFullCondition;
109 QWaitCondition bufferNotEmptyCondition;
110 QWaitCondition hasStarted;
111 QWaitCondition readyReadSentCondition;
112 QWaitCondition blockedConsumerIsDoneCondition;
119 bool consumerBlocksOnUs;
122 unsigned int rptr, wptr;
127 Reader::Reader(
int fd_, Qt::HANDLE handle_ ) : QThread(),
131 bufferNotFullCondition(),
132 bufferNotEmptyCondition(),
137 eofShortCut(
false ),
140 consumerBlocksOnUs(
false ),
150 class Writer :
public QThread {
153 Writer(
int fd, Qt::HANDLE handle );
156 qint64 writeData(
const char * data, qint64 size );
158 unsigned int bytesInBuffer()
const {
return numBytesInBuffer; }
160 bool bufferFull()
const {
161 return numBytesInBuffer ==
sizeof buffer;
164 bool bufferEmpty()
const {
165 return numBytesInBuffer == 0;
169 void bytesWritten( qint64 );
179 QWaitCondition bufferEmptyCondition;
180 QWaitCondition bufferNotEmptyCondition;
181 QWaitCondition hasStarted;
186 unsigned int numBytesInBuffer;
191 Writer::Writer(
int fd_, Qt::HANDLE handle_ ) : QThread(),
195 bufferEmptyCondition(),
196 bufferNotEmptyCondition(),
201 numBytesInBuffer( 0 )
209 class KDPipeIODevice::Private :
public QObject {
211 friend class ::KDPipeIODevice;
218 bool doOpen(
int, Qt::HANDLE, OpenMode );
219 bool startReaderThread();
220 bool startWriterThread();
224 void emitReadyRead();
231 bool triedToStartReader;
232 bool triedToStartWriter;
242 s_debugLevel = level;
245 KDPipeIODevice::Private::Private(
KDPipeIODevice * qq ) : QObject( qq ),
q( qq ),
250 triedToStartReader( false ),
251 triedToStartWriter( false )
256 KDPipeIODevice::Private::~Private() {
257 qDebug(
"KDPipeIODevice::~Private(): Destroying %p", (
void* )
q );
277 open( handle, mode );
289 return d->doOpen( fd, (HANDLE)_get_osfhandle( fd ), mode );
291 return d->doOpen( fd, 0, mode );
297 return d->doOpen( -1, h, mode );
301 assert( !
"KDPipeIODevice::open( Qt::HANDLE, OpenMode ) should never be called except on Windows." );
306 bool KDPipeIODevice::Private::startReaderThread()
308 if ( triedToStartReader )
310 triedToStartReader =
true;
311 if ( reader && !reader->isRunning() && !reader->isFinished() ) {
312 qDebug(
"KDPipeIODevice::Private::startReaderThread(): locking reader (CONSUMER THREAD)" );
314 qDebug(
"KDPipeIODevice::Private::startReaderThread(): locked reader (CONSUMER THREAD)" );
315 reader->start( QThread::HighestPriority );
316 qDebug(
"KDPipeIODevice::Private::startReaderThread(): waiting for hasStarted (CONSUMER THREAD)" );
317 const bool hasStarted = reader->hasStarted.wait( &reader->mutex, 1000 );
318 qDebug(
"KDPipeIODevice::Private::startReaderThread(): returned from hasStarted (CONSUMER THREAD)" );
325 bool KDPipeIODevice::Private::startWriterThread()
327 if ( triedToStartWriter )
329 triedToStartWriter =
true;
330 if ( writer && !writer->isRunning() && !writer->isFinished() ) {
333 writer->start( QThread::HighestPriority );
334 if ( !writer->hasStarted.wait( &writer->mutex, 1000 ) )
340 void KDPipeIODevice::Private::emitReadyRead()
342 QPointer<Private> thisPointer(
this );
343 qDebug(
"KDPipeIODevice::Private::emitReadyRead %p", (
void* )
this );
350 qDebug(
"KDPipeIODevice::Private::emitReadyRead %p: locking reader (CONSUMER THREAD)", (
352 synchronized( reader ) {
353 qDebug(
"KDPipeIODevice::Private::emitReadyRead %p: locked reader (CONSUMER THREAD)", (
355 reader->readyReadSentCondition.wakeAll();
356 qDebug(
"KDPipeIODevice::Private::emitReadyRead %p: buffer empty: %d reader in ReadFile: %d", (
void* )
this, reader->bufferEmpty(), reader->isReading );
359 qDebug(
"KDPipeIODevice::Private::emitReadyRead %p leaving", (
void* )
this );
363 bool KDPipeIODevice::Private::doOpen(
int fd_, Qt::HANDLE handle_, OpenMode mode_ ) {
376 if ( !(mode_ & ReadWrite) )
380 std::auto_ptr<Reader> reader_;
381 std::auto_ptr<Writer> writer_;
383 if ( mode_ & ReadOnly ) {
384 reader_.reset(
new Reader( fd_, handle_ ) );
385 qDebug(
"KDPipeIODevice::doOpen (%p): created reader (%p) for fd %d", (
void * )
this,
386 (
void* )reader_.get(), fd_ );
387 connect( reader_.get(), SIGNAL(readyRead()),
this, SLOT(emitReadyRead()),
388 Qt::QueuedConnection );
390 if ( mode_ & WriteOnly ) {
391 writer_.reset(
new Writer( fd_, handle_ ) );
392 qDebug(
"KDPipeIODevice::doOpen (%p): created writer (%p) for fd %d",
393 (
void * )
this, (
void* )writer_.get(), fd_ );
394 connect( writer_.get(), SIGNAL(bytesWritten(qint64)),
q, SIGNAL(bytesWritten(qint64)),
395 Qt::QueuedConnection );
401 reader = reader_.release();
402 writer = writer_.release();
404 q->setOpenMode( mode_|Unbuffered );
418 const qint64 base = QIODevice::bytesAvailable();
419 if ( !d->triedToStartReader ) {
420 d->startReaderThread();
424 synchronized( d->reader ) {
425 const qint64 inBuffer = d->reader->bytesInBuffer();
426 return base + inBuffer;
433 d->startWriterThread();
434 const qint64 base = QIODevice::bytesToWrite();
436 synchronized( d->writer )
return base + d->writer->bytesInBuffer();
442 d->startReaderThread();
443 if ( QIODevice::canReadLine() )
446 synchronized( d->reader )
return d->reader->bufferContains(
'\n' );
456 d->startReaderThread();
457 if ( !QIODevice::atEnd() ) {
458 qDebug(
"%p: KDPipeIODevice::atEnd returns false since QIODevice::atEnd does (with bytesAvailable=%ld)", (
void * )
this, static_cast<long>(
bytesAvailable()) );
463 if ( d->reader->eofShortCut )
466 const bool eof = ( d->reader->error || d->reader->eof ) && d->reader->bufferEmpty();
468 if ( !d->reader->error && !d->reader->eof ) {
469 qDebug(
"%p: KDPipeIODevice::atEnd returns false since !reader->error && !reader->eof",
472 if ( !d->reader->bufferEmpty() ) {
473 qDebug(
"%p: KDPipeIODevice::atEnd returns false since !reader->bufferEmpty()",
481 d->startWriterThread();
482 Writer *
const w = d->writer;
486 qDebug(
"KDPipeIODevice::waitForBytesWritten (%p,w=%p): entered locked area",
487 (
void* )
this, (
void* ) w );
488 return w->bufferEmpty() || w->error || w->bufferEmptyCondition.wait( &w->mutex, msecs ) ;
492 qDebug(
"KDPipeIODEvice::waitForReadyRead()(%p)", (
void* )
this);
493 d->startReaderThread();
498 Reader *
const r = d->reader;
499 if ( !r || r->eofShortCut )
502 if ( r->bytesInBuffer() != 0 || r->eof || r->error )
505 return r->bufferNotEmptyCondition.wait( &r->mutex, msecs ) ;
508 template <
typename T>
509 class TemporaryValue {
511 TemporaryValue( T& var_,
const T& tv ) : var( var_ ), oldValue( var_ ) { var = tv; }
512 ~TemporaryValue() { var = oldValue; }
521 d->startReaderThread();
523 return d->reader->bufferEmpty() && !d->reader->eof && !d->reader->error;
528 d->startWriterThread();
530 return !d->writer->bufferEmpty() && !d->writer->error;
535 qDebug(
"%p: KDPipeIODevice::readData: data=%p, maxSize=%lld", (
void* )
this, data, maxSize );
536 d->startReaderThread();
537 Reader *
const r = d->reader;
543 assert( data || maxSize == 0 );
544 assert( maxSize >= 0 );
546 if ( r->eofShortCut ) {
547 qDebug(
"%p: KDPipeIODevice::readData: hit eofShortCut, returning 0", (
void* )
this );
558 qDebug(
"%p: KDPipeIODevice::readData: try to lock reader (CONSUMER THREAD)", (
void* )
this );
560 qDebug(
"%p: KDPipeIODevice::readData: locked reader (CONSUMER THREAD)", (
void* )
this );
562 r->readyReadSentCondition.wakeAll();
563 if ( r->bufferEmpty() && !r->error && !r->eof ) {
564 qDebug(
"%p: KDPipeIODevice::readData: waiting for bufferNotEmptyCondition (CONSUMER THREAD)", (
void*)
this );
565 const TemporaryValue<bool> tmp( d->reader->consumerBlocksOnUs,
true );
566 r->bufferNotEmptyCondition.wait( &r->mutex );
567 r->blockedConsumerIsDoneCondition.wakeAll();
568 qDebug(
"%p: KDPipeIODevice::readData: woke up from bufferNotEmptyCondition (CONSUMER THREAD)",
572 if ( r->bufferEmpty() ) {
573 qDebug(
"%p: KDPipeIODevice::readData: got empty buffer, signal eof", (
void* )
this );
575 assert( r->eof || r->error );
576 r->eofShortCut =
true;
577 return r->eof ? 0 : -1 ;
580 qDebug(
"%p: KDPipeIODevice::readData: got bufferNotEmptyCondition, trying to read %lld bytes",
581 (
void* )
this, maxSize );
582 const qint64 bytesRead = r->readData( data, maxSize );
583 qDebug(
"%p: KDPipeIODevice::readData: read %lld bytes", (
void* )
this, bytesRead );
584 qDebug(
"%p (fd=%d): KDPipeIODevice::readData: %s", (
void* )
this, d->fd, data );
589 qint64 Reader::readData(
char * data, qint64 maxSize ) {
590 qint64 numRead = rptr < wptr ? wptr - rptr :
sizeof buffer - rptr ;
591 if ( numRead > maxSize )
594 qDebug(
"%p: KDPipeIODevice::readData: data=%p, maxSize=%lld; rptr=%u, wptr=%u (bytesInBuffer=%u); -> numRead=%lld",
595 (
void* )
this, data, maxSize, rptr, wptr, bytesInBuffer(), numRead );
597 memcpy( data, buffer + rptr, numRead );
599 rptr = ( rptr + numRead ) %
sizeof buffer ;
601 if ( !bufferFull() ) {
602 qDebug(
"%p: KDPipeIODevice::readData: signal bufferNotFullCondition", (
void* )
this );
603 bufferNotFullCondition.wakeAll();
610 d->startWriterThread();
611 Writer *
const w = d->writer;
614 assert( w->error || w->isRunning() );
615 assert( data || size == 0 );
620 while ( !w->error && !w->bufferEmpty() ) {
621 qDebug(
"%p: KDPipeIODevice::writeData: wait for empty buffer", (
void* )
this );
622 w->bufferEmptyCondition.wait( &w->mutex );
623 qDebug(
"%p: KDPipeIODevice::writeData: empty buffer signaled", (
void* )
this );
629 assert( w->bufferEmpty() );
631 return w->writeData( data, size );
634 qint64 Writer::writeData(
const char * data, qint64 size ) {
635 assert( bufferEmpty() );
637 if ( size > static_cast<qint64>(
sizeof buffer ) )
638 size =
sizeof buffer;
640 memcpy( buffer, data, size );
642 numBytesInBuffer = size;
644 if ( !bufferEmpty() ) {
645 bufferNotEmptyCondition.wakeAll();
650 void KDPipeIODevice::Private::stopThreads()
652 if ( triedToStartWriter )
654 if ( writer &&
q->bytesToWrite() > 0 )
655 q->waitForBytesWritten( -1 );
657 assert(
q->bytesToWrite() == 0 );
659 if ( Reader * & r = reader ) {
660 disconnect( r, SIGNAL(readyRead()),
this, SLOT(emitReadyRead()) );
665 r->waitForCancelCondition.wakeAll();
666 r->bufferNotFullCondition.wakeAll();
667 r->readyReadSentCondition.wakeAll();
670 if ( Writer * & w = writer ) {
675 w->bufferNotEmptyCondition.wakeAll();
681 qDebug(
"KDPipeIODevice::close(%p)", (
void* )
this );
689 #define waitAndDelete( t ) if ( t ) { t->wait(); QThread* const t2 = t; t = 0; delete t2; }
690 qDebug(
"KPipeIODevice::close(%p): wait and closing writer %p", (
void* )
this, (
void* ) d->writer );
692 qDebug(
"KPipeIODevice::close(%p): wait and closing reader %p", (
void* )
this, (
void* ) d->reader );
695 d->reader->readyReadSentCondition.wakeAll();
703 CloseHandle( d->handle );
708 setOpenMode( NotOpen );
718 hasStarted.wakeAll();
720 qDebug(
"%p: Reader::run: started",(
void* )
this );
723 if ( !cancel && ( eof || error ) ) {
727 const bool wasEmpty = bufferEmpty();
728 qDebug(
"%p: Reader::run: received eof(%d) or error(%d), waking everyone", (
void* )
this, eof, error );
730 if ( !cancel && wasEmpty )
731 waitForCancelCondition.wait( &
mutex );
732 }
else if ( !cancel && !bufferFull() && !bufferEmpty() ) {
733 qDebug(
"%p: Reader::run: buffer no longer empty, waking everyone", (
void* )
this );
737 while ( !cancel && !error && bufferFull() ) {
739 if ( !cancel && bufferFull() ) {
740 qDebug(
"%p: Reader::run: buffer is full, going to sleep", (
void* )
this );
741 bufferNotFullCondition.wait( &
mutex );
746 qDebug(
"%p: Reader::run: detected cancel", (
void* )
this );
750 if ( !eof && !error ) {
754 unsigned int numBytes = ( rptr +
sizeof buffer - wptr - 1 ) %
sizeof buffer;
755 if ( numBytes >
sizeof buffer - wptr )
756 numBytes =
sizeof buffer - wptr;
758 qDebug(
"%p: Reader::run: rptr=%d, wptr=%d -> numBytes=%d", (
void* )
this, rptr, wptr, numBytes );
760 assert( numBytes > 0 );
762 qDebug(
"%p: Reader::run: trying to read %d bytes from fd %d", (
void* )
this, numBytes, fd );
767 const bool ok = ReadFile( handle, buffer + wptr, numBytes, &numRead, 0 );
771 if ( numRead == 0 ) {
772 qDebug(
"%p: Reader::run: got eof (numRead==0)", (
void* )
this );
776 errorCode =
static_cast<int>( GetLastError() );
777 if ( errorCode == ERROR_BROKEN_PIPE ) {
778 assert( numRead == 0 );
779 qDebug(
"%p: Reader::run: got eof (broken pipe)", (
void* )
this );
782 assert( numRead == 0 );
783 qDebug(
"%p: Reader::run: got error: %s (%d)", (
void* )
this, strerror( errorCode ), errorCode );
791 numRead = ::read( fd, buffer + wptr, numBytes );
792 }
while ( numRead == -1 && errno == EINTR );
798 qDebug(
"%p: Reader::run: got error: %d", (
void* )
this, errorCode );
799 }
else if ( numRead == 0 ) {
800 qDebug(
"%p: Reader::run: eof detected", (
void* )
this );
804 qDebug(
"%p (fd=%d): Reader::run: read %ld bytes", (
void* )
this, fd, static_cast<long>(numRead) );
805 qDebug(
"%p (fd=%d): Reader::run: %s", (
void* )
this, fd, buffer );
808 qDebug(
"%p: Reader::run: buffer before: rptr=%4d, wptr=%4d", (
void* )
this, rptr, wptr );
809 wptr = ( wptr + numRead ) %
sizeof buffer;
810 qDebug(
"%p: Reader::run: buffer after: rptr=%4d, wptr=%4d", (
void* )
this, rptr, wptr );
815 qDebug(
"%p: Reader::run: terminated", (
void* )
this );
818 void Reader::notifyReadyRead()
820 qDebug(
"notifyReadyRead: %d bytes available", bytesInBuffer() );
823 if ( consumerBlocksOnUs ) {
824 bufferNotEmptyCondition.wakeAll();
825 blockedConsumerIsDoneCondition.wait( &
mutex );
828 qDebug(
"notifyReadyRead: emit signal" );
830 readyReadSentCondition.wait( &
mutex );
831 qDebug(
"notifyReadyRead: returning from waiting, leave" );
839 hasStarted.wakeAll();
841 qDebug( ) <<
this <<
"Writer::run: started";
845 while ( !cancel && bufferEmpty() ) {
846 qDebug( ) <<
this <<
"Writer::run: buffer is empty, wake bufferEmptyCond listeners";
847 bufferEmptyCondition.wakeAll();
848 emit bytesWritten( 0 );
849 qDebug( ) <<
this <<
"Writer::run: buffer is empty, going to sleep";
850 bufferNotEmptyCondition.wait( &
mutex );
851 qDebug( ) <<
this <<
"Writer::run: woke up";
855 qDebug( ) <<
this <<
"Writer::run: detected cancel";
859 assert( numBytesInBuffer > 0 );
861 qDebug( ) <<
this <<
"Writer::run: Trying to write " << numBytesInBuffer <<
"bytes";
862 qint64 totalWritten = 0;
867 qDebug(
"%p (fd=%d): Writer::run: buffer before WriteFile (numBytes=%lld): %s:",
868 (
void*)
this, fd, numBytesInBuffer, buffer );
869 qDebug(
"%p (fd=%d): Writer::run: Going into WriteFile", (
void* )
this, fd );
870 if ( !WriteFile( handle, buffer + totalWritten, numBytesInBuffer - totalWritten, &numWritten, 0 ) ) {
872 errorCode =
static_cast<int>( GetLastError() );
873 qDebug(
"%p: Writer::run: got error code: %d", (
void* )
this, errorCode );
880 numWritten = ::write( fd, buffer + totalWritten, numBytesInBuffer - totalWritten );
881 }
while ( numWritten == -1 && errno == EINTR );
883 if ( numWritten < 0 ) {
886 qDebug(
"%p: Writer::run: got error code: %s (%d)", (
void* )
this, strerror( errorCode ), errorCode );
891 qDebug(
"%p (fd=%d): Writer::run: buffer after WriteFile (numBytes=%u): %s:", (
void* )
this, fd, numBytesInBuffer, buffer );
892 totalWritten += numWritten;
894 }
while ( totalWritten < numBytesInBuffer );
896 qDebug() <<
this <<
"Writer::run: wrote " << totalWritten <<
"bytes";
897 numBytesInBuffer = 0;
898 qDebug() <<
this <<
"Writer::run: buffer is empty, wake bufferEmptyCond listeners";
899 bufferEmptyCondition.wakeAll();
900 emit bytesWritten( totalWritten );
903 qDebug() <<
this <<
"Writer::run: terminating";
904 numBytesInBuffer = 0;
905 qDebug() <<
this <<
"Writer::run: buffer is empty, wake bufferEmptyCond listeners";
906 bufferEmptyCondition.wakeAll();
907 emit bytesWritten( 0 );
917 SECURITY_ATTRIBUTES sa;
918 memset( &sa, 0,
sizeof(sa) );
919 sa.nLength =
sizeof(sa);
920 sa.bInheritHandle = TRUE;
923 read->
open( rh, ReadOnly );
925 write->
open( wh, WriteOnly );
929 if ( pipe( fds ) == 0 ) {
931 read->
open( fds[0], ReadOnly );
933 write->
open( fds[1], WriteOnly );
936 return std::make_pair( read, write );
939 #ifdef KDAB_DEFINE_CHECKS
942 assert( openMode() == NotOpen );
943 assert( !
d->reader );
944 assert( !
d->writer );
946 assert( !
d->handle );
951 assert( openMode() != NotOpen );
952 assert( openMode() & ReadWrite );
953 if ( openMode() & ReadOnly ) {
955 synchronized(
d->reader )
956 assert(
d->reader->eof ||
d->reader->error ||
d->reader->isRunning() );
958 if ( openMode() & WriteOnly ) {
960 synchronized(
d->writer )
961 assert(
d->writer->error ||
d->writer->isRunning() );
966 assert(
d->fd >= 0 );
970 #endif // KDAB_DEFINE_CHECKS
972 #include "moc_kdpipeiodevice.cpp"
973 #include "kdpipeiodevice.moc"
bool waitForBytesWritten(int msecs)
qint64 writeData(const char *data, qint64 maxSize)
qint64 bytesToWrite() const
static DebugLevel debugLevel()
bool readWouldBlock() const
const unsigned int BUFFER_SIZE
bool waitForReadyRead(int msecs)
static std::pair< KDPipeIODevice *, KDPipeIODevice * > makePairOfConnectedPipes()
qint64 readData(char *data, qint64 maxSize)
static void setDebugLevel(DebugLevel level)
bool isSequential() const
bool writeWouldBlock() const
Qt::HANDLE handle() const
bool open(int fd, OpenMode mode=ReadOnly)
#define KDAB_DEFINE_CHECKS(Class)
qint64 bytesAvailable() const
KDPipeIODevice(QObject *parent=0)
const bool ALLOW_QIODEVICE_BUFFERING