10 #include "scheduler.h"
11 #include "scheduler_p.h"
13 #include "connection_p.h"
15 #include "sessiondata_p.h"
17 #include "workerconfig.h"
19 #include <kprotocolinfo.h>
20 #include <kprotocolmanager.h>
22 #ifndef KIO_ANDROID_STUB
23 #include <QDBusConnection>
24 #include <QDBusMessage>
28 #include <QThreadStorage>
31 static const int s_idleSlaveLifetime = 3 * 60;
35 static inline Slave *jobSlave(
SimpleJob *job)
40 static inline int jobCommand(
SimpleJob *job)
45 static inline void startJob(
SimpleJob *job, Slave *slave)
50 class KIO::SchedulerPrivate
63 qDeleteAll(m_protocols);
66 SchedulerPrivate(
const SchedulerPrivate &) =
delete;
67 SchedulerPrivate &operator=(
const SchedulerPrivate &) =
delete;
71 Slave *m_slaveOnHold =
nullptr;
73 bool m_ignoreConfigReparse =
false;
75 SessionData sessionData;
78 #if KIOCORE_BUILD_DEPRECATED_SINCE(4, 5)
81 void setJobPriority(
SimpleJob *job,
int priority);
85 void removeSlaveOnHold();
86 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 91)
89 bool disconnectSlave(KIO::Slave *slave);
92 bool isSlaveOnHoldFor(
const QUrl &url);
93 void updateInternalMetaData(
SimpleJob *job);
99 void slotSlaveDied(KIO::Slave *slave);
101 #ifndef KIO_ANDROID_STUB
105 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 91)
106 void slotSlaveConnected();
107 void slotSlaveError(
int error,
const QString &errorMsg);
117 static SchedulerPrivate *schedulerPrivate()
119 if (!s_storage.hasLocalData()) {
120 s_storage.setLocalData(
new SchedulerPrivate);
122 return s_storage.localData();
127 return schedulerPrivate()->q;
130 SchedulerPrivate *Scheduler::d_func()
132 return schedulerPrivate();
138 return schedulerPrivate()->q;
143 int SerialPicker::changedPrioritySerial(
int oldSerial,
int newPriority)
const
145 Q_ASSERT(newPriority >= -10 && newPriority <= 10);
146 newPriority = qBound(-10, newPriority, 10);
147 int unbiasedSerial = oldSerial % m_jobsPerPriority;
148 return unbiasedSerial + newPriority * m_jobsPerPriority;
151 SlaveKeeper::SlaveKeeper()
153 m_grimTimer.setSingleShot(
true);
154 connect(&m_grimTimer, &
QTimer::timeout,
this, &SlaveKeeper::grimReaper);
157 SlaveKeeper::~SlaveKeeper()
162 void SlaveKeeper::returnSlave(Slave *slave)
166 m_idleSlaves.insert(slave->host(), slave);
167 scheduleGrimReaper();
170 Slave *SlaveKeeper::takeSlaveForJob(
SimpleJob *job)
172 Slave *slave = schedulerPrivate()->heldSlaveForJob(job);
180 if (it == m_idleSlaves.end()) {
181 it = m_idleSlaves.
begin();
183 if (it == m_idleSlaves.end()) {
187 m_idleSlaves.erase(it);
191 bool SlaveKeeper::removeSlave(Slave *slave)
195 for (; it != m_idleSlaves.
end(); ++it) {
196 if (it.
value() == slave) {
197 m_idleSlaves.erase(it);
204 void SlaveKeeper::clear()
206 m_idleSlaves.clear();
211 return m_idleSlaves.values();
214 void SlaveKeeper::scheduleGrimReaper()
216 if (!m_grimTimer.isActive()) {
217 m_grimTimer.start((s_idleSlaveLifetime / 2) * 1000);
222 void SlaveKeeper::grimReaper()
225 while (it != m_idleSlaves.end()) {
226 Slave *slave = it.
value();
227 if (slave->idleTime() >= s_idleSlaveLifetime) {
228 it = m_idleSlaves.
erase(it);
238 if (!m_idleSlaves.isEmpty()) {
239 scheduleGrimReaper();
243 int HostQueue::lowestSerial()
const
246 if (first != m_queuedJobs.constEnd()) {
249 return SerialPicker::maxSerial;
255 Q_ASSERT(serial != 0);
256 Q_ASSERT(!m_queuedJobs.contains(serial));
257 Q_ASSERT(!m_runningJobs.contains(job));
258 m_queuedJobs.insert(serial, job);
263 Q_ASSERT(!m_queuedJobs.isEmpty());
266 m_queuedJobs.erase(first);
267 m_runningJobs.insert(job);
271 bool HostQueue::removeJob(
SimpleJob *job)
274 if (m_runningJobs.remove(job)) {
275 Q_ASSERT(!m_queuedJobs.contains(serial));
278 if (m_queuedJobs.remove(serial)) {
287 ret.
reserve(m_runningJobs.size());
289 Slave *slave = jobSlave(job);
296 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 91)
297 ConnectedSlaveQueue::ConnectedSlaveQueue()
299 m_startJobsTimer.setSingleShot(
true);
300 connect(&m_startJobsTimer, &
QTimer::timeout,
this, &ConnectedSlaveQueue::startRunnableJobs);
303 bool ConnectedSlaveQueue::queueJob(
SimpleJob *job, Slave *slave)
306 if (it == m_connectedSlaves.end()) {
311 PerSlaveQueue &jobs = it.
value();
312 jobs.waitingList.append(job);
313 if (!jobs.runningJob) {
315 m_runnableSlaves.insert(slave);
316 m_startJobsTimer.start();
321 bool ConnectedSlaveQueue::removeJob(
SimpleJob *job)
323 Slave *slave = jobSlave(job);
326 if (it == m_connectedSlaves.end()) {
329 PerSlaveQueue &jobs = it.
value();
330 if (jobs.runningJob || jobs.waitingList.isEmpty()) {
333 Q_ASSERT(!m_runnableSlaves.contains(slave));
336 const bool removedRunning = jobs.runningJob == job;
337 const bool removedWaiting = jobs.waitingList.removeAll(job) != 0;
338 if (removedRunning) {
339 jobs.runningJob =
nullptr;
340 Q_ASSERT(!removedWaiting);
342 const bool removedTheJob = removedRunning || removedWaiting;
344 if (!slave->isAlive()) {
346 return removedTheJob;
349 if (removedRunning && jobs.waitingList.count()) {
350 m_runnableSlaves.insert(slave);
351 m_startJobsTimer.start();
353 if (removedWaiting && jobs.waitingList.isEmpty()) {
354 m_runnableSlaves.remove(slave);
356 return removedTheJob;
359 void ConnectedSlaveQueue::addSlave(Slave *slave)
362 if (!m_connectedSlaves.contains(slave)) {
363 m_connectedSlaves.insert(slave, PerSlaveQueue());
367 bool ConnectedSlaveQueue::removeSlave(Slave *slave)
370 if (it == m_connectedSlaves.end()) {
373 PerSlaveQueue &jobs = it.
value();
383 m_connectedSlaves.erase(it);
384 m_runnableSlaves.remove(slave);
391 bool ConnectedSlaveQueue::isIdle(Slave *slave)
394 if (it == m_connectedSlaves.end()) {
397 return it.
value().runningJob ==
nullptr;
401 void ConnectedSlaveQueue::startRunnableJobs()
404 while (it != m_runnableSlaves.end()) {
406 if (!slave->isConnected()) {
408 m_startJobsTimer.start();
412 it = m_runnableSlaves.
erase(it);
413 PerSlaveQueue &jobs = m_connectedSlaves[slave];
414 SimpleJob *job = jobs.waitingList.takeFirst();
415 Q_ASSERT(!jobs.runningJob);
416 jobs.runningJob = job;
420 const int port = url.
port() == -1 ? 0 : url.
port();
424 slave->setConfig(configData);
425 slave->setProtocol(url.
scheme());
429 Q_ASSERT(slave->protocol() == url.
scheme());
430 Q_ASSERT(slave->host() == url.
host());
431 Q_ASSERT(slave->port() == port);
432 startJob(job, slave);
439 Q_UNUSED(queuesBySerial);
440 #ifdef SCHEDULER_DEBUG
443 auto it = queuesBySerial->
cbegin();
444 for (; it != queuesBySerial->
cend(); ++it) {
445 Q_ASSERT(!seen.
contains(it.value()));
454 Q_UNUSED(runningJobsCount);
455 #ifdef SCHEDULER_DEBUG
456 int realRunningJobsCount = 0;
457 auto it = queues->
cbegin();
458 for (; it != queues->
cend(); ++it) {
459 realRunningJobsCount += it.value().runningJobsCount();
461 Q_ASSERT(realRunningJobsCount == runningJobsCount);
465 auto it2 = queues->
cbegin();
466 for (; it2 != queues->
cend(); ++it2) {
467 for (
SimpleJob *job : it2.value().runningJobs()) {
475 ProtoQueue::ProtoQueue(
int maxWorkers,
int maxWorkersPerHost)
476 : m_maxConnectionsPerHost(maxWorkersPerHost ? maxWorkersPerHost : maxWorkers)
477 , m_maxConnectionsTotal(qMax(maxWorkers, maxWorkersPerHost))
478 , m_runningJobsCount(0)
483 Q_ASSERT(m_maxConnectionsPerHost >= 1);
484 Q_ASSERT(maxWorkers >= maxWorkersPerHost);
485 m_startJobTimer.setSingleShot(
true);
489 ProtoQueue::~ProtoQueue()
494 m_slaveKeeper.clear();
495 for (Slave *slave : slaves) {
501 void ProtoQueue::queueJob(
SimpleJob *job)
504 HostQueue &hq = m_queuesByHostname[
hostname];
505 const int prevLowestSerial = hq.lowestSerial();
506 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost);
512 const bool wasQueueEmpty = hq.isQueueEmpty();
517 if (prevLowestSerial != hq.lowestSerial()) {
518 if (hq.runningJobsCount() < m_maxConnectionsPerHost) {
520 if (m_queuesBySerial.remove(prevLowestSerial) == 0) {
521 Q_UNUSED(wasQueueEmpty);
522 Q_ASSERT(wasQueueEmpty);
524 m_queuesBySerial.insert(hq.lowestSerial(), &hq);
526 #ifdef SCHEDULER_DEBUG
530 Q_ASSERT(!m_queuesBySerial.contains(prevLowestSerial));
535 m_startJobTimer.start();
537 ensureNoDuplicates(&m_queuesBySerial);
540 void ProtoQueue::changeJobPriority(
SimpleJob *job,
int newPrio)
544 if (it == m_queuesByHostname.end()) {
547 HostQueue &hq = it.
value();
548 const int prevLowestSerial = hq.lowestSerial();
549 if (hq.isJobRunning(job) || !hq.removeJob(job)) {
552 jobPriv->m_schedSerial = m_serialPicker.changedPrioritySerial(jobPriv->m_schedSerial, newPrio);
554 const bool needReinsert = hq.lowestSerial() != prevLowestSerial;
557 if (needReinsert && m_queuesBySerial.remove(prevLowestSerial)) {
558 m_queuesBySerial.insert(hq.lowestSerial(), &hq);
560 ensureNoDuplicates(&m_queuesBySerial);
563 void ProtoQueue::removeJob(
SimpleJob *job)
566 HostQueue &hq = m_queuesByHostname[jobPriv->m_url.host()];
567 const int prevLowestSerial = hq.lowestSerial();
568 const int prevRunningJobs = hq.runningJobsCount();
570 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost);
572 if (hq.removeJob(job)) {
573 if (hq.lowestSerial() != prevLowestSerial) {
575 Q_ASSERT(!jobPriv->m_slave);
576 Q_ASSERT(prevRunningJobs == hq.runningJobsCount());
577 if (m_queuesBySerial.remove(prevLowestSerial) == 0) {
579 Q_ASSERT(hq.runningJobsCount() == m_maxConnectionsPerHost);
582 if (prevRunningJobs != hq.runningJobsCount()) {
584 Q_ASSERT(prevRunningJobs - 1 == hq.runningJobsCount());
585 m_runningJobsCount--;
586 Q_ASSERT(m_runningJobsCount >= 0);
589 if (!hq.isQueueEmpty() && hq.runningJobsCount() < m_maxConnectionsPerHost) {
591 m_queuesBySerial.insert(hq.lowestSerial(), &hq);
596 m_queuesByHostname.remove(jobPriv->m_url.host());
599 if (jobPriv->m_slave && jobPriv->m_slave->isAlive()) {
600 m_slaveKeeper.returnSlave(jobPriv->m_slave);
603 m_startJobTimer.start();
604 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 91)
609 const bool removed = m_connectedSlaveQueue.removeJob(job);
615 ensureNoDuplicates(&m_queuesBySerial);
622 Slave *slave = Slave::createSlave(protocol, url, error, errortext);
624 connect(slave, &Slave::slaveDied, scheduler(), [](KIO::Slave *slave) {
625 schedulerPrivate()->slotSlaveDied(slave);
628 qCWarning(KIO_CORE) <<
"couldn't create worker:" << errortext;
636 bool ProtoQueue::removeSlave(KIO::Slave *slave)
638 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 91)
639 const bool removedConnected = m_connectedSlaveQueue.removeSlave(slave);
640 const bool removedUnconnected = m_slaveKeeper.removeSlave(slave);
641 Q_ASSERT(!(removedConnected && removedUnconnected));
642 return removedConnected || removedUnconnected;
644 const bool removed = m_slaveKeeper.removeSlave(slave);
652 auto it = m_queuesByHostname.
cbegin();
653 for (; it != m_queuesByHostname.
cend(); ++it) {
656 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 91)
657 ret.
append(m_connectedSlaveQueue.allSlaves());
663 void ProtoQueue::startAJob()
665 ensureNoDuplicates(&m_queuesBySerial);
666 verifyRunningJobsCount(&m_queuesByHostname, m_runningJobsCount);
668 #ifdef SCHEDULER_DEBUG
670 auto it = m_queuesByHostname.
cbegin();
671 for (; it != m_queuesByHostname.
cend(); ++it) {
678 if (m_runningJobsCount >= m_maxConnectionsTotal) {
679 #ifdef SCHEDULER_DEBUG
686 if (first != m_queuesBySerial.end()) {
688 HostQueue *hq = first.value();
689 const int prevLowestSerial = first.key();
690 Q_UNUSED(prevLowestSerial);
691 Q_ASSERT(hq->lowestSerial() == prevLowestSerial);
694 Q_ASSERT(hq->runningJobsCount() < m_maxConnectionsPerHost);
695 SimpleJob *startingJob = hq->takeFirstInQueue();
696 Q_ASSERT(hq->runningJobsCount() <= m_maxConnectionsPerHost);
697 Q_ASSERT(hq->lowestSerial() != prevLowestSerial);
699 m_queuesBySerial.erase(first);
702 if (!hq->isQueueEmpty() && hq->runningJobsCount() < m_maxConnectionsPerHost) {
703 m_queuesBySerial.insert(hq->lowestSerial(), hq);
711 m_runningJobsCount++;
713 bool isNewSlave =
false;
714 Slave *slave = m_slaveKeeper.takeSlaveForJob(startingJob);
718 slave = createSlave(jobPriv->m_protocol, startingJob, jobPriv->m_url);
722 jobPriv->m_slave = slave;
723 schedulerPrivate()->setupSlave(slave, jobPriv->m_url, jobPriv->m_protocol, jobPriv->m_proxyList, isNewSlave);
724 startJob(startingJob, slave);
730 if (jobPriv->m_schedSerial) {
731 removeJob(startingJob);
732 jobPriv->m_schedSerial = 0;
736 #ifdef SCHEDULER_DEBUG
741 if (!m_queuesBySerial.isEmpty()) {
742 m_startJobTimer.start();
746 Scheduler::Scheduler()
750 #ifndef KIO_ANDROID_STUB
751 const QString dbusPath = QStringLiteral(
"/KIO/Scheduler");
752 const QString dbusInterface = QStringLiteral(
"org.kde.KIO.Scheduler");
760 QStringLiteral(
"reparseSlaveConfiguration"),
766 Scheduler::~Scheduler()
772 schedulerPrivate()->doJob(job);
775 #if KIOCORE_BUILD_DEPRECATED_SINCE(4, 5)
778 schedulerPrivate()->scheduleJob(job);
782 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 90)
785 schedulerPrivate()->setJobPriority(job, priority);
790 void Scheduler::setSimpleJobPriority(
SimpleJob *job,
int priority)
792 schedulerPrivate()->setJobPriority(job, priority);
797 schedulerPrivate()->cancelJob(job);
802 schedulerPrivate()->jobFinished(job, slave);
805 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 101)
808 schedulerPrivate()->putSlaveOnHold(job, url);
814 schedulerPrivate()->putSlaveOnHold(job, url);
817 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 101)
820 schedulerPrivate()->removeSlaveOnHold();
826 schedulerPrivate()->removeSlaveOnHold();
829 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 88)
835 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 101)
838 return schedulerPrivate()->isSlaveOnHoldFor(url);
844 return schedulerPrivate()->isSlaveOnHoldFor(url);
849 schedulerPrivate()->updateInternalMetaData(job);
852 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 91)
855 return schedulerPrivate()->getConnectedSlave(url, config);
860 return schedulerPrivate()->assignJobToSlave(slave, job);
865 return schedulerPrivate()->disconnectSlave(slave);
869 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 103)
876 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 103)
883 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 103)
884 bool Scheduler::disconnect(
const QObject *sender,
const char *signal,
const QObject *receiver,
const char *member)
890 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 103)
897 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 88)
903 void Scheduler::emitReparseSlaveConfiguration()
905 #ifndef KIO_ANDROID_STUB
911 schedulerPrivate()->m_ignoreConfigReparse =
true;
915 #ifndef KIO_ANDROID_STUB
916 void SchedulerPrivate::slotReparseSlaveConfiguration(
const QString &proto,
const QDBusMessage &)
918 if (m_ignoreConfigReparse) {
920 m_ignoreConfigReparse =
false;
926 WorkerConfig::self()->reset();
943 for (; it != endIt; ++it) {
945 for (Slave *slave : list) {
946 slave->send(CMD_REPARSECONFIGURATION);
953 void SchedulerPrivate::doJob(
SimpleJob *job)
957 jobPriv->m_proxyList.clear();
960 ProtoQueue *proto = protoQ(jobPriv->m_protocol, job->
url().
host());
961 proto->queueJob(job);
964 #if KIOCORE_BUILD_DEPRECATED_SINCE(4, 5)
965 void SchedulerPrivate::scheduleJob(
SimpleJob *job)
968 setJobPriority(job, 1);
972 void SchedulerPrivate::setJobPriority(
SimpleJob *job,
int priority)
977 ProtoQueue *proto = protoQ(protocol, job->
url().
host());
978 proto->changeJobPriority(job, priority);
982 void SchedulerPrivate::cancelJob(
SimpleJob *job)
987 if (jobPriv->m_schedSerial == 0) {
991 Slave *slave = jobSlave(job);
993 jobFinished(job, slave);
995 ProtoQueue *pq = m_protocols.value(jobPriv->m_protocol);
997 pq->removeSlave(slave);
1003 void SchedulerPrivate::jobFinished(
SimpleJob *job, Slave *slave)
1009 Q_ASSERT(jobPriv->m_schedSerial);
1011 ProtoQueue *pq = m_protocols.value(jobPriv->m_protocol);
1019 if (jobPriv->m_internalMetaData.count()) {
1021 ProtoQueue *queue = m_protocols.value(slave->protocol());
1024 for (
auto *runningSlave : slaves) {
1025 if (slave->host() == runningSlave->host()) {
1026 slave->setConfig(metaDataFor(slave->protocol(), jobPriv->m_proxyList, job->
url()));
1033 slave->setJob(
nullptr);
1034 slave->disconnect(job);
1036 jobPriv->m_schedSerial = 0;
1037 jobPriv->m_slave =
nullptr;
1040 jobPriv->m_internalMetaData.
clear();
1046 MetaData configData = WorkerConfig::self()->configData(protocol,
host);
1047 sessionData.configDataFor(configData, protocol,
host);
1049 configData.
remove(QStringLiteral(
"UseProxy"));
1050 configData.
remove(QStringLiteral(
"ProxyUrls"));
1052 configData[QStringLiteral(
"UseProxy")] = proxyList.
first();
1053 configData[QStringLiteral(
"ProxyUrls")] = proxyList.
join(
QLatin1Char(
','));
1062 configData[QStringLiteral(
"autoLoginUser")] = l.login;
1063 configData[QStringLiteral(
"autoLoginPass")] = l.password;
1067 for (; it != l.macdef.
constEnd(); ++it) {
1070 configData[QStringLiteral(
"autoLoginMacro")] = macdef;
1078 void SchedulerPrivate::setupSlave(KIO::Slave *slave,
1085 int port = url.
port();
1093 if (newSlave || slave->host() !=
host || slave->port() != port || slave->user() != user || slave->passwd() != passwd) {
1094 MetaData configData = metaDataFor(protocol, proxyList, url);
1099 slave->setConfig(configData);
1100 slave->setProtocol(url.
scheme());
1101 slave->setHost(
host, port, user, passwd);
1105 void SchedulerPrivate::slotSlaveDied(KIO::Slave *slave)
1109 Q_ASSERT(!slave->isAlive());
1110 ProtoQueue *pq = m_protocols.value(slave->protocol());
1113 pq->removeJob(slave->job());
1116 pq->removeSlave(slave);
1118 if (slave == m_slaveOnHold) {
1119 m_slaveOnHold =
nullptr;
1120 m_urlOnHold.clear();
1123 slave->aboutToDelete();
1124 slave->deleteLater();
1129 Slave *slave = jobSlave(job);
1131 slave->disconnect(job);
1134 slave->setJob(
nullptr);
1137 if (m_slaveOnHold) {
1138 m_slaveOnHold->kill();
1140 m_slaveOnHold = slave;
1142 m_slaveOnHold->suspend();
1145 bool SchedulerPrivate::isSlaveOnHoldFor(
const QUrl &url)
1147 if (url.
isValid() && m_urlOnHold.isValid() && url == m_urlOnHold) {
1154 Slave *SchedulerPrivate::heldSlaveForJob(
SimpleJob *job)
1156 Slave *slave =
nullptr;
1159 if (m_slaveOnHold) {
1161 const int cmd = jobPriv->m_command;
1162 bool canJobReuse = (cmd == CMD_GET || cmd == CMD_MULTI_GET);
1165 canJobReuse = (canJobReuse || cmd == CMD_SPECIAL);
1168 const QString resume = outgoing.
value(QStringLiteral(
"resume"));
1169 const QString rangeStart = outgoing.
value(QStringLiteral(
"range-start"));
1175 if (job->
url() == m_urlOnHold) {
1178 slave = m_slaveOnHold;
1181 m_slaveOnHold->kill();
1183 m_slaveOnHold =
nullptr;
1184 m_urlOnHold.clear();
1191 void SchedulerPrivate::removeSlaveOnHold()
1194 if (m_slaveOnHold) {
1195 m_slaveOnHold->kill();
1197 m_slaveOnHold =
nullptr;
1198 m_urlOnHold.clear();
1201 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 91)
1202 Slave *SchedulerPrivate::getConnectedSlave(
const QUrl &url,
const KIO::MetaData &config)
1206 ProtoQueue *pq = protoQ(protocol, url.
host());
1208 Slave *slave = pq->createSlave(protocol,
nullptr, url);
1210 setupSlave(slave, url, protocol, proxyList,
true, &config);
1211 pq->m_connectedSlaveQueue.addSlave(slave);
1213 slave->send(CMD_CONNECT);
1214 q->connect(slave, SIGNAL(connected()), SLOT(slotSlaveConnected()));
1221 void SchedulerPrivate::slotSlaveConnected()
1224 Slave *slave =
static_cast<Slave *
>(q->sender());
1225 slave->setConnected(
true);
1226 q->disconnect(slave, SIGNAL(connected()), q, SLOT(slotSlaveConnected()));
1227 Q_EMIT q->slaveConnected(slave);
1230 void SchedulerPrivate::slotSlaveError(
int errorNr,
const QString &errorMsg)
1232 Slave *slave =
static_cast<Slave *
>(q->sender());
1234 ProtoQueue *pq = protoQ(slave->protocol(), slave->host());
1235 if (!slave->isConnected() || pq->m_connectedSlaveQueue.isIdle(slave)) {
1238 Q_EMIT q->slaveError(slave, errorNr, errorMsg);
1245 ProtoQueue *pq = m_protocols.value(protocol,
nullptr);
1250 int maxWorkersPerHost = -1;
1251 if (!
host.isEmpty()) {
1253 const int value = WorkerConfig::self()->configData(protocol,
host, QStringLiteral(
"MaxConnections")).toInt(&ok);
1255 maxWorkersPerHost = value;
1258 if (maxWorkersPerHost == -1) {
1262 pq =
new ProtoQueue(maxWorkers, qMin(maxWorkers, maxWorkersPerHost));
1263 m_protocols.insert(protocol, pq);
1268 #if KIOCORE_BUILD_DEPRECATED_SINCE(5, 91)
1269 bool SchedulerPrivate::assignJobToSlave(KIO::Slave *slave,
SimpleJob *job)
1273 ProtoQueue *pq = m_protocols.value(slave->protocol());
1276 return pq->m_connectedSlaveQueue.queueJob(job, slave);
1281 bool SchedulerPrivate::disconnectSlave(KIO::Slave *slave)
1284 ProtoQueue *pq = m_protocols.value(slave->protocol());
1285 return (pq ? pq->m_connectedSlaveQueue.removeSlave(slave) :
false);
1289 void SchedulerPrivate::updateInternalMetaData(
SimpleJob *job)
1294 const QUrl jobUrl = job->
url();
1296 const QLatin1String currHostToken(
"{internal~currenthost}");
1300 while (it.hasNext()) {
1303 WorkerConfig::self()->setConfigData(jobUrl.
scheme(), jobUrl.
host(), it.
key().mid(currHostToken.size()), it.
value());
1305 WorkerConfig::self()->setConfigData(jobUrl.
scheme(),
QString(), it.
key().mid(allHostsToken.size()), it.
value());
1310 #include "moc_scheduler.cpp"
1311 #include "moc_scheduler_p.cpp"