KDECore
kbufferedsocket.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include <config.h>
00026
00027 #include <qmutex.h>
00028 #include <qtimer.h>
00029
00030 #include "ksocketdevice.h"
00031 #include "ksocketaddress.h"
00032 #include "ksocketbuffer_p.h"
00033 #include "kbufferedsocket.h"
00034
00035 using namespace KNetwork;
00036 using namespace KNetwork::Internal;
00037
00038 class KNetwork::KBufferedSocketPrivate
00039 {
00040 public:
00041 mutable KSocketBuffer *input, *output;
00042
00043 KBufferedSocketPrivate()
00044 {
00045 input = 0L;
00046 output = 0L;
00047 }
00048 };
00049
00050 KBufferedSocket::KBufferedSocket(const QString& host, const QString& service,
00051 QObject *parent, const char *name)
00052 : KStreamSocket(host, service, parent, name),
00053 d(new KBufferedSocketPrivate)
00054 {
00055 setInputBuffering(true);
00056 setOutputBuffering(true);
00057 }
00058
00059 KBufferedSocket::~KBufferedSocket()
00060 {
00061 closeNow();
00062 delete d->input;
00063 delete d->output;
00064 delete d;
00065 }
00066
00067 void KBufferedSocket::setSocketDevice(KSocketDevice* device)
00068 {
00069 KStreamSocket::setSocketDevice(device);
00070 device->setBlocking(false);
00071 }
00072
00073 bool KBufferedSocket::setSocketOptions(int opts)
00074 {
00075 if (opts == Blocking)
00076 return false;
00077
00078 opts &= ~Blocking;
00079 return KStreamSocket::setSocketOptions(opts);
00080 }
00081
00082 void KBufferedSocket::close()
00083 {
00084 if (!d->output || d->output->isEmpty())
00085 closeNow();
00086 else
00087 {
00088 setState(Closing);
00089 QSocketNotifier *n = socketDevice()->readNotifier();
00090 if (n)
00091 n->setEnabled(false);
00092 emit stateChanged(Closing);
00093 }
00094 }
00095
00096 Q_LONG KBufferedSocket::bytesAvailable() const
00097 {
00098 if (!d->input)
00099 return KStreamSocket::bytesAvailable();
00100
00101 return d->input->length();
00102 }
00103
00104 Q_LONG KBufferedSocket::waitForMore(int msecs, bool *timeout)
00105 {
00106 Q_LONG retval = KStreamSocket::waitForMore(msecs, timeout);
00107 if (d->input)
00108 {
00109 resetError();
00110 slotReadActivity();
00111 return bytesAvailable();
00112 }
00113 return retval;
00114 }
00115
00116 Q_LONG KBufferedSocket::readBlock(char *data, Q_ULONG maxlen)
00117 {
00118 if (d->input)
00119 {
00120 if (d->input->isEmpty())
00121 {
00122 setError(IO_ReadError, WouldBlock);
00123 emit gotError(WouldBlock);
00124 return -1;
00125 }
00126 resetError();
00127 return d->input->consumeBuffer(data, maxlen);
00128 }
00129 return KStreamSocket::readBlock(data, maxlen);
00130 }
00131
00132 Q_LONG KBufferedSocket::readBlock(char *data, Q_ULONG maxlen, KSocketAddress& from)
00133 {
00134 from = peerAddress();
00135 return readBlock(data, maxlen);
00136 }
00137
00138 Q_LONG KBufferedSocket::peekBlock(char *data, Q_ULONG maxlen)
00139 {
00140 if (d->input)
00141 {
00142 if (d->input->isEmpty())
00143 {
00144 setError(IO_ReadError, WouldBlock);
00145 emit gotError(WouldBlock);
00146 return -1;
00147 }
00148 resetError();
00149 return d->input->consumeBuffer(data, maxlen, false);
00150 }
00151 return KStreamSocket::peekBlock(data, maxlen);
00152 }
00153
00154 Q_LONG KBufferedSocket::peekBlock(char *data, Q_ULONG maxlen, KSocketAddress& from)
00155 {
00156 from = peerAddress();
00157 return peekBlock(data, maxlen);
00158 }
00159
00160 Q_LONG KBufferedSocket::writeBlock(const char *data, Q_ULONG len)
00161 {
00162 if (state() != Connected)
00163 {
00164
00165 setError(IO_WriteError, NotConnected);
00166 return -1;
00167 }
00168
00169 if (d->output)
00170 {
00171 if (d->output->isFull())
00172 {
00173 setError(IO_WriteError, WouldBlock);
00174 emit gotError(WouldBlock);
00175 return -1;
00176 }
00177 resetError();
00178
00179
00180 QSocketNotifier *n = socketDevice()->writeNotifier();
00181 if (n)
00182 n->setEnabled(true);
00183
00184 return d->output->feedBuffer(data, len);
00185 }
00186
00187 return KStreamSocket::writeBlock(data, len);
00188 }
00189
00190 Q_LONG KBufferedSocket::writeBlock(const char *data, Q_ULONG maxlen,
00191 const KSocketAddress&)
00192 {
00193
00194 return writeBlock(data, maxlen);
00195 }
00196
00197 void KBufferedSocket::enableRead(bool enable)
00198 {
00199 KStreamSocket::enableRead(enable);
00200 if (!enable && d->input)
00201 {
00202
00203 QSocketNotifier *n = socketDevice()->readNotifier();
00204 if (n)
00205 n->setEnabled(true);
00206 }
00207
00208 if (enable && state() != Connected && d->input && !d->input->isEmpty())
00209
00210
00211 QTimer::singleShot(0, this, SLOT(slotReadActivity()));
00212 }
00213
00214 void KBufferedSocket::enableWrite(bool enable)
00215 {
00216 KStreamSocket::enableWrite(enable);
00217 if (!enable && d->output && !d->output->isEmpty())
00218 {
00219
00220 QSocketNotifier *n = socketDevice()->writeNotifier();
00221 if (n)
00222 n->setEnabled(true);
00223 }
00224 }
00225
00226 void KBufferedSocket::stateChanging(SocketState newState)
00227 {
00228 if (newState == Connecting || newState == Connected)
00229 {
00230
00231
00232 if (d->input)
00233 d->input->clear();
00234 if (d->output)
00235 d->output->clear();
00236
00237
00238 enableRead(emitsReadyRead());
00239 enableWrite(emitsReadyWrite());
00240 }
00241 KStreamSocket::stateChanging(newState);
00242 }
00243
00244 void KBufferedSocket::setInputBuffering(bool enable)
00245 {
00246 QMutexLocker locker(mutex());
00247 if (!enable)
00248 {
00249 delete d->input;
00250 d->input = 0L;
00251 }
00252 else if (d->input == 0L)
00253 {
00254 d->input = new KSocketBuffer;
00255 }
00256 }
00257
00258 KIOBufferBase* KBufferedSocket::inputBuffer()
00259 {
00260 return d->input;
00261 }
00262
00263 void KBufferedSocket::setOutputBuffering(bool enable)
00264 {
00265 QMutexLocker locker(mutex());
00266 if (!enable)
00267 {
00268 delete d->output;
00269 d->output = 0L;
00270 }
00271 else if (d->output == 0L)
00272 {
00273 d->output = new KSocketBuffer;
00274 }
00275 }
00276
00277 KIOBufferBase* KBufferedSocket::outputBuffer()
00278 {
00279 return d->output;
00280 }
00281
00282 Q_ULONG KBufferedSocket::bytesToWrite() const
00283 {
00284 if (!d->output)
00285 return 0;
00286
00287 return d->output->length();
00288 }
00289
00290 void KBufferedSocket::closeNow()
00291 {
00292 KStreamSocket::close();
00293 if (d->output)
00294 d->output->clear();
00295 }
00296
00297 bool KBufferedSocket::canReadLine() const
00298 {
00299 if (!d->input)
00300 return false;
00301
00302 return d->input->canReadLine();
00303 }
00304
00305 QCString KBufferedSocket::readLine()
00306 {
00307 return d->input->readLine();
00308 }
00309
00310 void KBufferedSocket::waitForConnect()
00311 {
00312 if (state() != Connecting)
00313 return;
00314
00315 KStreamSocket::setSocketOptions(socketOptions() | Blocking);
00316 connectionEvent();
00317 KStreamSocket::setSocketOptions(socketOptions() & ~Blocking);
00318 }
00319
00320 void KBufferedSocket::slotReadActivity()
00321 {
00322 if (d->input && state() == Connected)
00323 {
00324 mutex()->lock();
00325 Q_LONG len = d->input->receiveFrom(socketDevice());
00326
00327 if (len == -1)
00328 {
00329 if (socketDevice()->error() != WouldBlock)
00330 {
00331
00332 copyError();
00333 mutex()->unlock();
00334 emit gotError(error());
00335 closeNow();
00336 return;
00337 }
00338 }
00339 else if (len == 0)
00340 {
00341
00342 setError(IO_ReadError, RemotelyDisconnected);
00343 mutex()->unlock();
00344 emit gotError(error());
00345 closeNow();
00346 return;
00347 }
00348
00349
00350 mutex()->unlock();
00351 }
00352
00353 if (state() == Connected)
00354 KStreamSocket::slotReadActivity();
00355 else if (emitsReadyRead())
00356 {
00357 if (d->input && !d->input->isEmpty())
00358 {
00359
00360
00361 QTimer::singleShot(0, this, SLOT(slotReadActivity()));
00362 emit readyRead();
00363 }
00364 }
00365 }
00366
00367 void KBufferedSocket::slotWriteActivity()
00368 {
00369 if (d->output && !d->output->isEmpty() &&
00370 (state() == Connected || state() == Closing))
00371 {
00372 mutex()->lock();
00373 Q_LONG len = d->output->sendTo(socketDevice());
00374
00375 if (len == -1)
00376 {
00377 if (socketDevice()->error() != WouldBlock)
00378 {
00379
00380 copyError();
00381 mutex()->unlock();
00382 emit gotError(error());
00383 closeNow();
00384 return;
00385 }
00386 }
00387 else if (len == 0)
00388 {
00389
00390 setError(IO_ReadError, RemotelyDisconnected);
00391 mutex()->unlock();
00392 emit gotError(error());
00393 closeNow();
00394 return;
00395 }
00396
00397 if (d->output->isEmpty())
00398
00399
00400 socketDevice()->writeNotifier()->setEnabled(false);
00401
00402 mutex()->unlock();
00403 emit bytesWritten(len);
00404 }
00405
00406 if (state() != Closing)
00407 KStreamSocket::slotWriteActivity();
00408 else if (d->output && d->output->isEmpty() && state() == Closing)
00409 {
00410 KStreamSocket::close();
00411 }
00412 }
00413
00414 #include "kbufferedsocket.moc"