00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "connection.h"
00023 #include "connection_p.h"
00024
00025 #include <errno.h>
00026
00027 #include <QQueue>
00028 #include <QPointer>
00029 #include <QTime>
00030
00031 #include "kdebug.h"
00032 #include "kcomponentdata.h"
00033 #include "kglobal.h"
00034 #include "klocale.h"
00035 #include "kstandarddirs.h"
00036 #include "ktemporaryfile.h"
00037 #include "kurl.h"
00038
00039 using namespace KIO;
00040
00041 class KIO::ConnectionPrivate
00042 {
00043 public:
00044 inline ConnectionPrivate()
00045 : backend(0), suspended(false)
00046 { }
00047
00048 void dequeue();
00049 void commandReceived(const Task &task);
00050 void disconnected();
00051 void setBackend(AbstractConnectionBackend *b);
00052
00053 QQueue<Task> outgoingTasks;
00054 QQueue<Task> incomingTasks;
00055 AbstractConnectionBackend *backend;
00056 Connection *q;
00057 bool suspended;
00058 };
00059
00060 class KIO::ConnectionServerPrivate
00061 {
00062 public:
00063 inline ConnectionServerPrivate()
00064 : backend(0)
00065 { }
00066
00067 ConnectionServer *q;
00068 AbstractConnectionBackend *backend;
00069 };
00070
00071 void ConnectionPrivate::dequeue()
00072 {
00073 if (!backend || suspended)
00074 return;
00075
00076 while (!outgoingTasks.isEmpty()) {
00077 const Task task = outgoingTasks.dequeue();
00078 q->sendnow(task.cmd, task.data);
00079 }
00080
00081 if (!incomingTasks.isEmpty())
00082 emit q->readyRead();
00083 }
00084
00085 void ConnectionPrivate::commandReceived(const Task &task)
00086 {
00087
00088 if (!suspended && incomingTasks.isEmpty())
00089 QMetaObject::invokeMethod(q, "dequeue", Qt::QueuedConnection);
00090 incomingTasks.enqueue(task);
00091 }
00092
00093 void ConnectionPrivate::disconnected()
00094 {
00095 q->close();
00096 QMetaObject::invokeMethod(q, "readyRead", Qt::QueuedConnection);
00097 }
00098
00099 void ConnectionPrivate::setBackend(AbstractConnectionBackend *b)
00100 {
00101 backend = b;
00102 if (backend) {
00103 q->connect(backend, SIGNAL(commandReceived(Task)), SLOT(commandReceived(Task)));
00104 q->connect(backend, SIGNAL(disconnected()), SLOT(disconnected()));
00105 backend->setSuspended(suspended);
00106 }
00107 }
00108
00109 AbstractConnectionBackend::AbstractConnectionBackend(QObject *parent)
00110 : QObject(parent), state(Idle)
00111 {
00112 }
00113
00114 AbstractConnectionBackend::~AbstractConnectionBackend()
00115 {
00116 }
00117
00118 SocketConnectionBackend::SocketConnectionBackend(Mode m, QObject *parent)
00119 : AbstractConnectionBackend(parent), socket(0), len(-1), cmd(0),
00120 signalEmitted(false), mode(m)
00121 {
00122 localServer = 0;
00123
00124 }
00125
00126 SocketConnectionBackend::~SocketConnectionBackend()
00127 {
00128 if (mode == LocalSocketMode && localServer &&
00129 localServer->localSocketType() == KLocalSocket::UnixSocket)
00130 QFile::remove(localServer->localPath());
00131 }
00132
00133 void SocketConnectionBackend::setSuspended(bool enable)
00134 {
00135 if (state != Connected)
00136 return;
00137 Q_ASSERT(socket);
00138 Q_ASSERT(!localServer);
00139
00140 if (enable) {
00141
00142 socket->setReadBufferSize(1);
00143 } else {
00144
00145 socket->setReadBufferSize(StandardBufferSize);
00146 if (socket->bytesAvailable() >= HeaderSize) {
00147
00148 QMetaObject::invokeMethod(this, "socketReadyRead", Qt::QueuedConnection);
00149 }
00150
00151
00152
00153
00154 QByteArray data = socket->read(socket->bytesAvailable() + 1);
00155 for (int i = data.size(); --i >= 0; )
00156 socket->ungetChar(data[i]);
00157 }
00158 }
00159
00160 bool SocketConnectionBackend::connectToRemote(const KUrl &url)
00161 {
00162 Q_ASSERT(state == Idle);
00163 Q_ASSERT(!socket);
00164 Q_ASSERT(!localServer);
00165
00166 if (mode == LocalSocketMode) {
00167 KLocalSocket *sock = new KLocalSocket(this);
00168 QString path = url.path();
00169 KLocalSocket::LocalSocketType type = KLocalSocket::UnixSocket;
00170
00171 if (url.queryItem(QLatin1String("abstract")) == QLatin1String("1"))
00172 type = KLocalSocket::AbstractUnixSocket;
00173
00174 sock->connectToPath(path);
00175 socket = sock;
00176 } else {
00177 socket = new QTcpSocket(this);
00178 socket->connectToHost(url.host(),url.port());
00179
00180 if (!socket->waitForConnected(1000)) {
00181 state = Idle;
00182 kDebug() << "could not connect to " << url;
00183 return false;
00184 }
00185 }
00186 connect(socket, SIGNAL(readyRead()), SLOT(socketReadyRead()));
00187 connect(socket, SIGNAL(disconnected()), SLOT(socketDisconnected()));
00188 state = Connected;
00189 return true;
00190 }
00191
00192 void SocketConnectionBackend::socketDisconnected()
00193 {
00194 state = Idle;
00195 emit disconnected();
00196 }
00197
00198 bool SocketConnectionBackend::listenForRemote()
00199 {
00200 Q_ASSERT(state == Idle);
00201 Q_ASSERT(!socket);
00202 Q_ASSERT(!localServer);
00203
00204 if (mode == LocalSocketMode) {
00205 QString prefix = KStandardDirs::locateLocal("socket", KGlobal::mainComponent().componentName());
00206 KTemporaryFile *socketfile = new KTemporaryFile();
00207 socketfile->setPrefix(prefix);
00208 socketfile->setSuffix(QLatin1String(".slave-socket"));
00209 if (!socketfile->open())
00210 {
00211 errorString = i18n("Unable to create io-slave: %1", strerror(errno));
00212 delete socketfile;
00213 return false;
00214 }
00215
00216 QString sockname = socketfile->fileName();
00217 KUrl addressUrl(sockname);
00218 addressUrl.setProtocol("local");
00219 address = addressUrl.url();
00220 delete socketfile;
00221
00222 localServer = new KLocalSocketServer(this);
00223 if (!localServer->listen(sockname, KLocalSocket::UnixSocket)) {
00224 errorString = localServer->errorString();
00225 delete localServer;
00226 return false;
00227 }
00228
00229 connect(localServer, SIGNAL(newConnection()), SIGNAL(newConnection()));
00230 } else {
00231 tcpServer = new QTcpServer(this);
00232 tcpServer->listen(QHostAddress::LocalHost);
00233 if (!tcpServer->isListening()) {
00234 errorString = tcpServer->errorString();
00235 delete tcpServer;
00236 return false;
00237 }
00238
00239 address = "tcp://127.0.0.1:" + QString::number(tcpServer->serverPort());
00240 connect(tcpServer, SIGNAL(newConnection()), SIGNAL(newConnection()));
00241 }
00242
00243 state = Listening;
00244 return true;
00245 }
00246
00247 bool SocketConnectionBackend::waitForIncomingTask(int ms)
00248 {
00249 Q_ASSERT(state == Connected);
00250 Q_ASSERT(socket);
00251 if (socket->state() != QAbstractSocket::ConnectedState) {
00252 state = Idle;
00253 return false;
00254 }
00255
00256 signalEmitted = false;
00257 if (socket->bytesAvailable())
00258 socketReadyRead();
00259 if (signalEmitted)
00260 return true;
00261
00262
00263 QTime timer;
00264 timer.start();
00265
00266 while (socket->state() == QAbstractSocket::ConnectedState && !signalEmitted &&
00267 (ms == -1 || timer.elapsed() < ms))
00268 if (!socket->waitForReadyRead(ms == -1 ? -1 : ms - timer.elapsed()))
00269 break;
00270
00271 if (signalEmitted)
00272 return true;
00273 if (socket->state() != QAbstractSocket::ConnectedState)
00274 state = Idle;
00275 return false;
00276 }
00277
00278 bool SocketConnectionBackend::sendCommand(const Task &task)
00279 {
00280 Q_ASSERT(state == Connected);
00281 Q_ASSERT(socket);
00282
00283 static char buffer[HeaderSize + 2];
00284 sprintf(buffer, "%6x_%2x_", task.data.size(), task.cmd);
00285 socket->write(buffer, HeaderSize);
00286 socket->write(task.data);
00287
00288
00289
00290
00291
00292
00293 while (socket->bytesToWrite() > 0 && socket->state() == QAbstractSocket::ConnectedState)
00294 socket->waitForBytesWritten(-1);
00295
00296 return socket->state() == QAbstractSocket::ConnectedState;
00297 }
00298
00299 AbstractConnectionBackend *SocketConnectionBackend::nextPendingConnection()
00300 {
00301 Q_ASSERT(state == Listening);
00302 Q_ASSERT(localServer || tcpServer);
00303 Q_ASSERT(!socket);
00304
00305
00306
00307 QTcpSocket *newSocket;
00308 if (mode == LocalSocketMode)
00309 newSocket = localServer->nextPendingConnection();
00310 else
00311 newSocket = tcpServer->nextPendingConnection();
00312 if (!newSocket)
00313 return 0;
00314
00315 SocketConnectionBackend *result = new SocketConnectionBackend(Mode(mode));
00316 result->state = Connected;
00317 result->socket = newSocket;
00318 newSocket->setParent(result);
00319 connect(newSocket, SIGNAL(readyRead()), result, SLOT(socketReadyRead()));
00320 connect(newSocket, SIGNAL(disconnected()), result, SLOT(socketDisconnected()));
00321
00322 return result;
00323 }
00324
00325 void SocketConnectionBackend::socketReadyRead()
00326 {
00327 bool shouldReadAnother;
00328 do {
00329 if (!socket)
00330
00331 return;
00332
00333
00334 if (len == -1) {
00335
00336 static char buffer[HeaderSize];
00337
00338 if (socket->bytesAvailable() < HeaderSize) {
00339 return;
00340 }
00341
00342 socket->read(buffer, sizeof buffer);
00343 buffer[6] = 0;
00344 buffer[9] = 0;
00345
00346 char *p = buffer;
00347 while( *p == ' ' ) p++;
00348 len = strtol( p, 0L, 16 );
00349
00350 p = buffer + 7;
00351 while( *p == ' ' ) p++;
00352 cmd = strtol( p, 0L, 16 );
00353
00354
00355
00356 }
00357
00358 QPointer<SocketConnectionBackend> that = this;
00359
00360
00361 if (socket->bytesAvailable() >= len) {
00362 Task task;
00363 task.cmd = cmd;
00364 if (len)
00365 task.data = socket->read(len);
00366 len = -1;
00367
00368 signalEmitted = true;
00369 emit commandReceived(task);
00370 } else if (len > StandardBufferSize) {
00371 kDebug(7017) << this << "Jumbo packet of" << len << "bytes";
00372 socket->setReadBufferSize(len + 1);
00373 }
00374
00375
00376 if (that.isNull())
00377 return;
00378
00379
00380 if (len == -1)
00381 shouldReadAnother = socket->bytesAvailable() >= HeaderSize;
00382 else
00383 shouldReadAnother = socket->bytesAvailable() >= len;
00384 }
00385 while (shouldReadAnother);
00386 }
00387
00388 Connection::Connection(QObject *parent)
00389 : QObject(parent), d(new ConnectionPrivate)
00390 {
00391 d->q = this;
00392 }
00393
00394 Connection::~Connection()
00395 {
00396 close();
00397 delete d;
00398 }
00399
00400 void Connection::suspend()
00401 {
00402
00403 d->suspended = true;
00404 if (d->backend)
00405 d->backend->setSuspended(true);
00406 }
00407
00408 void Connection::resume()
00409 {
00410
00411 QMetaObject::invokeMethod(this, "dequeue", Qt::QueuedConnection);
00412
00413
00414 d->suspended = false;
00415 if (d->backend)
00416 d->backend->setSuspended(false);
00417 }
00418
00419 void Connection::close()
00420 {
00421 if (d->backend) {
00422 d->backend->disconnect(this);
00423 d->backend->deleteLater();
00424 d->backend = 0;
00425 }
00426 d->outgoingTasks.clear();
00427 d->incomingTasks.clear();
00428 }
00429
00430 bool Connection::isConnected() const
00431 {
00432 return d->backend && d->backend->state == AbstractConnectionBackend::Connected;
00433 }
00434
00435 bool Connection::inited() const
00436 {
00437 return d->backend;
00438 }
00439
00440 bool Connection::suspended() const
00441 {
00442 return d->suspended;
00443 }
00444
00445 void Connection::connectToRemote(const QString &address)
00446 {
00447
00448 KUrl url = address;
00449 QString scheme = url.protocol();
00450
00451 if (scheme == QLatin1String("local")) {
00452 d->setBackend(new SocketConnectionBackend(SocketConnectionBackend::LocalSocketMode, this));
00453 } else if (scheme == QLatin1String("tcp")) {
00454 d->setBackend(new SocketConnectionBackend(SocketConnectionBackend::TcpSocketMode, this));
00455 } else {
00456 kWarning(7017) << "Unknown requested KIO::Connection protocol='" << scheme
00457 << "' (" << address << ")";
00458 Q_ASSERT(0);
00459 return;
00460 }
00461
00462
00463 if (!d->backend->connectToRemote(url)) {
00464
00465 delete d->backend;
00466 d->backend = 0;
00467 return;
00468 }
00469
00470 d->dequeue();
00471 }
00472
00473 QString Connection::errorString() const
00474 {
00475 if (d->backend)
00476 return d->backend->errorString;
00477 return QString();
00478 }
00479
00480 bool Connection::send(int cmd, const QByteArray& data)
00481 {
00482 if (!inited() || !d->outgoingTasks.isEmpty()) {
00483 Task task;
00484 task.cmd = cmd;
00485 task.data = data;
00486 d->outgoingTasks.enqueue(task);
00487 return true;
00488 } else {
00489 return sendnow(cmd, data);
00490 }
00491 }
00492
00493 bool Connection::sendnow(int _cmd, const QByteArray &data)
00494 {
00495 if (data.size() > 0xffffff)
00496 return false;
00497
00498 if (!isConnected())
00499 return false;
00500
00501
00502 Task task;
00503 task.cmd = _cmd;
00504 task.data = data;
00505 return d->backend->sendCommand(task);
00506 }
00507
00508 bool Connection::hasTaskAvailable() const
00509 {
00510 return !d->incomingTasks.isEmpty();
00511 }
00512
00513 bool Connection::waitForIncomingTask(int ms)
00514 {
00515 if (!isConnected())
00516 return false;
00517
00518 if (d->backend)
00519 return d->backend->waitForIncomingTask(ms);
00520 return false;
00521 }
00522
00523 int Connection::read( int* _cmd, QByteArray &data )
00524 {
00525
00526 if (d->incomingTasks.isEmpty()) {
00527
00528 return -1;
00529 }
00530 const Task task = d->incomingTasks.dequeue();
00531
00532
00533 *_cmd = task.cmd;
00534 data = task.data;
00535
00536
00537 if (!d->suspended && !d->incomingTasks.isEmpty())
00538 QMetaObject::invokeMethod(this, "dequeue", Qt::QueuedConnection);
00539
00540 return data.size();
00541 }
00542
00543 ConnectionServer::ConnectionServer(QObject *parent)
00544 : QObject(parent), d(new ConnectionServerPrivate)
00545 {
00546 d->q = this;
00547 }
00548
00549 ConnectionServer::~ConnectionServer()
00550 {
00551 delete d;
00552 }
00553
00554 void ConnectionServer::listenForRemote()
00555 {
00556 #ifdef Q_WS_WIN
00557 d->backend = new SocketConnectionBackend(SocketConnectionBackend::TcpSocketMode, this);
00558 #else
00559 d->backend = new SocketConnectionBackend(SocketConnectionBackend::LocalSocketMode, this);
00560 #endif
00561 if (!d->backend->listenForRemote()) {
00562 delete d->backend;
00563 d->backend = 0;
00564 return;
00565 }
00566
00567 connect(d->backend, SIGNAL(newConnection()), SIGNAL(newConnection()));
00568 kDebug(7017) << "Listening on " << d->backend->address;
00569 }
00570
00571 QString ConnectionServer::address() const
00572 {
00573 if (d->backend)
00574 return d->backend->address;
00575 return QString();
00576 }
00577
00578 bool ConnectionServer::isListening() const
00579 {
00580 return d->backend && d->backend->state == AbstractConnectionBackend::Listening;
00581 }
00582
00583 void ConnectionServer::close()
00584 {
00585 delete d->backend;
00586 d->backend = 0;
00587 }
00588
00589 Connection *ConnectionServer::nextPendingConnection()
00590 {
00591 if (!isListening())
00592 return 0;
00593
00594 AbstractConnectionBackend *newBackend = d->backend->nextPendingConnection();
00595 if (!newBackend)
00596 return 0;
00597
00598 Connection *result = new Connection;
00599 result->d->setBackend(newBackend);
00600 newBackend->setParent(result);
00601
00602 return result;
00603 }
00604
00605 void ConnectionServer::setNextPendingConnection(Connection *conn)
00606 {
00607 AbstractConnectionBackend *newBackend = d->backend->nextPendingConnection();
00608 Q_ASSERT(newBackend);
00609
00610 conn->d->backend = newBackend;
00611 conn->d->setBackend(newBackend);
00612 newBackend->setParent(conn);
00613
00614 conn->d->dequeue();
00615 }
00616
00617 #include "connection_p.moc"
00618 #include "connection.moc"