• Skip to content
  • Skip to link menu
KDE API Reference
  • KDE API Reference
  • kdepim API Reference
  • KDE Home
  • Contact Us
 

kleopatra

  • sources
  • kde-4.12
  • kdepim
  • kleopatra
  • utils
kdpipeiodevice.cpp
Go to the documentation of this file.
1 /*
2  Copyright (C) 2007 Klarälvdalens Datakonsult AB
3 
4  KDPipeIODevice is free software; you can redistribute it and/or
5  modify it under the terms of the GNU Library General Public
6  License as published by the Free Software Foundation; either
7  version 2 of the License, or (at your option) any later version.
8 
9  KDPipeIODevice is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU Library General Public License for more details.
13 
14  You should have received a copy of the GNU Library General Public License
15  along with KDPipeIODevice; see the file COPYING.LIB. If not, write to the
16  Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
17  Boston, MA 02110-1301, USA.
18  */
19 
20 #include <config-kleopatra.h>
21 
22 #include "kdpipeiodevice.h"
23 
24 #include <QtCore/QDebug>
25 #include <QtCore/QMutex>
26 #include <QtCore/QPointer>
27 #include <QtCore/QThread>
28 #include <QtCore/QWaitCondition>
29 
30 #include <cassert>
31 #include <cstring>
32 #include <memory>
33 #include <algorithm>
34 
35 #ifdef Q_OS_WIN32
36 # ifndef NOMINMAX
37 # define NOMINMAX
38 # endif
39 # include <windows.h>
40 # include <io.h>
41 #else
42 # include <unistd.h>
43 # include <errno.h>
44 #endif
45 
46 #ifndef KDAB_CHECK_THIS
47 # define KDAB_CHECK_CTOR (void)1
48 # define KDAB_CHECK_DTOR KDAB_CHECK_CTOR
49 # define KDAB_CHECK_THIS KDAB_CHECK_CTOR
50 #endif
51 
52 #define LOCKED( d ) const QMutexLocker locker( &d->mutex )
53 #define synchronized( d ) if ( int i = 0 ) {} else for ( const QMutexLocker locker( &d->mutex ) ; !i ; ++i )
54 
55 const unsigned int BUFFER_SIZE = 4096;
56 const bool ALLOW_QIODEVICE_BUFFERING = true;
57 
58 namespace {
59  KDPipeIODevice::DebugLevel s_debugLevel = KDPipeIODevice::NoDebug;
60 }
61 
62 #define qDebug if( s_debugLevel == KDPipeIODevice::NoDebug ){}else qDebug
63 
64 namespace {
65 
66 class Reader : public QThread {
67  Q_OBJECT
68 public:
69  Reader( int fd, Qt::HANDLE handle );
70  ~Reader();
71 
72  qint64 readData( char * data, qint64 maxSize );
73 
74  unsigned int bytesInBuffer() const {
75  return ( wptr + sizeof buffer - rptr ) % sizeof buffer ;
76  }
77 
78  bool bufferFull() const {
79  return bytesInBuffer() == sizeof buffer - 1;
80  }
81 
82  bool bufferEmpty() const {
83  return bytesInBuffer() == 0;
84  }
85 
86  bool bufferContains( char ch ) {
87  const unsigned int bib = bytesInBuffer();
88  for ( unsigned int i = rptr ; i < rptr + bib ; ++i )
89  if ( buffer[i%sizeof buffer] == ch )
90  return true;
91  return false;
92  }
93 
94  void notifyReadyRead();
95 
96  Q_SIGNALS:
97  void readyRead();
98 
99 protected:
100  /* reimp */ void run();
101 
102 private:
103  int fd;
104  Qt::HANDLE handle;
105 public:
106  QMutex mutex;
107  QWaitCondition waitForCancelCondition;
108  QWaitCondition bufferNotFullCondition;
109  QWaitCondition bufferNotEmptyCondition;
110  QWaitCondition hasStarted;
111  QWaitCondition readyReadSentCondition;
112  QWaitCondition blockedConsumerIsDoneCondition;
113  bool cancel;
114  bool eof;
115  bool error;
116  bool eofShortCut;
117  int errorCode;
118  bool isReading;
119  bool consumerBlocksOnUs;
120 
121 private:
122  unsigned int rptr, wptr;
123  char buffer[BUFFER_SIZE+1]; // need to keep one byte free to detect empty state
124 };
125 
126 
127 Reader::Reader( int fd_, Qt::HANDLE handle_ ) : QThread(),
128  fd( fd_ ),
129  handle( handle_ ),
130  mutex(),
131  bufferNotFullCondition(),
132  bufferNotEmptyCondition(),
133  hasStarted(),
134  cancel( false ),
135  eof( false ),
136  error( false ),
137  eofShortCut( false ),
138  errorCode( 0 ),
139  isReading( false ),
140  consumerBlocksOnUs( false ),
141  rptr( 0 ),
142  wptr( 0 )
143 {
144 
145 }
146 
147 Reader::~Reader() {}
148 
149 
150 class Writer : public QThread {
151  Q_OBJECT
152 public:
153  Writer( int fd, Qt::HANDLE handle );
154  ~Writer();
155 
156  qint64 writeData( const char * data, qint64 size );
157 
158  unsigned int bytesInBuffer() const { return numBytesInBuffer; }
159 
160  bool bufferFull() const {
161  return numBytesInBuffer == sizeof buffer;
162  }
163 
164  bool bufferEmpty() const {
165  return numBytesInBuffer == 0;
166  }
167 
168  Q_SIGNALS:
169  void bytesWritten( qint64 );
170 
171 protected:
172  /* reimp */ void run();
173 
174 private:
175  int fd;
176  Qt::HANDLE handle;
177 public:
178  QMutex mutex;
179  QWaitCondition bufferEmptyCondition;
180  QWaitCondition bufferNotEmptyCondition;
181  QWaitCondition hasStarted;
182  bool cancel;
183  bool error;
184  int errorCode;
185 private:
186  unsigned int numBytesInBuffer;
187  char buffer[BUFFER_SIZE];
188 };
189 }
190 
191 Writer::Writer( int fd_, Qt::HANDLE handle_ ) : QThread(),
192  fd( fd_ ),
193  handle( handle_ ),
194  mutex(),
195  bufferEmptyCondition(),
196  bufferNotEmptyCondition(),
197  hasStarted(),
198  cancel( false ),
199  error( false ),
200  errorCode( 0 ),
201  numBytesInBuffer( 0 )
202 {
203 
204 }
205 
206 Writer::~Writer() {}
207 
208 
209 class KDPipeIODevice::Private : public QObject {
210  Q_OBJECT
211  friend class ::KDPipeIODevice;
212  KDPipeIODevice * const q;
213 public:
214 
215  explicit Private( KDPipeIODevice * qq );
216  ~Private();
217 
218  bool doOpen( int, Qt::HANDLE, OpenMode );
219  bool startReaderThread();
220  bool startWriterThread();
221  void stopThreads();
222 
223 public Q_SLOTS:
224  void emitReadyRead();
225 
226 private:
227  int fd;
228  Qt::HANDLE handle;
229  Reader * reader;
230  Writer * writer;
231  bool triedToStartReader;
232  bool triedToStartWriter;
233 };
234 
235 KDPipeIODevice::DebugLevel KDPipeIODevice::debugLevel()
236 {
237  return s_debugLevel;
238 }
239 
240 void KDPipeIODevice::setDebugLevel( KDPipeIODevice::DebugLevel level )
241 {
242  s_debugLevel = level;
243 }
244 
245 KDPipeIODevice::Private::Private( KDPipeIODevice * qq ) : QObject( qq ), q( qq ),
246  fd( -1 ),
247  handle( 0 ),
248  reader( 0 ),
249  writer( 0 ),
250  triedToStartReader( false ),
251  triedToStartWriter( false )
252 {
253 
254 }
255 
256 KDPipeIODevice::Private::~Private() {
257  qDebug( "KDPipeIODevice::~Private(): Destroying %p", ( void* ) q );
258 }
259 
260 KDPipeIODevice::KDPipeIODevice( QObject * p )
261 : QIODevice( p ), d( new Private( this ) )
262 {
263  KDAB_CHECK_CTOR;
264 }
265 
266 KDPipeIODevice::KDPipeIODevice( int fd, OpenMode mode, QObject * p )
267 : QIODevice( p ), d( new Private( this ) )
268 {
269  KDAB_CHECK_CTOR;
270  open( fd, mode );
271 }
272 
273 KDPipeIODevice::KDPipeIODevice( Qt::HANDLE handle, OpenMode mode, QObject * p )
274 : QIODevice( p ), d( new Private( this ) )
275 {
276  KDAB_CHECK_CTOR;
277  open( handle, mode );
278 }
279 
280 KDPipeIODevice::~KDPipeIODevice() { KDAB_CHECK_DTOR;
281  if ( isOpen() )
282  close();
283  delete d; d = 0;
284 }
285 
286 
287 bool KDPipeIODevice::open( int fd, OpenMode mode ) { KDAB_CHECK_THIS;
288  #ifdef Q_OS_WIN32
289  return d->doOpen( fd, (HANDLE)_get_osfhandle( fd ), mode );
290  #else
291  return d->doOpen( fd, 0, mode );
292  #endif
293 }
294 
295 bool KDPipeIODevice::open( Qt::HANDLE h, OpenMode mode ) { KDAB_CHECK_THIS;
296  #ifdef Q_OS_WIN32
297  return d->doOpen( -1, h, mode );
298  #else
299  Q_UNUSED( h );
300  Q_UNUSED( mode );
301  assert( !"KDPipeIODevice::open( Qt::HANDLE, OpenMode ) should never be called except on Windows." );
302  return false;
303  #endif
304 }
305 
306 bool KDPipeIODevice::Private::startReaderThread()
307 {
308  if ( triedToStartReader )
309  return true;
310  triedToStartReader = true;
311  if ( reader && !reader->isRunning() && !reader->isFinished() ) {
312  qDebug("KDPipeIODevice::Private::startReaderThread(): locking reader (CONSUMER THREAD)" );
313  LOCKED( reader );
314  qDebug("KDPipeIODevice::Private::startReaderThread(): locked reader (CONSUMER THREAD)" );
315  reader->start( QThread::HighestPriority );
316  qDebug("KDPipeIODevice::Private::startReaderThread(): waiting for hasStarted (CONSUMER THREAD)" );
317  const bool hasStarted = reader->hasStarted.wait( &reader->mutex, 1000 );
318  qDebug("KDPipeIODevice::Private::startReaderThread(): returned from hasStarted (CONSUMER THREAD)" );
319 
320  return hasStarted;
321  }
322  return true;
323 }
324 
325 bool KDPipeIODevice::Private::startWriterThread()
326 {
327  if ( triedToStartWriter )
328  return true;
329  triedToStartWriter = true;
330  if ( writer && !writer->isRunning() && !writer->isFinished() ) {
331  LOCKED( writer );
332 
333  writer->start( QThread::HighestPriority );
334  if ( !writer->hasStarted.wait( &writer->mutex, 1000 ) )
335  return false;
336  }
337  return true;
338 }
339 
340 void KDPipeIODevice::Private::emitReadyRead()
341 {
342  QPointer<Private> thisPointer( this );
343  qDebug( "KDPipeIODevice::Private::emitReadyRead %p", ( void* ) this );
344 
345  emit q->readyRead();
346 
347  if ( !thisPointer )
348  return;
349  if ( reader ) {
350  qDebug( "KDPipeIODevice::Private::emitReadyRead %p: locking reader (CONSUMER THREAD)", (
351  void* ) this );
352  synchronized( reader ) {
353  qDebug( "KDPipeIODevice::Private::emitReadyRead %p: locked reader (CONSUMER THREAD)", (
354  void* ) this );
355  reader->readyReadSentCondition.wakeAll();
356  qDebug( "KDPipeIODevice::Private::emitReadyRead %p: buffer empty: %d reader in ReadFile: %d", ( void* )this, reader->bufferEmpty(), reader->isReading );
357  }
358  }
359  qDebug( "KDPipeIODevice::Private::emitReadyRead %p leaving", ( void* ) this );
360 
361 }
362 
363 bool KDPipeIODevice::Private::doOpen( int fd_, Qt::HANDLE handle_, OpenMode mode_ ) {
364 
365  if ( q->isOpen() )
366  return false;
367 
368 #ifdef Q_OS_WIN32
369  if ( !handle_ )
370  return false;
371 #else
372  if ( fd_ < 0 )
373  return false;
374 #endif
375 
376  if ( !(mode_ & ReadWrite) )
377  return false; // need to have at least read -or- write
378 
379 
380  std::auto_ptr<Reader> reader_;
381  std::auto_ptr<Writer> writer_;
382 
383  if ( mode_ & ReadOnly ) {
384  reader_.reset( new Reader( fd_, handle_ ) );
385  qDebug( "KDPipeIODevice::doOpen (%p): created reader (%p) for fd %d", ( void * )this,
386  ( void* )reader_.get(), fd_ );
387  connect( reader_.get(), SIGNAL(readyRead()), this, SLOT(emitReadyRead()),
388  Qt::QueuedConnection );
389  }
390  if ( mode_ & WriteOnly ) {
391  writer_.reset( new Writer( fd_, handle_ ) );
392  qDebug( "KDPipeIODevice::doOpen (%p): created writer (%p) for fd %d",
393  ( void * )this, ( void* )writer_.get(), fd_ );
394  connect( writer_.get(), SIGNAL(bytesWritten(qint64)), q, SIGNAL(bytesWritten(qint64)),
395  Qt::QueuedConnection );
396  }
397 
398  // commit to *this:
399  fd = fd_;
400  handle = handle_;
401  reader = reader_.release();
402  writer = writer_.release();
403 
404  q->setOpenMode( mode_|Unbuffered );
405  return true;
406 }
407 
408 int KDPipeIODevice::descriptor() const { KDAB_CHECK_THIS;
409  return d->fd;
410 }
411 
412 
413 Qt::HANDLE KDPipeIODevice::handle() const { KDAB_CHECK_THIS;
414  return d->handle;
415 }
416 
417 qint64 KDPipeIODevice::bytesAvailable() const { KDAB_CHECK_THIS;
418  const qint64 base = QIODevice::bytesAvailable();
419  if ( !d->triedToStartReader ) {
420  d->startReaderThread();
421  return base;
422  }
423  if ( d->reader ) {
424  synchronized( d->reader ) {
425  const qint64 inBuffer = d->reader->bytesInBuffer();
426  return base + inBuffer;
427  }
428  }
429  return base;
430 }
431 
432 qint64 KDPipeIODevice::bytesToWrite() const { KDAB_CHECK_THIS;
433  d->startWriterThread();
434  const qint64 base = QIODevice::bytesToWrite();
435  if ( d->writer ) {
436  synchronized( d->writer ) return base + d->writer->bytesInBuffer();
437  }
438  return base;
439 }
440 
441 bool KDPipeIODevice::canReadLine() const { KDAB_CHECK_THIS;
442  d->startReaderThread();
443  if ( QIODevice::canReadLine() )
444  return true;
445  if ( d->reader ) {
446  synchronized( d->reader ) return d->reader->bufferContains( '\n' );
447  }
448  return true;
449 }
450 
451 bool KDPipeIODevice::isSequential() const {
452  return true;
453 }
454 
455 bool KDPipeIODevice::atEnd() const { KDAB_CHECK_THIS;
456  d->startReaderThread();
457  if ( !QIODevice::atEnd() ) {
458  qDebug( "%p: KDPipeIODevice::atEnd returns false since QIODevice::atEnd does (with bytesAvailable=%ld)", ( void * )this, static_cast<long>(bytesAvailable()) );
459  return false;
460  }
461  if ( !isOpen() )
462  return true;
463  if ( d->reader->eofShortCut )
464  return true;
465  LOCKED( d->reader );
466  const bool eof = ( d->reader->error || d->reader->eof ) && d->reader->bufferEmpty();
467  if ( !eof ) {
468  if ( !d->reader->error && !d->reader->eof ) {
469  qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->error && !reader->eof",
470  ( void* )( this ) );
471  }
472  if ( !d->reader->bufferEmpty() ) {
473  qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->bufferEmpty()",
474  ( void*) this );
475  }
476  }
477  return eof;
478 }
479 
480 bool KDPipeIODevice::waitForBytesWritten( int msecs ) { KDAB_CHECK_THIS;
481  d->startWriterThread();
482  Writer * const w = d->writer;
483  if ( !w )
484  return true;
485  LOCKED( w );
486  qDebug( "KDPipeIODevice::waitForBytesWritten (%p,w=%p): entered locked area",
487  ( void* )this, ( void* ) w );
488  return w->bufferEmpty() || w->error || w->bufferEmptyCondition.wait( &w->mutex, msecs ) ;
489  }
490 
491  bool KDPipeIODevice::waitForReadyRead( int msecs ) { KDAB_CHECK_THIS;
492  qDebug( "KDPipeIODEvice::waitForReadyRead()(%p)", ( void* ) this);
493  d->startReaderThread();
494  if ( ALLOW_QIODEVICE_BUFFERING ) {
495  if ( bytesAvailable() > 0 )
496  return true;
497  }
498  Reader * const r = d->reader;
499  if ( !r || r->eofShortCut )
500  return true;
501  LOCKED( r );
502  if ( r->bytesInBuffer() != 0 || r->eof || r->error )
503  return true;
504  assert( false ); // ### wtf?
505  return r->bufferNotEmptyCondition.wait( &r->mutex, msecs ) ;
506 }
507 
508 template <typename T>
509 class TemporaryValue {
510 public:
511  TemporaryValue( T& var_, const T& tv ) : var( var_ ), oldValue( var_ ) { var = tv; }
512  ~TemporaryValue() { var = oldValue; }
513 private:
514  T& var;
515  const T oldValue;
516 };
517 
518 
519 bool KDPipeIODevice::readWouldBlock() const
520 {
521  d->startReaderThread();
522  LOCKED( d->reader );
523  return d->reader->bufferEmpty() && !d->reader->eof && !d->reader->error;
524 }
525 
526 bool KDPipeIODevice::writeWouldBlock() const
527 {
528  d->startWriterThread();
529  LOCKED( d->writer );
530  return !d->writer->bufferEmpty() && !d->writer->error;
531 }
532 
533 
534 qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS;
535  qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld", ( void* )this, data, maxSize );
536  d->startReaderThread();
537  Reader * const r = d->reader;
538 
539  assert( r );
540 
541 
542  //assert( r->isRunning() ); // wrong (might be eof, error)
543  assert( data || maxSize == 0 );
544  assert( maxSize >= 0 );
545 
546  if ( r->eofShortCut ) {
547  qDebug( "%p: KDPipeIODevice::readData: hit eofShortCut, returning 0", ( void* )this );
548  return 0;
549  }
550 
551  if ( maxSize < 0 )
552  maxSize = 0;
553 
554  if ( ALLOW_QIODEVICE_BUFFERING ) {
555  if ( bytesAvailable() > 0 )
556  maxSize = std::min( maxSize, bytesAvailable() ); // don't block
557  }
558  qDebug( "%p: KDPipeIODevice::readData: try to lock reader (CONSUMER THREAD)", ( void* ) this );
559  LOCKED( r );
560  qDebug( "%p: KDPipeIODevice::readData: locked reader (CONSUMER THREAD)", ( void* ) this );
561 
562  r->readyReadSentCondition.wakeAll();
563  if ( /* maxSize > 0 && */ r->bufferEmpty() && !r->error && !r->eof ) { // ### block on maxSize == 0?
564  qDebug( "%p: KDPipeIODevice::readData: waiting for bufferNotEmptyCondition (CONSUMER THREAD)", ( void*) this );
565  const TemporaryValue<bool> tmp( d->reader->consumerBlocksOnUs, true );
566  r->bufferNotEmptyCondition.wait( &r->mutex );
567  r->blockedConsumerIsDoneCondition.wakeAll();
568  qDebug( "%p: KDPipeIODevice::readData: woke up from bufferNotEmptyCondition (CONSUMER THREAD)",
569  ( void*) this );
570  }
571 
572  if ( r->bufferEmpty() ) {
573  qDebug( "%p: KDPipeIODevice::readData: got empty buffer, signal eof", ( void* ) this );
574  // woken with an empty buffer must mean either EOF or error:
575  assert( r->eof || r->error );
576  r->eofShortCut = true;
577  return r->eof ? 0 : -1 ;
578  }
579 
580  qDebug( "%p: KDPipeIODevice::readData: got bufferNotEmptyCondition, trying to read %lld bytes",
581  ( void* )this, maxSize );
582  const qint64 bytesRead = r->readData( data, maxSize );
583  qDebug( "%p: KDPipeIODevice::readData: read %lld bytes", ( void* )this, bytesRead );
584  qDebug( "%p (fd=%d): KDPipeIODevice::readData: %s", ( void* )this, d->fd, data );
585 
586  return bytesRead;
587 }
588 
589 qint64 Reader::readData( char * data, qint64 maxSize ) {
590  qint64 numRead = rptr < wptr ? wptr - rptr : sizeof buffer - rptr ;
591  if ( numRead > maxSize )
592  numRead = maxSize;
593 
594  qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld; rptr=%u, wptr=%u (bytesInBuffer=%u); -> numRead=%lld",
595  ( void* )this, data, maxSize, rptr, wptr, bytesInBuffer(), numRead );
596 
597  memcpy( data, buffer + rptr, numRead );
598 
599  rptr = ( rptr + numRead ) % sizeof buffer ;
600 
601  if ( !bufferFull() ) {
602  qDebug( "%p: KDPipeIODevice::readData: signal bufferNotFullCondition", ( void* ) this );
603  bufferNotFullCondition.wakeAll();
604  }
605 
606  return numRead;
607 }
608 
609 qint64 KDPipeIODevice::writeData( const char * data, qint64 size ) { KDAB_CHECK_THIS;
610  d->startWriterThread();
611  Writer * const w = d->writer;
612 
613  assert( w );
614  assert( w->error || w->isRunning() );
615  assert( data || size == 0 );
616  assert( size >= 0 );
617 
618  LOCKED( w );
619 
620  while ( !w->error && !w->bufferEmpty() ) {
621  qDebug( "%p: KDPipeIODevice::writeData: wait for empty buffer", ( void* ) this );
622  w->bufferEmptyCondition.wait( &w->mutex );
623  qDebug( "%p: KDPipeIODevice::writeData: empty buffer signaled", ( void* ) this );
624 
625  }
626  if ( w->error )
627  return -1;
628 
629  assert( w->bufferEmpty() );
630 
631  return w->writeData( data, size );
632 }
633 
634 qint64 Writer::writeData( const char * data, qint64 size ) {
635  assert( bufferEmpty() );
636 
637  if ( size > static_cast<qint64>( sizeof buffer ) )
638  size = sizeof buffer;
639 
640  memcpy( buffer, data, size );
641 
642  numBytesInBuffer = size;
643 
644  if ( !bufferEmpty() ) {
645  bufferNotEmptyCondition.wakeAll();
646  }
647  return size;
648 }
649 
650 void KDPipeIODevice::Private::stopThreads()
651 {
652  if ( triedToStartWriter )
653  {
654  if ( writer && q->bytesToWrite() > 0 )
655  q->waitForBytesWritten( -1 );
656 
657  assert( q->bytesToWrite() == 0 );
658  }
659  if ( Reader * & r = reader ) {
660  disconnect( r, SIGNAL(readyRead()), this, SLOT(emitReadyRead()) );
661  synchronized( r ) {
662  // tell thread to cancel:
663  r->cancel = true;
664  // and wake it, so it can terminate:
665  r->waitForCancelCondition.wakeAll();
666  r->bufferNotFullCondition.wakeAll();
667  r->readyReadSentCondition.wakeAll();
668  }
669  }
670  if ( Writer * & w = writer ) {
671  synchronized( w ) {
672  // tell thread to cancel:
673  w->cancel = true;
674  // and wake it, so it can terminate:
675  w->bufferNotEmptyCondition.wakeAll();
676  }
677  }
678 }
679 
680 void KDPipeIODevice::close() { KDAB_CHECK_THIS;
681  qDebug( "KDPipeIODevice::close(%p)", ( void* ) this );
682  if ( !isOpen() )
683  return;
684 
685  // tell clients we're about to close:
686  emit aboutToClose();
687  d->stopThreads();
688 
689  #define waitAndDelete( t ) if ( t ) { t->wait(); QThread* const t2 = t; t = 0; delete t2; }
690  qDebug( "KPipeIODevice::close(%p): wait and closing writer %p", ( void* )this, ( void* ) d->writer );
691  waitAndDelete( d->writer );
692  qDebug( "KPipeIODevice::close(%p): wait and closing reader %p", ( void* )this, ( void* ) d->reader );
693  if ( d->reader ) {
694  LOCKED( d->reader );
695  d->reader->readyReadSentCondition.wakeAll();
696  }
697  waitAndDelete( d->reader );
698 #undef waitAndDelete
699 #ifdef Q_OS_WIN32
700  if ( d->fd != -1 )
701  _close( d->fd );
702 else
703  CloseHandle( d->handle );
704 #else
705  ::close( d->fd );
706 #endif
707 
708  setOpenMode( NotOpen );
709  d->fd = -1;
710  d->handle = 0;
711 }
712 
713 void Reader::run() {
714 
715  LOCKED( this );
716 
717  // too bad QThread doesn't have that itself; a signal isn't enough
718  hasStarted.wakeAll();
719 
720  qDebug( "%p: Reader::run: started",( void* ) this );
721 
722  while ( true ) {
723  if ( !cancel && ( eof || error ) ) {
724  //notify the client until the buffer is empty and then once
725  //again so he receives eof/error. After that, wait for him
726  //to cancel
727  const bool wasEmpty = bufferEmpty();
728  qDebug( "%p: Reader::run: received eof(%d) or error(%d), waking everyone", ( void* )this, eof, error );
729  notifyReadyRead();
730  if ( !cancel && wasEmpty )
731  waitForCancelCondition.wait( &mutex );
732  } else if ( !cancel && !bufferFull() && !bufferEmpty() ) {
733  qDebug( "%p: Reader::run: buffer no longer empty, waking everyone", ( void* ) this );
734  notifyReadyRead();
735  }
736 
737  while ( !cancel && !error && bufferFull() ) {
738  notifyReadyRead();
739  if ( !cancel && bufferFull() ) {
740  qDebug( "%p: Reader::run: buffer is full, going to sleep", ( void* )this );
741  bufferNotFullCondition.wait( &mutex );
742  }
743  }
744 
745  if ( cancel ) {
746  qDebug( "%p: Reader::run: detected cancel", ( void* )this );
747  goto leave;
748  }
749 
750  if ( !eof && !error ) {
751  if ( rptr == wptr ) // optimize for larger chunks in case the buffer is empty
752  rptr = wptr = 0;
753 
754  unsigned int numBytes = ( rptr + sizeof buffer - wptr - 1 ) % sizeof buffer;
755  if ( numBytes > sizeof buffer - wptr )
756  numBytes = sizeof buffer - wptr;
757 
758  qDebug( "%p: Reader::run: rptr=%d, wptr=%d -> numBytes=%d", ( void* )this, rptr, wptr, numBytes );
759 
760  assert( numBytes > 0 );
761 
762  qDebug( "%p: Reader::run: trying to read %d bytes from fd %d", ( void* )this, numBytes, fd );
763 #ifdef Q_OS_WIN32
764  isReading = true;
765  mutex.unlock();
766  DWORD numRead;
767  const bool ok = ReadFile( handle, buffer + wptr, numBytes, &numRead, 0 );
768  mutex.lock();
769  isReading = false;
770  if ( ok ) {
771  if ( numRead == 0 ) {
772  qDebug( "%p: Reader::run: got eof (numRead==0)", ( void* ) this );
773  eof = true;
774  }
775  } else { // !ok
776  errorCode = static_cast<int>( GetLastError() );
777  if ( errorCode == ERROR_BROKEN_PIPE ) {
778  assert( numRead == 0 );
779  qDebug( "%p: Reader::run: got eof (broken pipe)", ( void* ) this );
780  eof = true;
781  } else {
782  assert( numRead == 0 );
783  qDebug( "%p: Reader::run: got error: %s (%d)", ( void* ) this, strerror( errorCode ), errorCode );
784  error = true;
785  }
786  }
787 #else
788  qint64 numRead;
789  mutex.unlock();
790  do {
791  numRead = ::read( fd, buffer + wptr, numBytes );
792  } while ( numRead == -1 && errno == EINTR );
793  mutex.lock();
794 
795  if ( numRead < 0 ) {
796  errorCode = errno;
797  error = true;
798  qDebug( "%p: Reader::run: got error: %d", ( void* )this, errorCode );
799  } else if ( numRead == 0 ) {
800  qDebug( "%p: Reader::run: eof detected", ( void* )this );
801  eof = true;
802  }
803 #endif
804  qDebug( "%p (fd=%d): Reader::run: read %ld bytes", ( void* ) this, fd, static_cast<long>(numRead) );
805  qDebug( "%p (fd=%d): Reader::run: %s", ( void* )this, fd, buffer );
806 
807  if ( numRead > 0 ) {
808  qDebug( "%p: Reader::run: buffer before: rptr=%4d, wptr=%4d", ( void* )this, rptr, wptr );
809  wptr = ( wptr + numRead ) % sizeof buffer;
810  qDebug( "%p: Reader::run: buffer after: rptr=%4d, wptr=%4d", ( void* )this, rptr, wptr );
811  }
812  }
813  }
814  leave:
815  qDebug( "%p: Reader::run: terminated", ( void* )this );
816 }
817 
818 void Reader::notifyReadyRead()
819 {
820  qDebug( "notifyReadyRead: %d bytes available", bytesInBuffer() );
821  assert( !cancel );
822 
823  if ( consumerBlocksOnUs ) {
824  bufferNotEmptyCondition.wakeAll();
825  blockedConsumerIsDoneCondition.wait( &mutex );
826  return;
827  }
828  qDebug( "notifyReadyRead: emit signal" );
829  emit readyRead();
830  readyReadSentCondition.wait( &mutex );
831  qDebug( "notifyReadyRead: returning from waiting, leave" );
832 }
833 
834 void Writer::run() {
835 
836  LOCKED( this );
837 
838  // too bad QThread doesn't have that itself; a signal isn't enough
839  hasStarted.wakeAll();
840 
841  qDebug( ) << this << "Writer::run: started";
842 
843  while ( true ) {
844 
845  while ( !cancel && bufferEmpty() ) {
846  qDebug( ) << this << "Writer::run: buffer is empty, wake bufferEmptyCond listeners";
847  bufferEmptyCondition.wakeAll();
848  emit bytesWritten( 0 );
849  qDebug( ) << this << "Writer::run: buffer is empty, going to sleep";
850  bufferNotEmptyCondition.wait( &mutex );
851  qDebug( ) << this << "Writer::run: woke up";
852  }
853 
854  if ( cancel ) {
855  qDebug( ) << this << "Writer::run: detected cancel";
856  goto leave;
857  }
858 
859  assert( numBytesInBuffer > 0 );
860 
861  qDebug( ) << this << "Writer::run: Trying to write " << numBytesInBuffer << "bytes";
862  qint64 totalWritten = 0;
863  do {
864  mutex.unlock();
865 #ifdef Q_OS_WIN32
866  DWORD numWritten;
867  qDebug( "%p (fd=%d): Writer::run: buffer before WriteFile (numBytes=%lld): %s:",
868  ( void*) this, fd, numBytesInBuffer, buffer );
869  qDebug( "%p (fd=%d): Writer::run: Going into WriteFile", ( void* ) this, fd );
870  if ( !WriteFile( handle, buffer + totalWritten, numBytesInBuffer - totalWritten, &numWritten, 0 ) ) {
871  mutex.lock();
872  errorCode = static_cast<int>( GetLastError() );
873  qDebug( "%p: Writer::run: got error code: %d", ( void* ) this, errorCode );
874  error = true;
875  goto leave;
876  }
877 #else
878  qint64 numWritten;
879  do {
880  numWritten = ::write( fd, buffer + totalWritten, numBytesInBuffer - totalWritten );
881  } while ( numWritten == -1 && errno == EINTR );
882 
883  if ( numWritten < 0 ) {
884  mutex.lock();
885  errorCode = errno;
886  qDebug( "%p: Writer::run: got error code: %s (%d)", ( void* )this, strerror( errorCode ), errorCode );
887  error = true;
888  goto leave;
889  }
890 #endif
891  qDebug( "%p (fd=%d): Writer::run: buffer after WriteFile (numBytes=%u): %s:", ( void* )this, fd, numBytesInBuffer, buffer );
892  totalWritten += numWritten;
893  mutex.lock();
894  } while ( totalWritten < numBytesInBuffer );
895 
896  qDebug() << this << "Writer::run: wrote " << totalWritten << "bytes";
897  numBytesInBuffer = 0;
898  qDebug() << this << "Writer::run: buffer is empty, wake bufferEmptyCond listeners";
899  bufferEmptyCondition.wakeAll();
900  emit bytesWritten( totalWritten );
901  }
902  leave:
903  qDebug() << this << "Writer::run: terminating";
904  numBytesInBuffer = 0;
905  qDebug() << this << "Writer::run: buffer is empty, wake bufferEmptyCond listeners";
906  bufferEmptyCondition.wakeAll();
907  emit bytesWritten( 0 );
908 }
909 
910 // static
911 std::pair<KDPipeIODevice*,KDPipeIODevice*> KDPipeIODevice::makePairOfConnectedPipes() {
912  KDPipeIODevice * read = 0;
913  KDPipeIODevice * write = 0;
914 #ifdef Q_OS_WIN32
915  HANDLE rh;
916  HANDLE wh;
917  SECURITY_ATTRIBUTES sa;
918  memset( &sa, 0, sizeof(sa) );
919  sa.nLength = sizeof(sa);
920  sa.bInheritHandle = TRUE;
921  if ( CreatePipe( &rh, &wh, &sa, BUFFER_SIZE ) ) {
922  read = new KDPipeIODevice;
923  read->open( rh, ReadOnly );
924  write = new KDPipeIODevice;
925  write->open( wh, WriteOnly );
926  }
927 #else
928  int fds[2];
929  if ( pipe( fds ) == 0 ) {
930  read = new KDPipeIODevice;
931  read->open( fds[0], ReadOnly );
932  write = new KDPipeIODevice;
933  write->open( fds[1], WriteOnly );
934  }
935 #endif
936  return std::make_pair( read, write );
937 }
938 
939 #ifdef KDAB_DEFINE_CHECKS
940 KDAB_DEFINE_CHECKS( KDPipeIODevice ) {
941  if ( !isOpen() ) {
942  assert( openMode() == NotOpen );
943  assert( !d->reader );
944  assert( !d->writer );
945 #ifdef Q_OS_WIN32
946  assert( !d->handle );
947 #else
948  assert( d->fd < 0 );
949 #endif
950  } else {
951  assert( openMode() != NotOpen );
952  assert( openMode() & ReadWrite );
953  if ( openMode() & ReadOnly ) {
954  assert( d->reader );
955  synchronized( d->reader )
956  assert( d->reader->eof || d->reader->error || d->reader->isRunning() );
957  }
958  if ( openMode() & WriteOnly ) {
959  assert( d->writer );
960  synchronized( d->writer )
961  assert( d->writer->error || d->writer->isRunning() );
962  }
963 #ifdef Q_OS_WIN32
964  assert( d->handle );
965 #else
966  assert( d->fd >= 0 );
967 #endif
968  }
969 }
970 #endif // KDAB_DEFINE_CHECKS
971 
972 #include "moc_kdpipeiodevice.cpp"
973 #include "kdpipeiodevice.moc"
KDPipeIODevice::waitForBytesWritten
bool waitForBytesWritten(int msecs)
Definition: kdpipeiodevice.cpp:480
KDPipeIODevice::writeData
qint64 writeData(const char *data, qint64 maxSize)
Definition: kdpipeiodevice.cpp:609
KDPipeIODevice::bytesToWrite
qint64 bytesToWrite() const
Definition: kdpipeiodevice.cpp:432
KDPipeIODevice::debugLevel
static DebugLevel debugLevel()
Definition: kdpipeiodevice.cpp:235
KDPipeIODevice::readWouldBlock
bool readWouldBlock() const
Definition: kdpipeiodevice.cpp:519
BUFFER_SIZE
const unsigned int BUFFER_SIZE
Definition: kdpipeiodevice.cpp:55
KDPipeIODevice::waitForReadyRead
bool waitForReadyRead(int msecs)
Definition: kdpipeiodevice.cpp:491
KDPipeIODevice::makePairOfConnectedPipes
static std::pair< KDPipeIODevice *, KDPipeIODevice * > makePairOfConnectedPipes()
Definition: kdpipeiodevice.cpp:911
KDPipeIODevice::readData
qint64 readData(char *data, qint64 maxSize)
Definition: kdpipeiodevice.cpp:534
LOCKED
#define LOCKED(d)
Definition: kdpipeiodevice.cpp:52
KDPipeIODevice::setDebugLevel
static void setDebugLevel(DebugLevel level)
Definition: kdpipeiodevice.cpp:240
KDPipeIODevice::descriptor
int descriptor() const
Definition: kdpipeiodevice.cpp:408
KDPipeIODevice::isSequential
bool isSequential() const
Definition: kdpipeiodevice.cpp:451
KDPipeIODevice::writeWouldBlock
bool writeWouldBlock() const
Definition: kdpipeiodevice.cpp:526
d
#define d
Definition: adduseridcommand.cpp:90
KDPipeIODevice::handle
Qt::HANDLE handle() const
Definition: kdpipeiodevice.cpp:413
KDPipeIODevice::~KDPipeIODevice
~KDPipeIODevice()
Definition: kdpipeiodevice.cpp:280
KDPipeIODevice::DebugLevel
DebugLevel
Definition: kdpipeiodevice.h:33
KDPipeIODevice
Definition: kdpipeiodevice.h:29
KDAB_CHECK_DTOR
#define KDAB_CHECK_DTOR
Definition: kdpipeiodevice.cpp:48
KDPipeIODevice::open
bool open(int fd, OpenMode mode=ReadOnly)
Definition: kdpipeiodevice.cpp:287
mutex
static QMutex mutex
Definition: sessiondata.cpp:48
KDAB_CHECK_CTOR
#define KDAB_CHECK_CTOR
Definition: kdpipeiodevice.cpp:47
waitAndDelete
#define waitAndDelete(t)
KDAB_DEFINE_CHECKS
#define KDAB_DEFINE_CHECKS(Class)
Definition: checker.h:114
KDPipeIODevice::atEnd
bool atEnd() const
Definition: kdpipeiodevice.cpp:455
KDPipeIODevice::bytesAvailable
qint64 bytesAvailable() const
Definition: kdpipeiodevice.cpp:417
q
#define q
Definition: adduseridcommand.cpp:91
KDAB_CHECK_THIS
#define KDAB_CHECK_THIS
Definition: kdpipeiodevice.cpp:49
kdpipeiodevice.h
KDPipeIODevice::NoDebug
Definition: kdpipeiodevice.h:34
KDPipeIODevice::close
void close()
Definition: kdpipeiodevice.cpp:680
QIODevice
KDPipeIODevice::KDPipeIODevice
KDPipeIODevice(QObject *parent=0)
Definition: kdpipeiodevice.cpp:260
KDPipeIODevice::canReadLine
bool canReadLine() const
Definition: kdpipeiodevice.cpp:441
qDebug
#define qDebug
Definition: kdpipeiodevice.cpp:62
ALLOW_QIODEVICE_BUFFERING
const bool ALLOW_QIODEVICE_BUFFERING
Definition: kdpipeiodevice.cpp:56
This file is part of the KDE documentation.
Documentation copyright © 1996-2014 The KDE developers.
Generated on Tue Oct 14 2014 22:56:41 by doxygen 1.8.7 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.

kleopatra

Skip menu "kleopatra"
  • Main Page
  • Namespace List
  • Namespace Members
  • Alphabetical List
  • Class List
  • Class Hierarchy
  • Class Members
  • File List
  • File Members

kdepim API Reference

Skip menu "kdepim API Reference"
  • akonadi_next
  • akregator
  • blogilo
  • calendarsupport
  • console
  •   kabcclient
  •   konsolekalendar
  • kaddressbook
  • kalarm
  •   lib
  • kdgantt2
  • kjots
  • kleopatra
  • kmail
  • knode
  • knotes
  • kontact
  • korgac
  • korganizer
  • ktimetracker
  • libkdepim
  • libkleo
  • libkpgp
  • mailcommon
  • messagelist
  • messageviewer

Search



Report problems with this website to our bug tracking system.
Contact the specific authors with questions and comments about the page contents.

KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal