11#include "scheduler_p.h"
15#include "workerconfig.h"
17#include <kprotocolinfo.h>
18#include <kprotocolmanager.h>
21#include <QDBusConnection>
22#include <QDBusMessage>
26#include <QThreadStorage>
29static const int s_idleWorkerLifetime = 3 * 60;
33static inline Worker *jobSWorker(
SimpleJob *job)
35 return SimpleJobPrivate::get(job)->m_worker;
38static inline int jobCommand(
SimpleJob *job)
40 return SimpleJobPrivate::get(job)->m_command;
43static inline void startJob(
SimpleJob *job, Worker *worker)
45 SimpleJobPrivate::get(job)->start(worker);
48class KIO::SchedulerPrivate
61 qDeleteAll(m_protocols);
64 SchedulerPrivate(
const SchedulerPrivate &) =
delete;
65 SchedulerPrivate &operator=(
const SchedulerPrivate &) =
delete;
69 Worker *m_workerOnHold =
nullptr;
71 bool m_ignoreConfigReparse =
false;
74 void setJobPriority(
SimpleJob *job,
int priority);
78 void removeWorkerOnHold();
80 bool isWorkerOnHoldFor(
const QUrl &url);
81 void updateInternalMetaData(
SimpleJob *job);
84 void setupWorker(KIO::Worker *worker,
const QUrl &url,
const QString &protocol,
bool newWorker,
const KIO::MetaData *config =
nullptr);
86 void slotWorkerDied(KIO::Worker *worker);
99static SchedulerPrivate *schedulerPrivate()
101 if (!s_storage.hasLocalData()) {
102 s_storage.setLocalData(
new SchedulerPrivate);
104 return s_storage.localData();
107Scheduler *Scheduler::self()
109 return schedulerPrivate()->q;
112SchedulerPrivate *Scheduler::d_func()
114 return schedulerPrivate();
118Scheduler *scheduler()
120 return schedulerPrivate()->q;
125int SerialPicker::changedPrioritySerial(
int oldSerial,
int newPriority)
const
127 Q_ASSERT(newPriority >= -10 && newPriority <= 10);
128 newPriority = qBound(-10, newPriority, 10);
129 int unbiasedSerial = oldSerial % m_jobsPerPriority;
130 return unbiasedSerial + newPriority * m_jobsPerPriority;
133WorkerManager::WorkerManager()
135 m_grimTimer.setSingleShot(
true);
139WorkerManager::~WorkerManager()
144void WorkerManager::returnWorker(Worker *worker)
148 m_idleWorkers.insert(worker->host(), worker);
149 scheduleGrimReaper();
152Worker *WorkerManager::takeWorkerForJob(
SimpleJob *job)
154 Worker *worker = schedulerPrivate()->heldWorkerForJob(job);
159 QUrl url = SimpleJobPrivate::get(job)->m_url;
162 if (it == m_idleWorkers.end()) {
163 it = m_idleWorkers.
begin();
165 if (it == m_idleWorkers.end()) {
169 m_idleWorkers.erase(it);
173bool WorkerManager::removeWorker(Worker *worker)
177 for (; it != m_idleWorkers.
end(); ++it) {
178 if (it.
value() == worker) {
179 m_idleWorkers.erase(it);
186void WorkerManager::clear()
188 m_idleWorkers.clear();
193 return m_idleWorkers.values();
196void WorkerManager::scheduleGrimReaper()
198 if (!m_grimTimer.isActive()) {
199 m_grimTimer.start((s_idleWorkerLifetime / 2) * 1000);
204void WorkerManager::grimReaper()
207 while (it != m_idleWorkers.end()) {
208 Worker *worker = it.
value();
209 if (worker->idleTime() >= s_idleWorkerLifetime) {
210 it = m_idleWorkers.erase(it);
220 if (!m_idleWorkers.isEmpty()) {
221 scheduleGrimReaper();
225int HostQueue::lowestSerial()
const
228 if (first != m_queuedJobs.constEnd()) {
231 return SerialPicker::maxSerial;
236 const int serial = SimpleJobPrivate::get(job)->m_schedSerial;
237 Q_ASSERT(serial != 0);
238 Q_ASSERT(!m_queuedJobs.contains(serial));
239 Q_ASSERT(!m_runningJobs.contains(job));
240 m_queuedJobs.insert(serial, job);
245 Q_ASSERT(!m_queuedJobs.isEmpty());
248 m_queuedJobs.erase(first);
249 m_runningJobs.insert(job);
255 const int serial = SimpleJobPrivate::get(job)->m_schedSerial;
256 if (m_runningJobs.remove(job)) {
257 Q_ASSERT(!m_queuedJobs.contains(serial));
260 if (m_queuedJobs.remove(serial)) {
269 ret.
reserve(m_runningJobs.size());
271 Worker *worker = jobSWorker(job);
280 Q_UNUSED(queuesBySerial);
281#ifdef SCHEDULER_DEBUG
284 auto it = queuesBySerial->
cbegin();
285 for (; it != queuesBySerial->
cend(); ++it) {
295 Q_UNUSED(runningJobsCount);
296#ifdef SCHEDULER_DEBUG
297 int realRunningJobsCount = 0;
298 auto it = queues->
cbegin();
299 for (; it != queues->
cend(); ++it) {
300 realRunningJobsCount += it.
value().runningJobsCount();
302 Q_ASSERT(realRunningJobsCount == runningJobsCount);
306 auto it2 = queues->
cbegin();
307 for (; it2 != queues->
cend(); ++it2) {
308 for (
SimpleJob *job : it2.value().runningJobs()) {
316ProtoQueue::ProtoQueue(
int maxWorkers,
int maxWorkersPerHost)
317 : m_maxConnectionsPerHost(maxWorkersPerHost ? maxWorkersPerHost : maxWorkers)
318 , m_maxConnectionsTotal(qMax(maxWorkers, maxWorkersPerHost))
319 , m_runningJobsCount(0)
324 Q_ASSERT(m_maxConnectionsPerHost >= 1);
325 Q_ASSERT(maxWorkers >= maxWorkersPerHost);
326 m_startJobTimer.setSingleShot(
true);
330ProtoQueue::~ProtoQueue()
335 m_workerManager.clear();
336 for (Worker *worker : workers) {
345 HostQueue &hq = m_queuesByHostname[
hostname];
346 const int prevLowestSerial = hq.lowestSerial();
347 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost);
350 Q_ASSERT(SimpleJobPrivate::get(job)->m_schedSerial == 0);
351 SimpleJobPrivate::get(job)->m_schedSerial = m_serialPicker.next();
353 const bool wasQueueEmpty = hq.isQueueEmpty();
358 if (prevLowestSerial != hq.lowestSerial()) {
359 if (hq.runningJobsCount() < m_maxConnectionsPerHost) {
361 if (m_queuesBySerial.remove(prevLowestSerial) == 0) {
362 Q_UNUSED(wasQueueEmpty);
363 Q_ASSERT(wasQueueEmpty);
365 m_queuesBySerial.insert(hq.lowestSerial(), &hq);
367#ifdef SCHEDULER_DEBUG
371 Q_ASSERT(!m_queuesBySerial.contains(prevLowestSerial));
376 m_startJobTimer.start();
378 ensureNoDuplicates(&m_queuesBySerial);
381void ProtoQueue::changeJobPriority(
SimpleJob *job,
int newPrio)
383 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job);
385 if (it == m_queuesByHostname.end()) {
388 HostQueue &hq = it.
value();
389 const int prevLowestSerial = hq.lowestSerial();
390 if (hq.isJobRunning(job) || !hq.removeJob(job)) {
393 jobPriv->m_schedSerial = m_serialPicker.changedPrioritySerial(jobPriv->m_schedSerial, newPrio);
395 const bool needReinsert = hq.lowestSerial() != prevLowestSerial;
398 if (needReinsert && m_queuesBySerial.remove(prevLowestSerial)) {
399 m_queuesBySerial.insert(hq.lowestSerial(), &hq);
401 ensureNoDuplicates(&m_queuesBySerial);
404void ProtoQueue::removeJob(
SimpleJob *job)
406 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job);
407 HostQueue &hq = m_queuesByHostname[jobPriv->m_url.host()];
408 const int prevLowestSerial = hq.lowestSerial();
409 const int prevRunningJobs = hq.runningJobsCount();
411 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost);
413 if (hq.removeJob(job)) {
414 if (hq.lowestSerial() != prevLowestSerial) {
416 Q_ASSERT(!jobPriv->m_worker);
417 Q_ASSERT(prevRunningJobs == hq.runningJobsCount());
418 if (m_queuesBySerial.remove(prevLowestSerial) == 0) {
420 Q_ASSERT(hq.runningJobsCount() == m_maxConnectionsPerHost);
423 if (prevRunningJobs != hq.runningJobsCount()) {
425 Q_ASSERT(prevRunningJobs - 1 == hq.runningJobsCount());
426 m_runningJobsCount--;
427 Q_ASSERT(m_runningJobsCount >= 0);
430 if (!hq.isQueueEmpty() && hq.runningJobsCount() < m_maxConnectionsPerHost) {
432 m_queuesBySerial.insert(hq.lowestSerial(), &hq);
437 m_queuesByHostname.remove(jobPriv->m_url.host());
440 if (jobPriv->m_worker && jobPriv->m_worker->isAlive()) {
441 m_workerManager.returnWorker(jobPriv->m_worker);
444 m_startJobTimer.start();
447 ensureNoDuplicates(&m_queuesBySerial);
454 Worker *worker = Worker::createWorker(protocol, url, error, errortext);
456 connect(worker, &Worker::workerDied, scheduler(), [](KIO::Worker *worker) {
457 schedulerPrivate()->slotWorkerDied(worker);
460 qCWarning(KIO_CORE) <<
"couldn't create worker:" << errortext;
468bool ProtoQueue::removeWorker(KIO::Worker *worker)
470 const bool removed = m_workerManager.removeWorker(worker);
477 auto it = m_queuesByHostname.
cbegin();
478 for (; it != m_queuesByHostname.
cend(); ++it) {
486void ProtoQueue::startAJob()
488 ensureNoDuplicates(&m_queuesBySerial);
489 verifyRunningJobsCount(&m_queuesByHostname, m_runningJobsCount);
491#ifdef SCHEDULER_DEBUG
493 auto it = m_queuesByHostname.
cbegin();
494 for (; it != m_queuesByHostname.
cend(); ++it) {
501 if (m_runningJobsCount >= m_maxConnectionsTotal) {
502#ifdef SCHEDULER_DEBUG
509 if (first != m_queuesBySerial.end()) {
511 HostQueue *hq = first.value();
512 const int prevLowestSerial = first.key();
513 Q_UNUSED(prevLowestSerial);
514 Q_ASSERT(hq->lowestSerial() == prevLowestSerial);
517 Q_ASSERT(hq->runningJobsCount() < m_maxConnectionsPerHost);
518 SimpleJob *startingJob = hq->takeFirstInQueue();
519 Q_ASSERT(hq->runningJobsCount() <= m_maxConnectionsPerHost);
520 Q_ASSERT(hq->lowestSerial() != prevLowestSerial);
522 m_queuesBySerial.erase(first);
525 if (!hq->isQueueEmpty() && hq->runningJobsCount() < m_maxConnectionsPerHost) {
526 m_queuesBySerial.insert(hq->lowestSerial(), hq);
534 m_runningJobsCount++;
536 bool isNewWorker =
false;
537 Worker *worker = m_workerManager.takeWorkerForJob(startingJob);
538 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(startingJob);
541 worker = createWorker(jobPriv->m_protocol, startingJob, jobPriv->m_url);
545 jobPriv->m_worker = worker;
546 schedulerPrivate()->setupWorker(worker, jobPriv->m_url, jobPriv->m_protocol, isNewWorker);
547 startJob(startingJob, worker);
553 if (jobPriv->m_schedSerial) {
554 removeJob(startingJob);
555 jobPriv->m_schedSerial = 0;
559#ifdef SCHEDULER_DEBUG
564 if (!m_queuesBySerial.isEmpty()) {
565 m_startJobTimer.start();
569Scheduler::Scheduler()
574 const QString dbusPath = QStringLiteral(
"/KIO/Scheduler");
575 const QString dbusInterface = QStringLiteral(
"org.kde.KIO.Scheduler");
583 QStringLiteral(
"reparseSlaveConfiguration"),
589Scheduler::~Scheduler()
595 schedulerPrivate()->doJob(job);
601 schedulerPrivate()->cancelJob(job);
604void Scheduler::jobFinished(
KIO::SimpleJob *job, KIO::Worker *worker)
606 schedulerPrivate()->jobFinished(job, worker);
611 schedulerPrivate()->putWorkerOnHold(job, url);
614void Scheduler::removeWorkerOnHold()
616 schedulerPrivate()->removeWorkerOnHold();
619bool Scheduler::isWorkerOnHoldFor(
const QUrl &url)
621 return schedulerPrivate()->isWorkerOnHoldFor(url);
624void Scheduler::updateInternalMetaData(
SimpleJob *job)
626 schedulerPrivate()->updateInternalMetaData(job);
629void Scheduler::emitReparseSlaveConfiguration()
637 schedulerPrivate()->m_ignoreConfigReparse =
true;
642void SchedulerPrivate::slotReparseSlaveConfiguration(
const QString &proto,
const QDBusMessage &)
644 if (m_ignoreConfigReparse) {
646 m_ignoreConfigReparse =
false;
652 WorkerConfig::self()->reset();
667 for (; it != endIt; ++it) {
669 for (Worker *worker :
list) {
670 worker->send(CMD_REPARSECONFIGURATION);
677void SchedulerPrivate::doJob(
SimpleJob *job)
680 KIO::SimpleJobPrivate *
const jobPriv = SimpleJobPrivate::get(job);
681 jobPriv->m_protocol = job->
url().
scheme();
683 ProtoQueue *proto = protoQ(jobPriv->m_protocol, job->
url().
host());
684 proto->queueJob(job);
687void SchedulerPrivate::setJobPriority(
SimpleJob *job,
int priority)
690 const QString protocol = SimpleJobPrivate::get(job)->m_protocol;
692 ProtoQueue *proto = protoQ(protocol, job->
url().
host());
693 proto->changeJobPriority(job, priority);
697void SchedulerPrivate::cancelJob(
SimpleJob *job)
699 KIO::SimpleJobPrivate *
const jobPriv = SimpleJobPrivate::get(job);
702 if (jobPriv->m_schedSerial == 0) {
706 Worker *worker = jobSWorker(job);
708 jobFinished(job, worker);
710 ProtoQueue *pq = m_protocols.
value(jobPriv->m_protocol);
712 pq->removeWorker(worker);
718void SchedulerPrivate::jobFinished(
SimpleJob *job, Worker *worker)
721 KIO::SimpleJobPrivate *
const jobPriv = SimpleJobPrivate::get(job);
724 Q_ASSERT(jobPriv->m_schedSerial);
726 ProtoQueue *pq = m_protocols.
value(jobPriv->m_protocol);
734 if (jobPriv->m_internalMetaData.count()) {
736 ProtoQueue *queue = m_protocols.
value(worker->protocol());
739 for (
auto *runningWorker : workers) {
740 if (worker->host() == runningWorker->host()) {
741 worker->setConfig(metaDataFor(worker->protocol(), job->
url()));
748 worker->setJob(
nullptr);
749 worker->disconnect(job);
751 jobPriv->m_schedSerial = 0;
752 jobPriv->m_worker =
nullptr;
755 jobPriv->m_internalMetaData.clear();
761 MetaData configData = WorkerConfig::self()->configData(protocol, host);
766void SchedulerPrivate::setupWorker(KIO::Worker *worker,
const QUrl &url,
const QString &protocol,
bool newWorker,
const KIO::MetaData *config)
768 int port = url.
port();
776 if (newWorker || worker->host() != host || worker->port() != port || worker->user() != user || worker->passwd() != passwd) {
777 MetaData configData = metaDataFor(protocol, url);
779 configData += *config;
782 worker->setConfig(configData);
783 worker->setProtocol(url.
scheme());
784 worker->setHost(host, port, user, passwd);
788void SchedulerPrivate::slotWorkerDied(KIO::Worker *worker)
792 Q_ASSERT(!worker->isAlive());
793 ProtoQueue *pq = m_protocols.
value(worker->protocol());
796 pq->removeJob(worker->job());
799 pq->removeWorker(worker);
801 if (worker == m_workerOnHold) {
802 m_workerOnHold =
nullptr;
806 worker->aboutToDelete();
807 worker->deleteLater();
812 Worker *worker = jobSWorker(job);
814 worker->disconnect(job);
817 worker->setJob(
nullptr);
818 SimpleJobPrivate::get(job)->m_worker =
nullptr;
820 if (m_workerOnHold) {
821 m_workerOnHold->kill();
823 m_workerOnHold = worker;
825 m_workerOnHold->suspend();
828bool SchedulerPrivate::isWorkerOnHoldFor(
const QUrl &url)
830 if (url.
isValid() && m_urlOnHold.
isValid() && url == m_urlOnHold) {
837Worker *SchedulerPrivate::heldWorkerForJob(
SimpleJob *job)
839 Worker *worker =
nullptr;
840 KIO::SimpleJobPrivate *
const jobPriv = SimpleJobPrivate::get(job);
842 if (m_workerOnHold) {
844 const int cmd = jobPriv->m_command;
845 bool canJobReuse = (cmd == CMD_GET);
848 canJobReuse = (canJobReuse || cmd == CMD_SPECIAL);
851 const QString resume = outgoing.
value(QStringLiteral(
"resume"));
852 const QString rangeStart = outgoing.
value(QStringLiteral(
"range-start"));
858 if (job->
url() == m_urlOnHold) {
861 worker = m_workerOnHold;
864 m_workerOnHold->kill();
866 m_workerOnHold =
nullptr;
874void SchedulerPrivate::removeWorkerOnHold()
877 if (m_workerOnHold) {
878 m_workerOnHold->kill();
880 m_workerOnHold =
nullptr;
884ProtoQueue *SchedulerPrivate::protoQ(
const QString &protocol,
const QString &host)
886 ProtoQueue *pq = m_protocols.
value(protocol,
nullptr);
891 int maxWorkersPerHost = -1;
894 const int value = WorkerConfig::self()->configData(protocol, host, QStringLiteral(
"MaxConnections")).toInt(&ok);
896 maxWorkersPerHost = value;
899 if (maxWorkersPerHost == -1) {
903 pq =
new ProtoQueue(maxWorkers, qMin(maxWorkers, maxWorkersPerHost));
904 m_protocols.
insert(protocol, pq);
909void SchedulerPrivate::updateInternalMetaData(
SimpleJob *job)
911 KIO::SimpleJobPrivate *
const jobPriv = SimpleJobPrivate::get(job);
914 const QUrl jobUrl = job->
url();
920 while (it.hasNext()) {
923 WorkerConfig::self()->setConfigData(jobUrl.
scheme(), jobUrl.
host(), it.
key().mid(currHostToken.size()), it.
value());
925 WorkerConfig::self()->setConfigData(jobUrl.
scheme(),
QString(), it.
key().mid(allHostsToken.size()), it.
value());
930#include "moc_scheduler.cpp"
931#include "moc_scheduler_p.cpp"
A simple job (one url and one command).
void slotError(int, const QString &)
const QUrl & url() const
Returns the SimpleJob's URL.
The transfer job pumps data into and/or out of a KIO worker.
static int maxWorkersPerHost(const QString &protocol)
Returns the limit on the number of KIO workers for this protocol per host.
static int maxWorkers(const QString &protocol)
Returns the soft limit on the number of KIO workers for this protocol.
static void reparseConfiguration()
Force a reload of the general config file of KIO workers ( kioslaverc).
A namespace for KIO globals.
void error(QWidget *parent, const QString &text, const QString &title, const KGuiItem &buttonOk, Options options=Notify)
KIOCORE_EXPORT QStringList list(const QString &fileClass)
Returns a list of directories associated with this file-class.
NETWORKMANAGERQT_EXPORT QString hostname()
bool connect(const QString &service, const QString &path, const QString &interface, const QString &name, QObject *receiver, const char *slot)
QDBusConnection sessionBus()
const_iterator cbegin() const const
const_iterator cend() const const
const_iterator constBegin() const const
const_iterator constEnd() const const
const_iterator constFind(const Key &key) const const
iterator insert(const Key &key, const T &value)
Key key(const T &value) const const
T value(const Key &key) const const
void append(QList< T > &&value)
void reserve(qsizetype size)
const_iterator cbegin() const const
const_iterator cend() const const
const_iterator constBegin() const const
Key key(const T &value, const Key &defaultKey) const const
T value(const Key &key, const T &defaultValue) const const
iterator find(const Key &key, const T &value)
T value(const Key &key) const const
void setObjectName(QAnyStringView name)
bool contains(const QSet< T > &other) const const
iterator insert(const T &value)
bool isEmpty() const const
QFuture< ArgsType< Signal > > connect(Sender *sender, Signal signal)
QString host(ComponentFormattingOptions options) const const
bool isValid() const const
QString password(ComponentFormattingOptions options) const const
int port(int defaultPort) const const
QString scheme() const const
QString userName(ComponentFormattingOptions options) const const