00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include <errno.h>
00019 #include <fcntl.h>
00020 #include <pthread.h>
00021 #include <stdio.h>
00022 #include <signal.h>
00023 #include <stdlib.h>
00024 #include <unistd.h>
00025
00026 #include <klocale.h>
00027 #include <kextsock.h>
00028 #include <ksocks.h>
00029
00030 #include "knjobdata.h"
00031 #include "knprotocolclient.h"
00032
00033 #include <Q3StrList>
00034 #include <QByteArray>
00035
00036
00037 KNProtocolClient::KNProtocolClient(int NfdPipeIn, int NfdPipeOut) :
00038 job( 0 ),
00039 inputSize( 10000 ),
00040 fdPipeIn( NfdPipeIn ),
00041 fdPipeOut( NfdPipeOut ),
00042 tcpSocket( -1 ),
00043 mTerminate( false )
00044 {
00045 input = new char[inputSize];
00046 }
00047
00048
00049 KNProtocolClient::~KNProtocolClient()
00050 {
00051 if (isConnected())
00052 closeConnection();
00053 delete [] input;
00054 }
00055
00056
00057 void KNProtocolClient::run()
00058 {
00059 if (0!=pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL))
00060 qWarning("pthread_setcancelstate failed!");
00061 if (0!= pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL))
00062 qWarning("pthread_setcanceltype failed!");
00063
00064 signal(SIGPIPE,SIG_IGN);
00065 waitForWork();
00066 }
00067
00068
00069 void KNProtocolClient::insertJob(KNJobData *newJob)
00070 {
00071 job = newJob;
00072 }
00073
00074
00075 void KNProtocolClient::removeJob()
00076 {
00077 job = 0L;
00078 }
00079
00080
00081 void KNProtocolClient::updatePercentage(int percent)
00082 {
00083 byteCountMode=false;
00084 progressValue = percent*10;
00085 sendSignal(TSprogressUpdate);
00086 }
00087
00088
00089
00090 void KNProtocolClient::waitForWork()
00091 {
00092 fd_set fdsR,fdsE;
00093 timeval tv;
00094 int selectRet;
00095
00096 int holdTime = 1000 * account.hold();
00097 while (true) {
00098 if (isConnected()) {
00099 FD_ZERO(&fdsR);
00100 FD_SET(fdPipeIn, &fdsR);
00101 FD_SET(tcpSocket, &fdsR);
00102 FD_ZERO(&fdsE);
00103 FD_SET(tcpSocket, &fdsE);
00104 tv.tv_sec = 0;
00105 tv.tv_usec = 1000;
00106 --holdTime;
00107 selectRet = KSocks::self()->select(FD_SETSIZE, &fdsR, NULL, &fdsE, &tv);
00108 if (selectRet == 0) {
00109 if (holdTime <= 0) {
00110 kDebug(5003) << "hold time elapsed, closing connection.";
00111 closeConnection();
00112 holdTime = 1000 * account.hold();
00113 } else {
00114 if ( mTerminate ) {
00115 closeConnection();
00116 return;
00117 }
00118 continue;
00119 }
00120 } else {
00121 if (((selectRet > 0)&&(!FD_ISSET(fdPipeIn,&fdsR)))||(selectRet == -1)) {
00122 kDebug(5003) << "connection broken, closing it";
00123 closeSocket();
00124 }
00125 }
00126 }
00127
00128 struct timeval timeout;
00129 do {
00130 timeout.tv_sec = 0;
00131 timeout.tv_usec = 1000;
00132 FD_ZERO(&fdsR);
00133 FD_SET(fdPipeIn, &fdsR);
00134 if (mTerminate)
00135 return;
00136 } while (select(FD_SETSIZE, &fdsR, NULL, NULL, &timeout) <= 0);
00137
00138 clearPipe();
00139
00140 timer.start();
00141
00142 sendSignal(TSjobStarted);
00143 if (job) {
00144
00145
00146 if (job->net()&&!(account == *job->account())) {
00147 account = *job->account();
00148 if (isConnected())
00149 closeConnection();
00150 }
00151
00152 input[0] = 0;
00153 thisLine = input;
00154 nextLine = input;
00155 inputEnd = input;
00156 progressValue = 10;
00157 predictedLines = -1;
00158 doneLines = 0;
00159 byteCount = 0;
00160 byteCountMode = true;
00161
00162 if (!job->net())
00163 processJob();
00164 else {
00165 if (!isConnected())
00166 openConnection();
00167
00168 if (isConnected())
00169 processJob();
00170 }
00171 errorPrefix.clear();
00172
00173 clearPipe();
00174 }
00175 sendSignal(TSworkDone);
00176 }
00177 }
00178
00179
00180 void KNProtocolClient::processJob()
00181 {}
00182
00183
00184
00185 bool KNProtocolClient::openConnection()
00186 {
00187 sendSignal(TSconnect);
00188
00189 kDebug(5003) << "opening connection";
00190
00191 if (account.server().isEmpty()) {
00192 job->setErrorString(i18n("Unable to resolve hostname"));
00193 return false;
00194 }
00195
00196 KExtendedSocket ks;
00197
00198 ks.setAddress(account.server(), account.port());
00199 ks.setTimeout(account.timeout());
00200 if (ks.connect() < 0) {
00201 if (ks.status() == IO_LookupError) {
00202 job->setErrorString(i18n("Unable to resolve hostname"));
00203 } else if (ks.status() == IO_ConnectError) {
00204 job->setErrorString(i18n("Unable to connect:\n%1", KExtendedSocket::strError(ks.status(), errno)));
00205 } else if (ks.status() == IO_TimeOutError)
00206 job->setErrorString(i18n("A delay occurred which exceeded the\ncurrent timeout limit."));
00207 else
00208 job->setErrorString(i18n("Unable to connect:\n%1", KExtendedSocket::strError(ks.status(), errno)));
00209
00210 closeSocket();
00211 return false;
00212 }
00213
00214 tcpSocket = ks.fd();
00215 ks.release();
00216
00217 return true;
00218 }
00219
00220
00221
00222 void KNProtocolClient::closeConnection()
00223 {
00224 fd_set fdsW;
00225 timeval tv;
00226
00227 kDebug(5003) << "closing connection";
00228
00229 FD_ZERO(&fdsW);
00230 FD_SET(tcpSocket, &fdsW);
00231 tv.tv_sec = 0;
00232 tv.tv_usec = 0;
00233 int ret = KSocks::self()->select(FD_SETSIZE, NULL, &fdsW, NULL, &tv);
00234
00235 if (ret > 0) {
00236 QByteArray cmd = "QUIT\r\n";
00237 int todo = cmd.length();
00238 KSocks::self()->write(tcpSocket,&cmd.data()[0],todo);
00239 }
00240 closeSocket();
00241 }
00242
00243
00244
00245 bool KNProtocolClient::sendCommand(const QByteArray &cmd, int &rep)
00246 {
00247 if (!sendStr(cmd + "\r\n"))
00248 return false;
00249 if (!getNextResponse(rep))
00250 return false;
00251 return true;
00252 }
00253
00254
00255
00256 bool KNProtocolClient::sendCommandWCheck(const QByteArray &cmd, int rep)
00257 {
00258 int code;
00259
00260 if (!sendCommand(cmd,code))
00261 return false;
00262 if (code!=rep) {
00263 handleErrors();
00264 return false;
00265 }
00266 return true;
00267 }
00268
00269
00270
00271 bool KNProtocolClient::sendMsg(const QByteArray &msg)
00272 {
00273 const char *line = msg.data();
00274 const char *end;
00275 QByteArray buffer;
00276 size_t length;
00277 char inter[10000];
00278
00279 progressValue = 100;
00280 predictedLines = msg.length()/80;
00281
00282 while ((end = ::strstr(line,"\r\n"))) {
00283 if (line[0]=='.')
00284 buffer.append(".");
00285 length = end-line+2;
00286 if ((buffer.length()>1)&&((buffer.length()+length)>1024)) {
00287 if (!sendStr(buffer))
00288 return false;
00289 buffer = "";
00290 }
00291 if (length > 9500) {
00292 job->setErrorString(i18n("Message size exceeded the size of the internal buffer."));
00293 closeSocket();
00294 return false;
00295 }
00296 memcpy(inter,line,length);
00297 inter[length]=0;
00298 buffer += inter;
00299 line = end+2;
00300 doneLines++;
00301 }
00302 buffer += ".\r\n";
00303 if (!sendStr(buffer))
00304 return false;
00305
00306 return true;
00307 }
00308
00309
00310
00311 bool KNProtocolClient::getNextLine()
00312 {
00313 thisLine = nextLine;
00314 nextLine = strstr(thisLine,"\r\n");
00315 if (nextLine) {
00316 nextLine[0] = 0;
00317 nextLine[1] = 0;
00318 nextLine+=2;
00319 return true;
00320 }
00321 unsigned int div = inputEnd-thisLine+1;
00322 memmove(input,thisLine,div);
00323 thisLine = input;
00324 inputEnd = input+div-1;
00325 do {
00326 div = inputEnd-thisLine+1;
00327 if ((div) > inputSize-100) {
00328 inputSize += 10000;
00329 char *newInput = new char[inputSize];
00330 memmove(newInput,input,div);
00331 delete [] input;
00332 input = newInput;
00333 thisLine = input;
00334 inputEnd = input+div-1;
00335 kDebug(5003) << "input buffer enlarged";
00336 }
00337 if (!waitForRead())
00338 return false;
00339
00340 int received;
00341 do {
00342 received = KSocks::self()->read(tcpSocket, inputEnd, inputSize-(inputEnd-input)-1);
00343 } while ((received<0)&&(errno==EINTR));
00344
00345 if (received <= 0) {
00346 job->setErrorString(i18n("The connection is broken."));
00347 closeSocket();
00348 return false;
00349 }
00350
00351
00352 for (int i=0; i<received; i++)
00353 if (inputEnd[i] == 0) {
00354 memmove(inputEnd+i,inputEnd+i+1,received-i-1);
00355 received--;
00356 i--;
00357 }
00358
00359 inputEnd += received;
00360 inputEnd[0] = 0;
00361
00362 byteCount += received;
00363
00364 } while (!(nextLine = strstr(thisLine,"\r\n")));
00365
00366 if (timer.elapsed()>50) {
00367 timer.start();
00368 if (predictedLines > 0)
00369 progressValue = 100 + (doneLines*900/predictedLines);
00370 sendSignal(TSprogressUpdate);
00371 }
00372
00373 nextLine[0] = 0;
00374 nextLine[1] = 0;
00375 nextLine+=2;
00376 return true;
00377 }
00378
00379
00380
00381 bool KNProtocolClient::getMsg(Q3StrList &msg)
00382 {
00383 char *line;
00384
00385 while (getNextLine()) {
00386 line = getCurrentLine();
00387 if (line[0]=='.') {
00388 if (line[1]=='.')
00389 line++;
00390 else
00391 if (line[1]==0)
00392 return true;
00393 }
00394 msg.append(line);
00395 doneLines++;
00396 }
00397
00398 return false;
00399 }
00400
00401
00402
00403 bool KNProtocolClient::getNextResponse(int &code)
00404 {
00405 if (!getNextLine())
00406 return false;
00407 code = -1;
00408 code = atoi(thisLine);
00409 return true;
00410 }
00411
00412
00413
00414 bool KNProtocolClient::checkNextResponse(int code)
00415 {
00416 if (!getNextLine())
00417 return false;
00418 if (atoi(thisLine)!=code) {
00419 handleErrors();
00420 return false;
00421 }
00422 return true;
00423 }
00424
00425
00426
00427
00428 void KNProtocolClient::handleErrors()
00429 {
00430 if (errorPrefix.isEmpty())
00431 job->setErrorString(i18n("An error occurred:\n%1", thisLine));
00432 else
00433 job->setErrorString(errorPrefix + thisLine);
00434
00435 closeConnection();
00436 }
00437
00438
00439 void KNProtocolClient::sendSignal(threadSignal s)
00440 {
00441 int signal=(int)s;
00442
00443 write(fdPipeOut, &signal, sizeof(int));
00444 }
00445
00446
00447
00448 bool KNProtocolClient::waitForRead()
00449 {
00450 fd_set fdsR,fdsE;
00451 timeval tv;
00452
00453 int ret;
00454 do {
00455 FD_ZERO(&fdsR);
00456 FD_SET(fdPipeIn, &fdsR);
00457 FD_SET(tcpSocket, &fdsR);
00458 FD_ZERO(&fdsE);
00459 FD_SET(tcpSocket, &fdsE);
00460 FD_SET(fdPipeIn, &fdsE);
00461 tv.tv_sec = account.timeout();
00462 tv.tv_usec = 0;
00463 ret = KSocks::self()->select(FD_SETSIZE, &fdsR, NULL, &fdsE, &tv);
00464 } while ((ret<0)&&(errno==EINTR));
00465
00466 if (ret == -1) {
00467 if (job) {
00468 QString str = i18n("Communication error:\n");
00469 str += strerror(errno);
00470 job->setErrorString(str);
00471 }
00472 closeSocket();
00473 return false;
00474 }
00475 if (ret == 0) {
00476 if (job)
00477 job->setErrorString(i18n("A delay occurred which exceeded the\ncurrent timeout limit."));
00478 closeConnection();
00479 return false;
00480 }
00481 if (ret > 0) {
00482 if (FD_ISSET(fdPipeIn,&fdsR)) {
00483 kDebug(5003) << "got stop signal";
00484 closeConnection();
00485 return false;
00486 }
00487 if (FD_ISSET(tcpSocket,&fdsE)||FD_ISSET(fdPipeIn,&fdsE)) {
00488 if (job)
00489 job->setErrorString(i18n("The connection is broken."));
00490 closeSocket();
00491 return false;
00492 }
00493 if (FD_ISSET(tcpSocket,&fdsR))
00494 return true;
00495 }
00496
00497 if (job)
00498 job->setErrorString(i18n("Communication error"));
00499 closeSocket();
00500 return false;
00501 }
00502
00503
00504
00505 bool KNProtocolClient::waitForWrite()
00506 {
00507 fd_set fdsR,fdsW,fdsE;
00508 timeval tv;
00509
00510 int ret;
00511 do {
00512 FD_ZERO(&fdsR);
00513 FD_SET(fdPipeIn, &fdsR);
00514 FD_SET(tcpSocket, &fdsR);
00515 FD_ZERO(&fdsW);
00516 FD_SET(tcpSocket, &fdsW);
00517 FD_ZERO(&fdsE);
00518 FD_SET(tcpSocket, &fdsE);
00519 FD_SET(fdPipeIn, &fdsE);
00520 tv.tv_sec = account.timeout();
00521 tv.tv_usec = 0;
00522 ret = KSocks::self()->select(FD_SETSIZE, &fdsR, &fdsW, &fdsE, &tv);
00523 } while ((ret<0)&&(errno==EINTR));
00524
00525
00526 if (ret == -1) {
00527 if (job) {
00528 QString str = i18n("Communication error:\n");
00529 str += strerror(errno);
00530 job->setErrorString(str);
00531 }
00532 closeSocket();
00533 return false;
00534 }
00535 if (ret == 0) {
00536 if (job)
00537 job->setErrorString(i18n("A delay occurred which exceeded the\ncurrent timeout limit."));
00538 closeConnection();
00539 return false;
00540 }
00541 if (ret > 0) {
00542 if (FD_ISSET(fdPipeIn,&fdsR)) {
00543 kDebug(5003) << "got stop signal";
00544 closeConnection();
00545 return false;
00546 }
00547 if (FD_ISSET(tcpSocket,&fdsR)||FD_ISSET(tcpSocket,&fdsE)||FD_ISSET(fdPipeIn,&fdsE)) {
00548 if (job)
00549 job->setErrorString(i18n("The connection is broken."));
00550 closeSocket();
00551 return false;
00552 }
00553 if (FD_ISSET(tcpSocket,&fdsW))
00554 return true;
00555 }
00556
00557 if (job)
00558 job->setErrorString(i18n("Communication error"));
00559 closeSocket();
00560 return false;
00561 }
00562
00563
00564 void KNProtocolClient::closeSocket()
00565 {
00566 if (-1 != tcpSocket) {
00567 close(tcpSocket);
00568 tcpSocket = -1;
00569 }
00570 }
00571
00572
00573
00574 bool KNProtocolClient::sendStr(const QByteArray &str)
00575 {
00576 int ret;
00577 int todo = str.length();
00578 int done = 0;
00579
00580 while (todo > 0) {
00581 if (!waitForWrite())
00582 return false;
00583 ret = KSocks::self()->write(tcpSocket,&str.data()[done],todo);
00584 if (ret <= 0) {
00585 if (job) {
00586 QString str = i18n("Communication error:\n");
00587 str += strerror(errno);
00588 job->setErrorString(str);
00589 }
00590 closeSocket();
00591 return false;
00592 } else {
00593 done += ret;
00594 todo -= ret;
00595 }
00596 byteCount += ret;
00597 }
00598 if (timer.elapsed()>50) {
00599 timer.start();
00600 if (predictedLines > 0)
00601 progressValue = 100 + (doneLines/predictedLines)*900;
00602 sendSignal(TSprogressUpdate);
00603 }
00604 return true;
00605 }
00606
00607
00608
00609 void KNProtocolClient::clearPipe()
00610 {
00611 fd_set fdsR;
00612 timeval tv;
00613 int selectRet;
00614 char buf;
00615
00616 tv.tv_sec = 0;
00617 tv.tv_usec = 0;
00618 do {
00619 FD_ZERO(&fdsR);
00620 FD_SET(fdPipeIn,&fdsR);
00621 if (1==(selectRet=select(FD_SETSIZE,&fdsR,NULL,NULL,&tv)))
00622 if ( read(fdPipeIn, &buf, 1 ) == -1 )
00623 ::perror( "clearPipe()" );
00624 } while (selectRet == 1);
00625 }
00626