kget
announcetask.cpp
Go to the documentation of this file.
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include "announcetask.h"
00021 #include <util/log.h>
00022 #include <torrent/globals.h>
00023 #include "node.h"
00024 #include "pack.h"
00025
00026 using namespace bt;
00027
00028 namespace dht
00029 {
00030
00031 AnnounceTask::AnnounceTask(Database* db,RPCServer* rpc, Node* node,const dht::Key & info_hash,bt::Uint16 port)
00032 : Task(rpc, node),info_hash(info_hash),port(port),db(db)
00033 {}
00034
00035
00036 AnnounceTask::~AnnounceTask()
00037 {}
00038
00039
00040 void AnnounceTask::callFinished(RPCCall* c, MsgBase* rsp)
00041 {
00042
00043
00044
00045 if (c->getMsgMethod() != dht::GET_PEERS)
00046 return;
00047
00048
00049 GetPeersRsp* gpr = dynamic_cast<GetPeersRsp*>(rsp);
00050 if (!gpr)
00051 return;
00052
00053 if (gpr->containsNodes())
00054 {
00055 const QByteArray & n = gpr->getNodes();
00056 Uint32 nval = n.size() / 26;
00057 for (Uint32 i = 0;i < nval;i++)
00058 {
00059
00060 KBucketEntry e = UnpackBucketEntry(n,i*26,4);
00061 if (!todo.contains(e) && !visited.contains(e) && todo.count() < 100)
00062 {
00063 todo.append(e);
00064 }
00065 }
00066
00067 for (PackedNodeContainer::CItr itr = gpr->begin();itr != gpr->end();itr++)
00068 {
00069 const QByteArray & ba = *itr;
00070 KBucketEntry e = UnpackBucketEntry(ba,0,6);
00071 if (!todo.contains(e) && !visited.contains(e) && todo.count() < 100)
00072 {
00073 todo.append(e);
00074 }
00075 }
00076 }
00077 else
00078 {
00079
00080 const DBItemList & items = gpr->getItemList();
00081 for (DBItemList::const_iterator i = items.begin();i != items.end();i++)
00082 {
00083 db->store(info_hash,*i);
00084
00085 returned_items.append(*i);
00086 }
00087
00088
00089 KBucketEntry e(rsp->getOrigin(),rsp->getID());
00090 if (!answered.contains(KBucketEntryAndToken(e,gpr->getToken())) && !answered_visited.contains(e))
00091 {
00092 answered.append(KBucketEntryAndToken(e,gpr->getToken()));
00093 }
00094
00095 emitDataReady();
00096 }
00097 }
00098
00099 void AnnounceTask::callTimeout(RPCCall* )
00100 {
00101
00102 }
00103
00104 void AnnounceTask::update()
00105 {
00106
00107
00108
00109
00110 while (!answered.empty() && canDoRequest())
00111 {
00112 KBucketEntryAndToken & e = answered.first();
00113 if (!answered_visited.contains(e))
00114 {
00115 AnnounceReq* anr = new AnnounceReq(node->getOurID(),info_hash,port,e.getToken());
00116 anr->setOrigin(e.getAddress());
00117 rpcCall(anr);
00118 answered_visited.append(e);
00119 }
00120 answered.pop_front();
00121 }
00122
00123
00124
00125 while (!todo.empty() && canDoRequest())
00126 {
00127 KBucketEntry e = todo.first();
00128
00129 if (!visited.contains(e))
00130 {
00131
00132 GetPeersReq* gpr = new GetPeersReq(node->getOurID(),info_hash);
00133 gpr->setOrigin(e.getAddress());
00134 rpcCall(gpr);
00135 visited.append(e);
00136 }
00137
00138 todo.pop_front();
00139 }
00140
00141 if (todo.empty() && answered.empty() && getNumOutstandingRequests() == 0 && !isFinished())
00142 {
00143 Out(SYS_DHT|LOG_NOTICE) << "DHT: AnnounceTask done" << endl;
00144 done();
00145 }
00146 else if (answered_visited.count() >= dht::K)
00147 {
00148
00149 Out(SYS_DHT|LOG_NOTICE) << "DHT: AnnounceTask done" << endl;
00150 done();
00151 }
00152 }
00153
00154 bool AnnounceTask::takeItem(DBItem & item)
00155 {
00156 if (returned_items.empty())
00157 return false;
00158
00159 item = returned_items.first();
00160 returned_items.pop_front();
00161 return true;
00162 }
00163 }