00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "kmessageio.h"
00025 #include <QTcpSocket>
00026 #include <kdebug.h>
00027 #include <kprocess.h>
00028 #include <QFile>
00029 #include <QDataStream>
00030
00031
00032 KMessageIO::KMessageIO (QObject *parent)
00033 : QObject (parent), m_id (0)
00034 {}
00035
00036 KMessageIO::~KMessageIO ()
00037 {}
00038
00039 void KMessageIO::setId (quint32 id)
00040 {
00041 m_id = id;
00042 }
00043
00044 quint32 KMessageIO::id ()
00045 {
00046 return m_id;
00047 }
00048
00049
00050
00051 KMessageSocket::KMessageSocket (const QString& host, quint16 port, QObject *parent)
00052 : KMessageIO (parent)
00053 {
00054 mSocket = new QTcpSocket ();
00055 mSocket->connectToHost (host, port);
00056 initSocket ();
00057 }
00058
00059 KMessageSocket::KMessageSocket (QHostAddress host, quint16 port, QObject *parent)
00060 : KMessageIO (parent)
00061 {
00062 mSocket = new QTcpSocket ();
00063 mSocket->connectToHost (host.toString(), port);
00064 initSocket ();
00065 }
00066
00067 KMessageSocket::KMessageSocket (QTcpSocket *socket, QObject *parent)
00068 : KMessageIO (parent)
00069 {
00070 mSocket = socket;
00071 initSocket ();
00072 }
00073
00074 KMessageSocket::KMessageSocket (int socketFD, QObject *parent)
00075 : KMessageIO (parent)
00076 {
00077 mSocket = new QTcpSocket ();
00078 mSocket->setSocketDescriptor (socketFD);
00079 initSocket ();
00080 }
00081
00082 KMessageSocket::~KMessageSocket ()
00083 {
00084 delete mSocket;
00085 }
00086
00087 bool KMessageSocket::isConnected () const
00088 {
00089 return mSocket->state() == QAbstractSocket::ConnectedState;
00090 }
00091
00092 void KMessageSocket::send (const QByteArray &msg)
00093 {
00094 QDataStream str (mSocket);
00095 str << quint8 ('M');
00096 str.writeBytes (msg.data(), msg.size());
00097 }
00098
00099 void KMessageSocket::processNewData ()
00100 {
00101 if (isRecursive)
00102 return;
00103 isRecursive = true;
00104
00105 QDataStream str (mSocket);
00106 while (mSocket->bytesAvailable() > 0)
00107 {
00108 if (mAwaitingHeader)
00109 {
00110
00111 if (mSocket->bytesAvailable() < 5)
00112 {
00113 isRecursive = false;
00114 return;
00115 }
00116
00117
00118
00119
00120 quint8 v;
00121 str >> v;
00122 if (v != 'M')
00123 {
00124 kWarning(11001) << ": Received unexpected data, magic number wrong!";
00125 continue;
00126 }
00127
00128 str >> mNextBlockLength;
00129 mAwaitingHeader = false;
00130 }
00131 else
00132 {
00133
00134 if (mSocket->bytesAvailable() < (qint64) mNextBlockLength)
00135 {
00136 isRecursive = false;
00137 return;
00138 }
00139
00140 QByteArray msg (mNextBlockLength, 0);
00141 str.readRawData (msg.data(), mNextBlockLength);
00142
00143
00144 emit received (msg);
00145
00146
00147 mAwaitingHeader = true;
00148 }
00149 }
00150
00151 isRecursive = false;
00152 }
00153
00154 void KMessageSocket::initSocket ()
00155 {
00156 connect (mSocket, SIGNAL (error(QAbstractSocket::SocketError)), this, SIGNAL (connectionBroken()));
00157 connect (mSocket, SIGNAL (disconnected()), this, SIGNAL (connectionBroken()));
00158 connect (mSocket, SIGNAL (readyRead()), this, SLOT (processNewData()));
00159 mAwaitingHeader = true;
00160 mNextBlockLength = 0;
00161 isRecursive = false;
00162 }
00163
00164 quint16 KMessageSocket::peerPort () const
00165 {
00166 return mSocket->peerPort();
00167 }
00168
00169 QString KMessageSocket::peerName () const
00170 {
00171 return mSocket->peerName();
00172 }
00173
00174
00175
00176 KMessageDirect::KMessageDirect (KMessageDirect *partner, QObject *parent)
00177 : KMessageIO (parent), mPartner (0)
00178 {
00179
00180 if (!partner)
00181 return;
00182
00183
00184 if (partner && partner->mPartner)
00185 {
00186 kWarning(11001) << ": Object is already connected!";
00187 return;
00188 }
00189
00190
00191 mPartner = partner;
00192
00193
00194 partner->mPartner = this;
00195 }
00196
00197 KMessageDirect::~KMessageDirect ()
00198 {
00199 if (mPartner)
00200 {
00201 mPartner->mPartner = 0;
00202 emit mPartner->connectionBroken();
00203 }
00204 }
00205
00206 bool KMessageDirect::isConnected () const
00207 {
00208 return mPartner != 0;
00209 }
00210
00211 void KMessageDirect::send (const QByteArray &msg)
00212 {
00213 if (mPartner)
00214 emit mPartner->received (msg);
00215 else
00216 kError(11001) << ": Not yet connected!";
00217 }
00218
00219
00220
00221
00222 KMessageProcess::~KMessageProcess()
00223 {
00224 kDebug(11001) << "@@@KMessageProcess::Delete process";
00225 if (mProcess)
00226 {
00227 mProcess->kill();
00228 mProcess->deleteLater();
00229 mProcess=0;
00230
00231 }
00232 }
00233 KMessageProcess::KMessageProcess(QObject *parent, const QString& file) : KMessageIO(parent)
00234 {
00235
00236 kDebug(11001) << "@@@KMessageProcess::Start process";
00237 mProcessName=file;
00238 mProcess=new KProcess;
00239
00240 mProcess-> setOutputChannelMode(KProcess::SeparateChannels);
00241 int id=0;
00242 *mProcess << mProcessName << QString("%1").arg(id);
00243 kDebug(11001) << "@@@KMessageProcess::Init:Id=" << id;
00244 kDebug(11001) << "@@@KMessgeProcess::Init:Processname:" << mProcessName;
00245 connect(mProcess, SIGNAL(readyReadStandardOutput()), this, SLOT(slotReceivedStdout()));
00246 connect(mProcess, SIGNAL(readyReadStandardError()), this, SLOT(slotReceivedStderr()));
00247 connect(mProcess, SIGNAL(finished (int, QProcess::ExitStatus)),
00248 this, SLOT(slotProcessExited(int, QProcess::ExitStatus)));
00249 mProcess->start();
00250 mSendBuffer=0;
00251 mReceiveCount=0;
00252 mReceiveBuffer.resize(1024);
00253 }
00254 bool KMessageProcess::isConnected() const
00255 {
00256 kDebug(11001) << "@@@KMessageProcess::Is conencted";
00257 if (!mProcess)
00258 return false;
00259 return (mProcess->state() == QProcess::Running);
00260 }
00261
00262
00263 void KMessageProcess::send(const QByteArray &msg)
00264 {
00265 kDebug(11001) << "@@@KMessageProcess:: SEND("<<msg.size()<<") to process";
00266 unsigned int size=msg.size()+2*sizeof(long);
00267
00268 if (mProcess == 0) {
00269 kDebug(11001) << "@@@KMessageProcess:: cannot write to stdin, no process available";
00270 return;
00271 }
00272
00273 char *tmpbuffer=new char[size];
00274 long *p1=(long *)tmpbuffer;
00275 long *p2=p1+1;
00276 kDebug(11001) << "p1="<<p1 << "p2="<< p2;
00277 memcpy(tmpbuffer+2*sizeof(long),msg.data(),msg.size());
00278 *p1=0x4242aeae;
00279 *p2=size;
00280
00281
00282 mProcess->write(tmpbuffer,size);
00283 delete [] tmpbuffer;
00284 }
00285
00286 void KMessageProcess::slotReceivedStderr()
00287 {
00288 QByteArray ba;
00289 kDebug(11001)<<"@@@ KMessageProcess::slotReceivedStderr";
00290
00291 mProcess->setReadChannel(QProcess::StandardError);
00292 while(mProcess->canReadLine())
00293 {
00294 ba = mProcess->readLine();
00295 if( ba.isEmpty() )
00296 return;
00297 ba.chop( 1 );
00298
00299 kDebug(11001) << "KProcess (" << ba.size() << "):" << ba.constData();
00300 emit signalReceivedStderr(ba);
00301 ba.clear();
00302 };
00303 }
00304
00305
00306 void KMessageProcess::slotReceivedStdout()
00307 {
00308 mProcess->setReadChannel(QProcess::StandardOutput);
00309 QByteArray ba = mProcess->readAll();
00310 kDebug(11001) << "$$$$$$ " << ": Received" << ba.size() << "bytes over inter process communication";
00311
00312
00313 while (mReceiveCount+ba.size()>=mReceiveBuffer.size()) mReceiveBuffer.resize(mReceiveBuffer.size()+1024);
00314
00315 qCopy(ba.begin(), ba.begin()+ba.size(), mReceiveBuffer.begin()+mReceiveCount);
00316 mReceiveCount += ba.size();
00317
00318
00319 while (mReceiveCount>int(2*sizeof(long)))
00320 {
00321 long *p1=(long *)mReceiveBuffer.data();
00322 long *p2=p1+1;
00323 int len;
00324 if (*p1!=0x4242aeae)
00325 {
00326 kDebug(11001) << ": Cookie error...transmission failure...serious problem...";
00327 }
00328 len=(int)(*p2);
00329 if (len<int(2*sizeof(long)))
00330 {
00331 kDebug(11001) << ": Message size error";
00332 break;
00333 }
00334 if (len<=mReceiveCount)
00335 {
00336 kDebug(11001) << ": Got message with len" << len;
00337
00338 QByteArray msg ;
00339 msg.resize(len);
00340
00341
00342 qCopy(mReceiveBuffer.begin()+2*sizeof(long),mReceiveBuffer.begin()+len, msg.begin());
00343
00344 emit received(msg);
00345
00346
00347 if (len<mReceiveCount)
00348 {
00349 memmove(mReceiveBuffer.data(),mReceiveBuffer.data()+len,mReceiveCount-len);
00350 }
00351 mReceiveCount-=len;
00352 }
00353 else break;
00354 }
00355 }
00356
00357 void KMessageProcess::slotProcessExited(int exitCode, QProcess::ExitStatus)
00358 {
00359 kDebug(11001) << "Process exited (slot) with code" << exitCode;
00360 emit connectionBroken();
00361 delete mProcess;
00362 mProcess=0;
00363 }
00364
00365
00366
00367 KMessageFilePipe::KMessageFilePipe(QObject *parent,QFile *readfile,QFile *writefile) : KMessageIO(parent)
00368 {
00369 mReadFile=readfile;
00370 mWriteFile=writefile;
00371 mReceiveCount=0;
00372 mReceiveBuffer.resize(1024);
00373 }
00374
00375 KMessageFilePipe::~KMessageFilePipe()
00376 {
00377 }
00378
00379 bool KMessageFilePipe::isConnected () const
00380 {
00381 return (mReadFile!=0)&&(mWriteFile!=0);
00382 }
00383
00384
00385 void KMessageFilePipe::send(const QByteArray &msg)
00386 {
00387 unsigned int size=msg.size()+2*sizeof(long);
00388
00389 char *tmpbuffer=new char[size];
00390 long *p1=(long *)tmpbuffer;
00391 long *p2=p1+1;
00392 memcpy(tmpbuffer+2*sizeof(long),msg.data(),msg.size());
00393 *p1=0x4242aeae;
00394 *p2=size;
00395
00396 QByteArray buffer(tmpbuffer,size);
00397 mWriteFile->write(buffer);
00398 mWriteFile->flush();
00399 delete [] tmpbuffer;
00400
00401
00402
00403
00404
00405
00406
00407
00408 }
00409
00410 void KMessageFilePipe::exec()
00411 {
00412
00413
00414
00415 char ch;
00416 mReadFile->getChar(&ch);
00417
00418 while (mReceiveCount>=mReceiveBuffer.size()) mReceiveBuffer.resize(mReceiveBuffer.size()+1024);
00419 mReceiveBuffer[mReceiveCount]=ch;
00420 mReceiveCount++;
00421
00422
00423 if (mReceiveCount>=int(2*sizeof(long)))
00424 {
00425 long *p1=(long *)mReceiveBuffer.data();
00426 long *p2=p1+1;
00427 int len;
00428 if (*p1!=0x4242aeae)
00429 {
00430 fprintf(stderr,"KMessageFilePipe::exec:: Cookie error...transmission failure...serious problem...\n");
00431 fflush(stderr);
00432
00433 }
00434 len=(int)(*p2);
00435 if (len==mReceiveCount)
00436 {
00437
00438
00439 QByteArray msg;
00440 msg.resize(len);
00441
00442 qCopy(mReceiveBuffer.begin()+2*sizeof(long),mReceiveBuffer.begin()+len, msg.begin());
00443
00444 emit received(msg);
00445
00446 mReceiveCount=0;
00447 }
00448 }
00449
00450
00451 return ;
00452
00453
00454 }
00455
00456 #include "kmessageio.moc"