11#include "scheduler_p.h"
13#include "connection_p.h"
15#include "kprotocolmanager_p.h"
16#include "sessiondata_p.h"
18#include "workerconfig.h"
20#include <kprotocolinfo.h>
21#include <kprotocolmanager.h>
23#ifndef KIO_ANDROID_STUB
24#include <QDBusConnection>
25#include <QDBusMessage>
29#include <QThreadStorage>
32static const int s_idleWorkerLifetime = 3 * 60;
36static inline Worker *jobSWorker(
SimpleJob *job)
38 return SimpleJobPrivate::get(job)->m_worker;
41static inline int jobCommand(
SimpleJob *job)
43 return SimpleJobPrivate::get(job)->m_command;
46static inline void startJob(
SimpleJob *job, Worker *worker)
48 SimpleJobPrivate::get(job)->start(worker);
51class KIO::SchedulerPrivate
64 qDeleteAll(m_protocols);
67 SchedulerPrivate(
const SchedulerPrivate &) =
delete;
68 SchedulerPrivate &operator=(
const SchedulerPrivate &) =
delete;
72 Worker *m_workerOnHold =
nullptr;
74 bool m_ignoreConfigReparse =
false;
76 SessionData sessionData;
79 void setJobPriority(
SimpleJob *job,
int priority);
83 void removeWorkerOnHold();
85 bool isWorkerOnHoldFor(
const QUrl &url);
86 void updateInternalMetaData(
SimpleJob *job);
89 void setupWorker(KIO::Worker *worker,
96 void slotWorkerDied(KIO::Worker *worker);
98#ifndef KIO_ANDROID_STUB
109static SchedulerPrivate *schedulerPrivate()
111 if (!s_storage.hasLocalData()) {
112 s_storage.setLocalData(
new SchedulerPrivate);
114 return s_storage.localData();
117Scheduler *Scheduler::self()
119 return schedulerPrivate()->q;
122SchedulerPrivate *Scheduler::d_func()
124 return schedulerPrivate();
128Scheduler *scheduler()
130 return schedulerPrivate()->q;
135int SerialPicker::changedPrioritySerial(
int oldSerial,
int newPriority)
const
137 Q_ASSERT(newPriority >= -10 && newPriority <= 10);
138 newPriority = qBound(-10, newPriority, 10);
139 int unbiasedSerial = oldSerial % m_jobsPerPriority;
140 return unbiasedSerial + newPriority * m_jobsPerPriority;
143WorkerManager::WorkerManager()
145 m_grimTimer.setSingleShot(
true);
149WorkerManager::~WorkerManager()
154void WorkerManager::returnWorker(Worker *worker)
158 m_idleWorkers.insert(worker->host(), worker);
159 scheduleGrimReaper();
162Worker *WorkerManager::takeWorkerForJob(
SimpleJob *job)
164 Worker *worker = schedulerPrivate()->heldWorkerForJob(job);
169 QUrl url = SimpleJobPrivate::get(job)->m_url;
172 if (it == m_idleWorkers.end()) {
173 it = m_idleWorkers.
begin();
175 if (it == m_idleWorkers.end()) {
179 m_idleWorkers.erase(it);
183bool WorkerManager::removeWorker(Worker *worker)
187 for (; it != m_idleWorkers.
end(); ++it) {
188 if (it.
value() == worker) {
189 m_idleWorkers.erase(it);
196void WorkerManager::clear()
198 m_idleWorkers.clear();
203 return m_idleWorkers.values();
206void WorkerManager::scheduleGrimReaper()
208 if (!m_grimTimer.isActive()) {
209 m_grimTimer.start((s_idleWorkerLifetime / 2) * 1000);
214void WorkerManager::grimReaper()
217 while (it != m_idleWorkers.end()) {
218 Worker *worker = it.
value();
219 if (worker->idleTime() >= s_idleWorkerLifetime) {
220 it = m_idleWorkers.erase(it);
230 if (!m_idleWorkers.isEmpty()) {
231 scheduleGrimReaper();
235int HostQueue::lowestSerial()
const
238 if (first != m_queuedJobs.constEnd()) {
241 return SerialPicker::maxSerial;
246 const int serial = SimpleJobPrivate::get(job)->m_schedSerial;
247 Q_ASSERT(serial != 0);
248 Q_ASSERT(!m_queuedJobs.contains(serial));
249 Q_ASSERT(!m_runningJobs.contains(job));
250 m_queuedJobs.insert(serial, job);
255 Q_ASSERT(!m_queuedJobs.isEmpty());
258 m_queuedJobs.erase(first);
259 m_runningJobs.insert(job);
265 const int serial = SimpleJobPrivate::get(job)->m_schedSerial;
266 if (m_runningJobs.remove(job)) {
267 Q_ASSERT(!m_queuedJobs.contains(serial));
270 if (m_queuedJobs.remove(serial)) {
279 ret.
reserve(m_runningJobs.size());
281 Worker *worker = jobSWorker(job);
290 Q_UNUSED(queuesBySerial);
291#ifdef SCHEDULER_DEBUG
294 auto it = queuesBySerial->
cbegin();
295 for (; it != queuesBySerial->
cend(); ++it) {
305 Q_UNUSED(runningJobsCount);
306#ifdef SCHEDULER_DEBUG
307 int realRunningJobsCount = 0;
308 auto it = queues->
cbegin();
309 for (; it != queues->
cend(); ++it) {
310 realRunningJobsCount += it.
value().runningJobsCount();
312 Q_ASSERT(realRunningJobsCount == runningJobsCount);
316 auto it2 = queues->
cbegin();
317 for (; it2 != queues->
cend(); ++it2) {
318 for (
SimpleJob *job : it2.value().runningJobs()) {
326ProtoQueue::ProtoQueue(
int maxWorkers,
int maxWorkersPerHost)
327 : m_maxConnectionsPerHost(maxWorkersPerHost ? maxWorkersPerHost : maxWorkers)
328 , m_maxConnectionsTotal(qMax(maxWorkers, maxWorkersPerHost))
329 , m_runningJobsCount(0)
334 Q_ASSERT(m_maxConnectionsPerHost >= 1);
335 Q_ASSERT(maxWorkers >= maxWorkersPerHost);
336 m_startJobTimer.setSingleShot(
true);
340ProtoQueue::~ProtoQueue()
345 m_workerManager.clear();
346 for (Worker *worker : workers) {
355 HostQueue &hq = m_queuesByHostname[
hostname];
356 const int prevLowestSerial = hq.lowestSerial();
357 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost);
360 Q_ASSERT(SimpleJobPrivate::get(job)->m_schedSerial == 0);
361 SimpleJobPrivate::get(job)->m_schedSerial = m_serialPicker.next();
363 const bool wasQueueEmpty = hq.isQueueEmpty();
368 if (prevLowestSerial != hq.lowestSerial()) {
369 if (hq.runningJobsCount() < m_maxConnectionsPerHost) {
371 if (m_queuesBySerial.remove(prevLowestSerial) == 0) {
372 Q_UNUSED(wasQueueEmpty);
373 Q_ASSERT(wasQueueEmpty);
375 m_queuesBySerial.insert(hq.lowestSerial(), &hq);
377#ifdef SCHEDULER_DEBUG
381 Q_ASSERT(!m_queuesBySerial.contains(prevLowestSerial));
386 m_startJobTimer.start();
388 ensureNoDuplicates(&m_queuesBySerial);
391void ProtoQueue::changeJobPriority(
SimpleJob *job,
int newPrio)
393 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job);
395 if (it == m_queuesByHostname.end()) {
398 HostQueue &hq = it.
value();
399 const int prevLowestSerial = hq.lowestSerial();
400 if (hq.isJobRunning(job) || !hq.removeJob(job)) {
403 jobPriv->m_schedSerial = m_serialPicker.changedPrioritySerial(jobPriv->m_schedSerial, newPrio);
405 const bool needReinsert = hq.lowestSerial() != prevLowestSerial;
408 if (needReinsert && m_queuesBySerial.remove(prevLowestSerial)) {
409 m_queuesBySerial.insert(hq.lowestSerial(), &hq);
411 ensureNoDuplicates(&m_queuesBySerial);
414void ProtoQueue::removeJob(
SimpleJob *job)
416 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job);
417 HostQueue &hq = m_queuesByHostname[jobPriv->m_url.host()];
418 const int prevLowestSerial = hq.lowestSerial();
419 const int prevRunningJobs = hq.runningJobsCount();
421 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost);
423 if (hq.removeJob(job)) {
424 if (hq.lowestSerial() != prevLowestSerial) {
426 Q_ASSERT(!jobPriv->m_worker);
427 Q_ASSERT(prevRunningJobs == hq.runningJobsCount());
428 if (m_queuesBySerial.remove(prevLowestSerial) == 0) {
430 Q_ASSERT(hq.runningJobsCount() == m_maxConnectionsPerHost);
433 if (prevRunningJobs != hq.runningJobsCount()) {
435 Q_ASSERT(prevRunningJobs - 1 == hq.runningJobsCount());
436 m_runningJobsCount--;
437 Q_ASSERT(m_runningJobsCount >= 0);
440 if (!hq.isQueueEmpty() && hq.runningJobsCount() < m_maxConnectionsPerHost) {
442 m_queuesBySerial.insert(hq.lowestSerial(), &hq);
447 m_queuesByHostname.remove(jobPriv->m_url.host());
450 if (jobPriv->m_worker && jobPriv->m_worker->isAlive()) {
451 m_workerManager.returnWorker(jobPriv->m_worker);
454 m_startJobTimer.start();
457 ensureNoDuplicates(&m_queuesBySerial);
464 Worker *worker = Worker::createWorker(protocol, url, error, errortext);
466 connect(worker, &Worker::workerDied, scheduler(), [](KIO::Worker *worker) {
467 schedulerPrivate()->slotWorkerDied(worker);
470 qCWarning(KIO_CORE) <<
"couldn't create worker:" << errortext;
478bool ProtoQueue::removeWorker(KIO::Worker *worker)
480 const bool removed = m_workerManager.removeWorker(worker);
487 auto it = m_queuesByHostname.
cbegin();
488 for (; it != m_queuesByHostname.
cend(); ++it) {
496void ProtoQueue::startAJob()
498 ensureNoDuplicates(&m_queuesBySerial);
499 verifyRunningJobsCount(&m_queuesByHostname, m_runningJobsCount);
501#ifdef SCHEDULER_DEBUG
503 auto it = m_queuesByHostname.
cbegin();
504 for (; it != m_queuesByHostname.
cend(); ++it) {
511 if (m_runningJobsCount >= m_maxConnectionsTotal) {
512#ifdef SCHEDULER_DEBUG
519 if (first != m_queuesBySerial.end()) {
521 HostQueue *hq = first.value();
522 const int prevLowestSerial = first.key();
523 Q_UNUSED(prevLowestSerial);
524 Q_ASSERT(hq->lowestSerial() == prevLowestSerial);
527 Q_ASSERT(hq->runningJobsCount() < m_maxConnectionsPerHost);
528 SimpleJob *startingJob = hq->takeFirstInQueue();
529 Q_ASSERT(hq->runningJobsCount() <= m_maxConnectionsPerHost);
530 Q_ASSERT(hq->lowestSerial() != prevLowestSerial);
532 m_queuesBySerial.erase(first);
535 if (!hq->isQueueEmpty() && hq->runningJobsCount() < m_maxConnectionsPerHost) {
536 m_queuesBySerial.insert(hq->lowestSerial(), hq);
544 m_runningJobsCount++;
546 bool isNewWorker =
false;
547 Worker *worker = m_workerManager.takeWorkerForJob(startingJob);
548 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(startingJob);
551 worker = createWorker(jobPriv->m_protocol, startingJob, jobPriv->m_url);
555 jobPriv->m_worker = worker;
556 schedulerPrivate()->setupWorker(worker, jobPriv->m_url, jobPriv->m_protocol, jobPriv->m_proxyList, isNewWorker);
557 startJob(startingJob, worker);
563 if (jobPriv->m_schedSerial) {
564 removeJob(startingJob);
565 jobPriv->m_schedSerial = 0;
569#ifdef SCHEDULER_DEBUG
574 if (!m_queuesBySerial.isEmpty()) {
575 m_startJobTimer.start();
579Scheduler::Scheduler()
583#ifndef KIO_ANDROID_STUB
584 const QString dbusPath = QStringLiteral(
"/KIO/Scheduler");
585 const QString dbusInterface = QStringLiteral(
"org.kde.KIO.Scheduler");
593 QStringLiteral(
"reparseSlaveConfiguration"),
599Scheduler::~Scheduler()
605 schedulerPrivate()->doJob(job);
611 schedulerPrivate()->cancelJob(job);
614void Scheduler::jobFinished(
KIO::SimpleJob *job, KIO::Worker *worker)
616 schedulerPrivate()->jobFinished(job, worker);
621 schedulerPrivate()->putWorkerOnHold(job, url);
624void Scheduler::removeWorkerOnHold()
626 schedulerPrivate()->removeWorkerOnHold();
629bool Scheduler::isWorkerOnHoldFor(
const QUrl &url)
631 return schedulerPrivate()->isWorkerOnHoldFor(url);
634void Scheduler::updateInternalMetaData(
SimpleJob *job)
636 schedulerPrivate()->updateInternalMetaData(job);
639void Scheduler::emitReparseSlaveConfiguration()
641#ifndef KIO_ANDROID_STUB
647 schedulerPrivate()->m_ignoreConfigReparse =
true;
651#ifndef KIO_ANDROID_STUB
652void SchedulerPrivate::slotReparseSlaveConfiguration(
const QString &proto,
const QDBusMessage &)
654 if (m_ignoreConfigReparse) {
656 m_ignoreConfigReparse =
false;
662 WorkerConfig::self()->reset();
678 for (; it != endIt; ++it) {
680 for (Worker *worker :
list) {
681 worker->send(CMD_REPARSECONFIGURATION);
688void SchedulerPrivate::doJob(
SimpleJob *job)
691 KIO::SimpleJobPrivate *
const jobPriv = SimpleJobPrivate::get(job);
692 jobPriv->m_proxyList.clear();
693 jobPriv->m_protocol = KProtocolManagerPrivate::workerProtocol(job->
url(), jobPriv->m_proxyList);
695 ProtoQueue *proto = protoQ(jobPriv->m_protocol, job->
url().
host());
696 proto->queueJob(job);
699void SchedulerPrivate::setJobPriority(
SimpleJob *job,
int priority)
702 const QString protocol = SimpleJobPrivate::get(job)->m_protocol;
704 ProtoQueue *proto = protoQ(protocol, job->
url().
host());
705 proto->changeJobPriority(job, priority);
709void SchedulerPrivate::cancelJob(
SimpleJob *job)
711 KIO::SimpleJobPrivate *
const jobPriv = SimpleJobPrivate::get(job);
714 if (jobPriv->m_schedSerial == 0) {
718 Worker *worker = jobSWorker(job);
720 jobFinished(job, worker);
722 ProtoQueue *pq = m_protocols.
value(jobPriv->m_protocol);
724 pq->removeWorker(worker);
730void SchedulerPrivate::jobFinished(
SimpleJob *job, Worker *worker)
733 KIO::SimpleJobPrivate *
const jobPriv = SimpleJobPrivate::get(job);
736 Q_ASSERT(jobPriv->m_schedSerial);
738 ProtoQueue *pq = m_protocols.
value(jobPriv->m_protocol);
746 if (jobPriv->m_internalMetaData.count()) {
748 ProtoQueue *queue = m_protocols.
value(worker->protocol());
751 for (
auto *runningWorker : workers) {
752 if (worker->host() == runningWorker->host()) {
753 worker->setConfig(metaDataFor(worker->protocol(), jobPriv->m_proxyList, job->
url()));
760 worker->setJob(
nullptr);
761 worker->disconnect(job);
763 jobPriv->m_schedSerial = 0;
764 jobPriv->m_worker =
nullptr;
767 jobPriv->m_internalMetaData.clear();
773 MetaData configData = WorkerConfig::self()->configData(protocol, host);
774 sessionData.configDataFor(configData, protocol, host);
776 configData.
remove(QStringLiteral(
"UseProxy"));
777 configData.
remove(QStringLiteral(
"ProxyUrls"));
779 configData[QStringLiteral(
"UseProxy")] = proxyList.
first();
780 configData[QStringLiteral(
"ProxyUrls")] = proxyList.
join(
QLatin1Char(
','));
786void SchedulerPrivate::setupWorker(KIO::Worker *worker,
793 int port = url.
port();
801 if (newWorker || worker->host() != host || worker->port() != port || worker->user() != user || worker->passwd() != passwd) {
802 MetaData configData = metaDataFor(protocol, proxyList, url);
804 configData += *config;
807 worker->setConfig(configData);
808 worker->setProtocol(url.
scheme());
809 worker->setHost(host, port, user, passwd);
813void SchedulerPrivate::slotWorkerDied(KIO::Worker *worker)
817 Q_ASSERT(!worker->isAlive());
818 ProtoQueue *pq = m_protocols.
value(worker->protocol());
821 pq->removeJob(worker->job());
824 pq->removeWorker(worker);
826 if (worker == m_workerOnHold) {
827 m_workerOnHold =
nullptr;
831 worker->aboutToDelete();
832 worker->deleteLater();
837 Worker *worker = jobSWorker(job);
839 worker->disconnect(job);
842 worker->setJob(
nullptr);
843 SimpleJobPrivate::get(job)->m_worker =
nullptr;
845 if (m_workerOnHold) {
846 m_workerOnHold->kill();
848 m_workerOnHold = worker;
850 m_workerOnHold->suspend();
853bool SchedulerPrivate::isWorkerOnHoldFor(
const QUrl &url)
855 if (url.
isValid() && m_urlOnHold.
isValid() && url == m_urlOnHold) {
862Worker *SchedulerPrivate::heldWorkerForJob(
SimpleJob *job)
864 Worker *worker =
nullptr;
865 KIO::SimpleJobPrivate *
const jobPriv = SimpleJobPrivate::get(job);
867 if (m_workerOnHold) {
869 const int cmd = jobPriv->m_command;
870 bool canJobReuse = (cmd == CMD_GET);
873 canJobReuse = (canJobReuse || cmd == CMD_SPECIAL);
876 const QString resume = outgoing.
value(QStringLiteral(
"resume"));
877 const QString rangeStart = outgoing.
value(QStringLiteral(
"range-start"));
883 if (job->
url() == m_urlOnHold) {
886 worker = m_workerOnHold;
889 m_workerOnHold->kill();
891 m_workerOnHold =
nullptr;
899void SchedulerPrivate::removeWorkerOnHold()
902 if (m_workerOnHold) {
903 m_workerOnHold->kill();
905 m_workerOnHold =
nullptr;
909ProtoQueue *SchedulerPrivate::protoQ(
const QString &protocol,
const QString &host)
911 ProtoQueue *pq = m_protocols.
value(protocol,
nullptr);
916 int maxWorkersPerHost = -1;
919 const int value = WorkerConfig::self()->configData(protocol, host, QStringLiteral(
"MaxConnections")).toInt(&ok);
921 maxWorkersPerHost = value;
924 if (maxWorkersPerHost == -1) {
928 pq =
new ProtoQueue(maxWorkers, qMin(maxWorkers, maxWorkersPerHost));
929 m_protocols.
insert(protocol, pq);
934void SchedulerPrivate::updateInternalMetaData(
SimpleJob *job)
936 KIO::SimpleJobPrivate *
const jobPriv = SimpleJobPrivate::get(job);
939 const QUrl jobUrl = job->
url();
945 while (it.hasNext()) {
948 WorkerConfig::self()->setConfigData(jobUrl.
scheme(), jobUrl.
host(), it.
key().mid(currHostToken.size()), it.
value());
950 WorkerConfig::self()->setConfigData(jobUrl.
scheme(),
QString(), it.
key().mid(allHostsToken.size()), it.
value());
955#include "moc_scheduler.cpp"
956#include "moc_scheduler_p.cpp"
A simple job (one url and one command).
void slotError(int, const QString &)
const QUrl & url() const
Returns the SimpleJob's URL.
The transfer job pumps data into and/or out of a KIO worker.
static int maxWorkersPerHost(const QString &protocol)
Returns the limit on the number of KIO workers for this protocol per host.
static int maxWorkers(const QString &protocol)
Returns the soft limit on the number of KIO workers for this protocol.
static void reparseConfiguration()
Force a reload of the general config file of KIO workers ( kioslaverc).
A namespace for KIO globals.
void error(QWidget *parent, const QString &text, const QString &title, const KGuiItem &buttonOk, Options options=Notify)
KIOCORE_EXPORT QStringList list(const QString &fileClass)
Returns a list of directories associated with this file-class.
NETWORKMANAGERQT_EXPORT QString hostname()
bool connect(const QString &service, const QString &path, const QString &interface, const QString &name, QObject *receiver, const char *slot)
QDBusConnection sessionBus()
const_iterator cbegin() const const
const_iterator cend() const const
const_iterator constBegin() const const
const_iterator constEnd() const const
const_iterator constFind(const Key &key) const const
iterator insert(const Key &key, const T &value)
Key key(const T &value) const const
T value(const Key &key) const const
void append(QList< T > &&value)
bool isEmpty() const const
void reserve(qsizetype size)
const_iterator cbegin() const const
const_iterator cend() const const
const_iterator constBegin() const const
Key key(const T &value, const Key &defaultKey) const const
size_type remove(const Key &key)
T value(const Key &key, const T &defaultValue) const const
iterator find(const Key &key, const T &value)
T value(const Key &key) const const
void setObjectName(QAnyStringView name)
bool contains(const QSet< T > &other) const const
iterator insert(const T &value)
bool isEmpty() const const
QString join(QChar separator) const const
QFuture< ArgsType< Signal > > connect(Sender *sender, Signal signal)
QString host(ComponentFormattingOptions options) const const
bool isValid() const const
QString password(ComponentFormattingOptions options) const const
int port(int defaultPort) const const
QString scheme() const const
QString userName(ComponentFormattingOptions options) const const