KIO

scheduler.cpp
1/*
2 This file is part of the KDE libraries
3 SPDX-FileCopyrightText: 2000 Stephan Kulow <coolo@kde.org>
4 SPDX-FileCopyrightText: 2000 Waldo Bastian <bastian@kde.org>
5 SPDX-FileCopyrightText: 2009, 2010 Andreas Hartmetz <ahartmetz@gmail.com>
6
7 SPDX-License-Identifier: LGPL-2.0-only
8*/
9
10#include "scheduler.h"
11#include "scheduler_p.h"
12
13#include "job_p.h"
14#include "worker_p.h"
15#include "workerconfig.h"
16
17#include <kprotocolinfo.h>
18#include <kprotocolmanager.h>
19
20#ifdef WITH_QTDBUS
21#include <QDBusConnection>
22#include <QDBusMessage>
23#endif
24#include <QHash>
25#include <QThread>
26#include <QThreadStorage>
27
// Maximum time, in seconds (3 minutes), that a worker may sit idle before it is killed.
static constexpr int s_idleWorkerLifetime = 3 * 60;
30
31using namespace KIO;
32
33static inline Worker *jobSWorker(SimpleJob *job)
34{
35 return SimpleJobPrivate::get(job)->m_worker;
36}
37
38static inline void startJob(SimpleJob *job, Worker *worker)
39{
40 SimpleJobPrivate::get(job)->start(worker);
41}
42
43class KIO::SchedulerPrivate
44{
45public:
46 SchedulerPrivate()
47 : q(new Scheduler())
48 {
49 }
50
51 ~SchedulerPrivate()
52 {
53 removeWorkerOnHold();
54 delete q;
55 q = nullptr;
56 qDeleteAll(m_protocols); // ~ProtoQueue will kill and delete all workers
57 }
58
59 SchedulerPrivate(const SchedulerPrivate &) = delete;
60 SchedulerPrivate &operator=(const SchedulerPrivate &) = delete;
61
62 Scheduler *q;
63
64 Worker *m_workerOnHold = nullptr;
65 QUrl m_urlOnHold;
66 bool m_ignoreConfigReparse = false;
67
68 void doJob(SimpleJob *job);
69 void cancelJob(SimpleJob *job);
70 void jobFinished(KIO::SimpleJob *job, KIO::Worker *worker);
71 void putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url);
72 void removeWorkerOnHold();
73 Worker *heldWorkerForJob(KIO::SimpleJob *job);
74 bool isWorkerOnHoldFor(const QUrl &url);
75 void updateInternalMetaData(SimpleJob *job);
76
77 MetaData metaDataFor(const QString &protocol, const QUrl &url);
78 void setupWorker(KIO::Worker *worker, const QUrl &url, const QString &protocol, bool newWorker, const KIO::MetaData *config = nullptr);
79
80 void slotWorkerDied(KIO::Worker *worker);
81
82#ifdef WITH_QTDBUS
83 void slotReparseSlaveConfiguration(const QString &, const QDBusMessage &);
84#endif
85
86 ProtoQueue *protoQ(const QString &protocol, const QString &host);
87
88private:
90};
91
93static SchedulerPrivate *schedulerPrivate()
94{
95 if (!s_storage.hasLocalData()) {
96 s_storage.setLocalData(new SchedulerPrivate);
97 }
98 return s_storage.localData();
99}
100
101Scheduler *Scheduler::self()
102{
103 return schedulerPrivate()->q;
104}
105
106SchedulerPrivate *Scheduler::d_func()
107{
108 return schedulerPrivate();
109}
110
111// static
112Scheduler *scheduler()
113{
114 return schedulerPrivate()->q;
115}
116
117////////////////////////////
118
119WorkerManager::WorkerManager()
120{
121 m_grimTimer.setSingleShot(true);
122 connect(&m_grimTimer, &QTimer::timeout, this, &WorkerManager::grimReaper);
123}
124
125WorkerManager::~WorkerManager()
126{
127 grimReaper();
128}
129
130void WorkerManager::returnWorker(Worker *worker)
131{
132 Q_ASSERT(worker);
133 worker->setIdle();
134 m_idleWorkers.insert(worker->host(), worker);
135 scheduleGrimReaper();
136}
137
138Worker *WorkerManager::takeWorkerForJob(SimpleJob *job)
139{
140 Worker *worker = schedulerPrivate()->heldWorkerForJob(job);
141 if (worker) {
142 return worker;
143 }
144
145 QUrl url = SimpleJobPrivate::get(job)->m_url;
146 // TODO take port, username and password into account
147 QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.find(url.host());
148 if (it == m_idleWorkers.end()) {
149 it = m_idleWorkers.begin();
150 }
151 if (it == m_idleWorkers.end()) {
152 return nullptr;
153 }
154 worker = it.value();
155 m_idleWorkers.erase(it);
156 return worker;
157}
158
159bool WorkerManager::removeWorker(Worker *worker)
160{
161 // ### performance not so great
163 for (; it != m_idleWorkers.end(); ++it) {
164 if (it.value() == worker) {
165 m_idleWorkers.erase(it);
166 return true;
167 }
168 }
169 return false;
170}
171
172void WorkerManager::clear()
173{
174 m_idleWorkers.clear();
175}
176
177QList<Worker *> WorkerManager::allWorkers() const
178{
179 return m_idleWorkers.values();
180}
181
182void WorkerManager::scheduleGrimReaper()
183{
184 if (!m_grimTimer.isActive()) {
185 m_grimTimer.start((s_idleWorkerLifetime / 2) * 1000);
186 }
187}
188
189// private slot
190void WorkerManager::grimReaper()
191{
193 while (it != m_idleWorkers.end()) {
194 Worker *worker = it.value();
195 if (worker->idleTime() >= s_idleWorkerLifetime) {
196 it = m_idleWorkers.erase(it);
197 if (worker->job()) {
198 // qDebug() << "Idle worker" << worker << "still has job" << worker->job();
199 }
200 // avoid invoking slotWorkerDied() because its cleanup services are not needed
201 worker->kill();
202 } else {
203 ++it;
204 }
205 }
206 if (!m_idleWorkers.isEmpty()) {
207 scheduleGrimReaper();
208 }
209}
210
211int HostQueue::lowestSerial() const
212{
213 QMap<int, SimpleJob *>::ConstIterator first = m_queuedJobs.constBegin();
214 if (first != m_queuedJobs.constEnd()) {
215 return first.key();
216 }
217 return SerialPicker::maxSerial;
218}
219
220void HostQueue::queueJob(SimpleJob *job)
221{
222 const int serial = SimpleJobPrivate::get(job)->m_schedSerial;
223 Q_ASSERT(serial != 0);
224 Q_ASSERT(!m_queuedJobs.contains(serial));
225 Q_ASSERT(!m_runningJobs.contains(job));
226 m_queuedJobs.insert(serial, job);
227}
228
229SimpleJob *HostQueue::takeFirstInQueue()
230{
231 Q_ASSERT(!m_queuedJobs.isEmpty());
232 QMap<int, SimpleJob *>::iterator first = m_queuedJobs.begin();
233 SimpleJob *job = first.value();
234 m_queuedJobs.erase(first);
235 m_runningJobs.insert(job);
236 return job;
237}
238
239bool HostQueue::removeJob(SimpleJob *job)
240{
241 const int serial = SimpleJobPrivate::get(job)->m_schedSerial;
242 if (m_runningJobs.remove(job)) {
243 Q_ASSERT(!m_queuedJobs.contains(serial));
244 return true;
245 }
246 if (m_queuedJobs.remove(serial)) {
247 return true;
248 }
249 return false;
250}
251
252QList<Worker *> HostQueue::allWorkers() const
253{
254 QList<Worker *> ret;
255 ret.reserve(m_runningJobs.size());
256 for (SimpleJob *job : m_runningJobs) {
257 Worker *worker = jobSWorker(job);
258 Q_ASSERT(worker);
259 ret.append(worker);
260 }
261 return ret;
262}
263
264static void ensureNoDuplicates(QMap<int, HostQueue *> *queuesBySerial)
265{
266 Q_UNUSED(queuesBySerial);
267#ifdef SCHEDULER_DEBUG
268 // a host queue may *never* be in queuesBySerial twice.
270 auto it = queuesBySerial->cbegin();
271 for (; it != queuesBySerial->cend(); ++it) {
272 Q_ASSERT(!seen.contains(it.value()));
273 seen.insert(it.value());
274 }
275#endif
276}
277
278static void verifyRunningJobsCount(QHash<QString, HostQueue> *queues, int runningJobsCount)
279{
280 Q_UNUSED(queues);
281 Q_UNUSED(runningJobsCount);
282#ifdef SCHEDULER_DEBUG
283 int realRunningJobsCount = 0;
284 auto it = queues->cbegin();
285 for (; it != queues->cend(); ++it) {
286 realRunningJobsCount += it.value().runningJobsCount();
287 }
288 Q_ASSERT(realRunningJobsCount == runningJobsCount);
289
290 // ...and of course we may never run the same job twice!
291 QSet<SimpleJob *> seenJobs;
292 auto it2 = queues->cbegin();
293 for (; it2 != queues->cend(); ++it2) {
294 for (SimpleJob *job : it2.value().runningJobs()) {
295 Q_ASSERT(!seenJobs.contains(job));
296 seenJobs.insert(job);
297 }
298 }
299#endif
300}
301
302ProtoQueue::ProtoQueue(int maxWorkers, int maxWorkersPerHost)
303 : m_maxConnectionsPerHost(maxWorkersPerHost ? maxWorkersPerHost : maxWorkers)
304 , m_maxConnectionsTotal(qMax(maxWorkers, maxWorkersPerHost))
305 , m_runningJobsCount(0)
306
307{
308 /*qDebug() << "m_maxConnectionsTotal:" << m_maxConnectionsTotal
309 << "m_maxConnectionsPerHost:" << m_maxConnectionsPerHost;*/
310 Q_ASSERT(m_maxConnectionsPerHost >= 1);
311 Q_ASSERT(maxWorkers >= maxWorkersPerHost);
312 m_startJobTimer.setSingleShot(true);
313 connect(&m_startJobTimer, &QTimer::timeout, this, &ProtoQueue::startAJob);
314}
315
316ProtoQueue::~ProtoQueue()
317{
318 // Gather list of all workers first
319 const QList<Worker *> workers = allWorkers();
320 // Clear the idle workers in the manager to avoid dangling pointers
321 m_workerManager.clear();
322 for (Worker *worker : workers) {
323 // kill the worker process and remove the interface in our process
324 worker->kill();
325 }
326}
327
328void ProtoQueue::queueJob(SimpleJob *job)
329{
330 QString hostname = SimpleJobPrivate::get(job)->m_url.host();
331 HostQueue &hq = m_queuesByHostname[hostname];
332 const int prevLowestSerial = hq.lowestSerial();
333 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost);
334
335 // nevert insert a job twice
336 Q_ASSERT(SimpleJobPrivate::get(job)->m_schedSerial == 0);
337 SimpleJobPrivate::get(job)->m_schedSerial = m_serialPicker.next();
338
339 const bool wasQueueEmpty = hq.isQueueEmpty();
340 hq.queueJob(job);
341 // note that HostQueue::queueJob() into an empty queue changes its lowestSerial() too...
342 // the queue's lowest serial job may have changed, so update the ordered list of queues.
343 // however, we ignore all jobs that would cause more connections to a host than allowed.
344 if (prevLowestSerial != hq.lowestSerial()) {
345 if (hq.runningJobsCount() < m_maxConnectionsPerHost) {
346 // if the connection limit didn't keep the HQ unscheduled it must have been lack of jobs
347 if (m_queuesBySerial.remove(prevLowestSerial) == 0) {
348 Q_UNUSED(wasQueueEmpty);
349 Q_ASSERT(wasQueueEmpty);
350 }
351 m_queuesBySerial.insert(hq.lowestSerial(), &hq);
352 } else {
353#ifdef SCHEDULER_DEBUG
354 // ### this assertion may fail if the limits were modified at runtime!
355 // if the per-host connection limit is already reached the host queue's lowest serial
356 // should not be queued.
357 Q_ASSERT(!m_queuesBySerial.contains(prevLowestSerial));
358#endif
359 }
360 }
361 // just in case; startAJob() will refuse to start a job if it shouldn't.
362 m_startJobTimer.start();
363
364 ensureNoDuplicates(&m_queuesBySerial);
365}
366
367void ProtoQueue::removeJob(SimpleJob *job)
368{
369 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job);
370 HostQueue &hq = m_queuesByHostname[jobPriv->m_url.host()];
371 const int prevLowestSerial = hq.lowestSerial();
372 const int prevRunningJobs = hq.runningJobsCount();
373
374 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost);
375
376 if (hq.removeJob(job)) {
377 if (hq.lowestSerial() != prevLowestSerial) {
378 // we have dequeued the not yet running job with the lowest serial
379 Q_ASSERT(!jobPriv->m_worker);
380 Q_ASSERT(prevRunningJobs == hq.runningJobsCount());
381 if (m_queuesBySerial.remove(prevLowestSerial) == 0) {
382 // make sure that the queue was not scheduled for a good reason
383 Q_ASSERT(hq.runningJobsCount() == m_maxConnectionsPerHost);
384 }
385 } else {
386 if (prevRunningJobs != hq.runningJobsCount()) {
387 // we have dequeued a previously running job
388 Q_ASSERT(prevRunningJobs - 1 == hq.runningJobsCount());
389 m_runningJobsCount--;
390 Q_ASSERT(m_runningJobsCount >= 0);
391 }
392 }
393 if (!hq.isQueueEmpty() && hq.runningJobsCount() < m_maxConnectionsPerHost) {
394 // this may be a no-op, but it's faster than first checking if it's already in.
395 m_queuesBySerial.insert(hq.lowestSerial(), &hq);
396 }
397
398 if (hq.isEmpty()) {
399 // no queued jobs, no running jobs. this destroys hq from above.
400 m_queuesByHostname.remove(jobPriv->m_url.host());
401 }
402
403 if (jobPriv->m_worker && jobPriv->m_worker->isAlive()) {
404 m_workerManager.returnWorker(jobPriv->m_worker);
405 }
406 // just in case; startAJob() will refuse to start a job if it shouldn't.
407 m_startJobTimer.start();
408 }
409
410 ensureNoDuplicates(&m_queuesBySerial);
411}
412
413Worker *ProtoQueue::createWorker(const QString &protocol, SimpleJob *job, const QUrl &url)
414{
415 int error;
416 QString errortext;
417 Worker *worker = Worker::createWorker(protocol, url, error, errortext);
418 if (worker) {
419 connect(worker, &Worker::workerDied, scheduler(), [](KIO::Worker *worker) {
420 schedulerPrivate()->slotWorkerDied(worker);
421 });
422 } else {
423 qCWarning(KIO_CORE) << "couldn't create worker:" << errortext;
424 if (job) {
425 job->slotError(error, errortext);
426 }
427 }
428 return worker;
429}
430
431bool ProtoQueue::removeWorker(KIO::Worker *worker)
432{
433 const bool removed = m_workerManager.removeWorker(worker);
434 return removed;
435}
436
437QList<Worker *> ProtoQueue::allWorkers() const
438{
439 QList<Worker *> ret(m_workerManager.allWorkers());
440 auto it = m_queuesByHostname.cbegin();
441 for (; it != m_queuesByHostname.cend(); ++it) {
442 ret.append(it.value().allWorkers());
443 }
444
445 return ret;
446}
447
448// private slot
449void ProtoQueue::startAJob()
450{
451 ensureNoDuplicates(&m_queuesBySerial);
452 verifyRunningJobsCount(&m_queuesByHostname, m_runningJobsCount);
453
454#ifdef SCHEDULER_DEBUG
455 // qDebug() << "m_runningJobsCount:" << m_runningJobsCount;
456 auto it = m_queuesByHostname.cbegin();
457 for (; it != m_queuesByHostname.cend(); ++it) {
458 const QList<KIO::SimpleJob *> list = it.value().runningJobs();
459 for (SimpleJob *job : list) {
460 // qDebug() << SimpleJobPrivate::get(job)->m_url;
461 }
462 }
463#endif
464 if (m_runningJobsCount >= m_maxConnectionsTotal) {
465#ifdef SCHEDULER_DEBUG
466 // qDebug() << "not starting any jobs because maxConnectionsTotal has been reached.";
467#endif
468 return;
469 }
470
471 QMap<int, HostQueue *>::iterator first = m_queuesBySerial.begin();
472 if (first != m_queuesBySerial.end()) {
473 // pick a job and maintain the queue invariant: lower serials first
474 HostQueue *hq = first.value();
475 const int prevLowestSerial = first.key();
476 Q_UNUSED(prevLowestSerial);
477 Q_ASSERT(hq->lowestSerial() == prevLowestSerial);
478 // the following assertions should hold due to queueJob(), takeFirstInQueue() and
479 // removeJob() being correct
480 Q_ASSERT(hq->runningJobsCount() < m_maxConnectionsPerHost);
481 SimpleJob *startingJob = hq->takeFirstInQueue();
482 Q_ASSERT(hq->runningJobsCount() <= m_maxConnectionsPerHost);
483 Q_ASSERT(hq->lowestSerial() != prevLowestSerial);
484
485 m_queuesBySerial.erase(first);
486 // we've increased hq's runningJobsCount() by calling nexStartingJob()
487 // so we need to check again.
488 if (!hq->isQueueEmpty() && hq->runningJobsCount() < m_maxConnectionsPerHost) {
489 m_queuesBySerial.insert(hq->lowestSerial(), hq);
490 }
491
492 // always increase m_runningJobsCount because it's correct if there is a worker and if there
493 // is no worker, removeJob() will balance the number again. removeJob() would decrease the
494 // number too much otherwise.
495 // Note that createWorker() can call slotError() on a job which in turn calls removeJob(),
496 // so increase the count here already.
497 m_runningJobsCount++;
498
499 bool isNewWorker = false;
500 Worker *worker = m_workerManager.takeWorkerForJob(startingJob);
501 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(startingJob);
502 if (!worker) {
503 isNewWorker = true;
504 worker = createWorker(jobPriv->m_protocol, startingJob, jobPriv->m_url);
505 }
506
507 if (worker) {
508 jobPriv->m_worker = worker;
509 schedulerPrivate()->setupWorker(worker, jobPriv->m_url, jobPriv->m_protocol, isNewWorker);
510 startJob(startingJob, worker);
511 } else {
512 // dispose of our records about the job and mark the job as unknown
513 // (to prevent crashes later)
514 // note that the job's slotError() can have called removeJob() first, so check that
515 // it's not a ghost job with null serial already.
516 if (jobPriv->m_schedSerial) {
517 removeJob(startingJob);
518 jobPriv->m_schedSerial = 0;
519 }
520 }
521 } else {
522#ifdef SCHEDULER_DEBUG
523 // qDebug() << "not starting any jobs because there is no queued job.";
524#endif
525 }
526
527 if (!m_queuesBySerial.isEmpty()) {
528 m_startJobTimer.start();
529 }
530}
531
532Scheduler::Scheduler()
533{
534 setObjectName(QStringLiteral("scheduler"));
535
536#ifdef WITH_QTDBUS
537 const QString dbusPath = QStringLiteral("/KIO/Scheduler");
538 const QString dbusInterface = QStringLiteral("org.kde.KIO.Scheduler");
540 // Not needed, right? We just want to emit two signals.
541 // dbus.registerObject(dbusPath, this, QDBusConnection::ExportScriptableSlots |
542 // QDBusConnection::ExportScriptableSignals);
543 dbus.connect(QString(),
544 dbusPath,
545 dbusInterface,
546 QStringLiteral("reparseSlaveConfiguration"),
547 this,
548 SLOT(slotReparseSlaveConfiguration(QString, QDBusMessage)));
549#endif
550}
551
552Scheduler::~Scheduler()
553{
554}
555
556void Scheduler::doJob(SimpleJob *job)
557{
558 schedulerPrivate()->doJob(job);
559}
560
561// static
562void Scheduler::cancelJob(SimpleJob *job)
563{
564 schedulerPrivate()->cancelJob(job);
565}
566
567void Scheduler::jobFinished(KIO::SimpleJob *job, KIO::Worker *worker)
568{
569 schedulerPrivate()->jobFinished(job, worker);
570}
571
572void Scheduler::putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url)
573{
574 schedulerPrivate()->putWorkerOnHold(job, url);
575}
576
577void Scheduler::removeWorkerOnHold()
578{
579 schedulerPrivate()->removeWorkerOnHold();
580}
581
582bool Scheduler::isWorkerOnHoldFor(const QUrl &url)
583{
584 return schedulerPrivate()->isWorkerOnHoldFor(url);
585}
586
587void Scheduler::updateInternalMetaData(SimpleJob *job)
588{
589 schedulerPrivate()->updateInternalMetaData(job);
590}
591
592void Scheduler::emitReparseSlaveConfiguration()
593{
594#ifdef WITH_QTDBUS
595 // Do it immediately in this process, otherwise we might send a request before reparsing
596 // (e.g. when changing useragent in the plugin)
597 schedulerPrivate()->slotReparseSlaveConfiguration(QString(), QDBusMessage());
598#endif
599
600 schedulerPrivate()->m_ignoreConfigReparse = true;
601 Q_EMIT self()->reparseSlaveConfiguration(QString());
602}
603
#ifdef WITH_QTDBUS
// Reloads the KIO configuration and tells the affected workers to reparse theirs.
// @param proto protocol whose workers should be notified; an empty string means
//              the workers of all protocols.
// A request is skipped once when m_ignoreConfigReparse is set (signal sent by
// this process itself).
void SchedulerPrivate::slotReparseSlaveConfiguration(const QString &proto, const QDBusMessage &)
{
    if (m_ignoreConfigReparse) {
        // qDebug() << "Ignoring signal sent by myself";
        m_ignoreConfigReparse = false;
        return;
    }

    // qDebug() << "proto=" << proto;
    KProtocolManager::reparseConfiguration();
    WorkerConfig::self()->reset();

    QHash<QString, ProtoQueue *>::ConstIterator it = proto.isEmpty() ? m_protocols.constBegin() : m_protocols.constFind(proto);
    QHash<QString, ProtoQueue *>::ConstIterator endIt = m_protocols.constEnd();

    // not found?
    if (it == endIt) {
        return;
    }

    // for a single protocol, narrow the range to just the entry that was found
    if (!proto.isEmpty()) {
        endIt = it;
        ++endIt;
    }

    for (; it != endIt; ++it) {
        const QList<KIO::Worker *> list = it.value()->allWorkers();
        for (Worker *worker : list) {
            worker->send(CMD_REPARSECONFIGURATION);
            worker->resetHost();
        }
    }
}
#endif
639
640void SchedulerPrivate::doJob(SimpleJob *job)
641{
642 // qDebug() << job;
643 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
644 jobPriv->m_protocol = job->url().scheme();
645
646 ProtoQueue *proto = protoQ(jobPriv->m_protocol, job->url().host());
647 proto->queueJob(job);
648}
649
650void SchedulerPrivate::cancelJob(SimpleJob *job)
651{
652 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
653 // this method is called all over the place in job.cpp, so just do this check here to avoid
654 // much boilerplate in job code.
655 if (jobPriv->m_schedSerial == 0) {
656 // qDebug() << "Doing nothing because I don't know job" << job;
657 return;
658 }
659 Worker *worker = jobSWorker(job);
660 // qDebug() << job << worker;
661 jobFinished(job, worker);
662 if (worker) {
663 ProtoQueue *pq = m_protocols.value(jobPriv->m_protocol);
664 if (pq) {
665 pq->removeWorker(worker);
666 }
667 worker->kill(); // don't use worker after this!
668 }
669}
670
671void SchedulerPrivate::jobFinished(SimpleJob *job, Worker *worker)
672{
673 // qDebug() << job << worker;
674 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
675
676 // make sure that we knew about the job!
677 Q_ASSERT(jobPriv->m_schedSerial);
678
679 ProtoQueue *pq = m_protocols.value(jobPriv->m_protocol);
680 if (pq) {
681 pq->removeJob(job);
682 }
683
684 if (worker) {
685 // If we have internal meta-data, tell existing KIO workers to reload
686 // their configuration.
687 if (jobPriv->m_internalMetaData.count()) {
688 // qDebug() << "Updating KIO workers with new internal metadata information";
689 ProtoQueue *queue = m_protocols.value(worker->protocol());
690 if (queue) {
691 const QList<Worker *> workers = queue->allWorkers();
692 for (auto *runningWorker : workers) {
693 if (worker->host() == runningWorker->host()) {
694 worker->setConfig(metaDataFor(worker->protocol(), job->url()));
695 /*qDebug() << "Updated configuration of" << worker->protocol()
696 << "KIO worker, pid=" << worker->worker_pid();*/
697 }
698 }
699 }
700 }
701 worker->setJob(nullptr);
702 worker->disconnect(job);
703 }
704 jobPriv->m_schedSerial = 0; // this marks the job as unscheduled again
705 jobPriv->m_worker = nullptr;
706 // Clear the values in the internal metadata container since they have
707 // already been taken care of above...
708 jobPriv->m_internalMetaData.clear();
709}
710
711MetaData SchedulerPrivate::metaDataFor(const QString &protocol, const QUrl &url)
712{
713 const QString host = url.host();
714 MetaData configData = WorkerConfig::self()->configData(protocol, host);
715
716 return configData;
717}
718
719void SchedulerPrivate::setupWorker(KIO::Worker *worker, const QUrl &url, const QString &protocol, bool newWorker, const KIO::MetaData *config)
720{
721 int port = url.port();
722 if (port == -1) { // no port is -1 in QUrl, but in kde3 we used 0 and the KIO workers assume that.
723 port = 0;
724 }
725 const QString host = url.host();
726 const QString user = url.userName();
727 const QString passwd = url.password();
728
729 if (newWorker || worker->host() != host || worker->port() != port || worker->user() != user || worker->passwd() != passwd) {
730 MetaData configData = metaDataFor(protocol, url);
731 if (config) {
732 configData += *config;
733 }
734
735 worker->setConfig(configData);
736 worker->setProtocol(url.scheme());
737 worker->setHost(host, port, user, passwd);
738 }
739}
740
741void SchedulerPrivate::slotWorkerDied(KIO::Worker *worker)
742{
743 // qDebug() << worker;
744 Q_ASSERT(worker);
745 Q_ASSERT(!worker->isAlive());
746 ProtoQueue *pq = m_protocols.value(worker->protocol());
747 if (pq) {
748 if (worker->job()) {
749 pq->removeJob(worker->job());
750 }
751 // in case this was a connected worker...
752 pq->removeWorker(worker);
753 }
754 if (worker == m_workerOnHold) {
755 m_workerOnHold = nullptr;
756 m_urlOnHold.clear();
757 }
758 // can't use worker->deref() here because we need to use deleteLater
759 worker->aboutToDelete();
760 worker->deleteLater();
761}
762
763void SchedulerPrivate::putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url)
764{
765 Worker *worker = jobSWorker(job);
766 // qDebug() << job << url << worker;
767 worker->disconnect(job);
768 // prevent the fake death of the worker from trying to kill the job again;
769 // cf. Worker::hold(const QUrl &url) called in SchedulerPrivate::publishWorkerOnHold().
770 worker->setJob(nullptr);
771 SimpleJobPrivate::get(job)->m_worker = nullptr;
772
773 if (m_workerOnHold) {
774 m_workerOnHold->kill();
775 }
776 m_workerOnHold = worker;
777 m_urlOnHold = url;
778 m_workerOnHold->suspend();
779}
780
781bool SchedulerPrivate::isWorkerOnHoldFor(const QUrl &url)
782{
783 if (url.isValid() && m_urlOnHold.isValid() && url == m_urlOnHold) {
784 return true;
785 }
786
787 return false;
788}
789
790Worker *SchedulerPrivate::heldWorkerForJob(SimpleJob *job)
791{
792 Worker *worker = nullptr;
793 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
794
795 if (m_workerOnHold) {
796 // Make sure that the job wants to do a GET or a POST, and with no offset
797 const int cmd = jobPriv->m_command;
798 bool canJobReuse = (cmd == CMD_GET);
799
800 if (KIO::TransferJob *tJob = qobject_cast<KIO::TransferJob *>(job)) {
801 canJobReuse = (canJobReuse || cmd == CMD_SPECIAL);
802 if (canJobReuse) {
803 KIO::MetaData outgoing = tJob->outgoingMetaData();
804 const QString resume = outgoing.value(QStringLiteral("resume"));
805 const QString rangeStart = outgoing.value(QStringLiteral("range-start"));
806 // qDebug() << "Resume metadata is" << resume;
807 canJobReuse = (resume.isEmpty() || resume == QLatin1Char('0')) && (rangeStart.isEmpty() || rangeStart == QLatin1Char('0'));
808 }
809 }
810
811 if (job->url() == m_urlOnHold) {
812 if (canJobReuse) {
813 // qDebug() << "HOLD: Reusing held worker (" << m_workerOnHold << ")";
814 worker = m_workerOnHold;
815 } else {
816 // qDebug() << "HOLD: Discarding held worker (" << m_workerOnHold << ")";
817 m_workerOnHold->kill();
818 }
819 m_workerOnHold = nullptr;
820 m_urlOnHold.clear();
821 }
822 }
823
824 return worker;
825}
826
827void SchedulerPrivate::removeWorkerOnHold()
828{
829 // qDebug() << m_workerOnHold;
830 if (m_workerOnHold) {
831 m_workerOnHold->kill();
832 }
833 m_workerOnHold = nullptr;
834 m_urlOnHold.clear();
835}
836
837ProtoQueue *SchedulerPrivate::protoQ(const QString &protocol, const QString &host)
838{
839 ProtoQueue *pq = m_protocols.value(protocol, nullptr);
840 if (!pq) {
841 // qDebug() << "creating ProtoQueue instance for" << protocol;
842
843 const int maxWorkers = KProtocolInfo::maxWorkers(protocol);
844 int maxWorkersPerHost = -1;
845 if (!host.isEmpty()) {
846 bool ok = false;
847 const int value = WorkerConfig::self()->configData(protocol, host, QStringLiteral("MaxConnections")).toInt(&ok);
848 if (ok) {
849 maxWorkersPerHost = value;
850 }
851 }
852 if (maxWorkersPerHost == -1) {
853 maxWorkersPerHost = KProtocolInfo::maxWorkersPerHost(protocol);
854 }
855 // Never allow maxWorkersPerHost to exceed maxWorkers.
856 pq = new ProtoQueue(maxWorkers, qMin(maxWorkers, maxWorkersPerHost));
857 m_protocols.insert(protocol, pq);
858 }
859 return pq;
860}
861
862void SchedulerPrivate::updateInternalMetaData(SimpleJob *job)
863{
864 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
865 // Preserve all internal meta-data so they can be sent back to the
866 // KIO workers as needed...
867 const QUrl jobUrl = job->url();
868
869 const QLatin1String currHostToken("{internal~currenthost}");
870 const QLatin1String allHostsToken("{internal~allhosts}");
871 // qDebug() << job << jobPriv->m_internalMetaData;
872 QMapIterator<QString, QString> it(jobPriv->m_internalMetaData);
873 while (it.hasNext()) {
874 it.next();
875 if (it.key().startsWith(currHostToken, Qt::CaseInsensitive)) {
876 WorkerConfig::self()->setConfigData(jobUrl.scheme(), jobUrl.host(), it.key().mid(currHostToken.size()), it.value());
877 } else if (it.key().startsWith(allHostsToken, Qt::CaseInsensitive)) {
878 WorkerConfig::self()->setConfigData(jobUrl.scheme(), QString(), it.key().mid(allHostsToken.size()), it.value());
879 }
880 }
881}
882
883#include "moc_scheduler.cpp"
884#include "moc_scheduler_p.cpp"
MetaData is a simple map of key/value strings.
A simple job (one url and one command).
void slotError(int, const QString &)
const QUrl & url() const
Returns the SimpleJob's URL.
Definition simplejob.cpp:70
The transfer job pumps data into and/or out of a KIO worker.
static int maxWorkersPerHost(const QString &protocol)
Returns the limit on the number of KIO workers for this protocol per host.
static int maxWorkers(const QString &protocol)
Returns the soft limit on the number of KIO workers for this protocol.
static void reparseConfiguration()
Force a reload of the general config file of KIO workers (kioslaverc).
A namespace for KIO globals.
void error(QWidget *parent, const QString &text, const QString &title, const KGuiItem &buttonOk, Options options=Notify)
KIOCORE_EXPORT QStringList list(const QString &fileClass)
Returns a list of directories associated with this file-class.
NETWORKMANAGERQT_EXPORT QString hostname()
bool connect(const QString &service, const QString &path, const QString &interface, const QString &name, QObject *receiver, const char *slot)
QDBusConnection sessionBus()
const_iterator cbegin() const
const_iterator cend() const
const_iterator constBegin() const
const_iterator constEnd() const
const_iterator constFind(const Key &key) const
iterator insert(const Key &key, const T &value)
Key key(const T &value) const
T value(const Key &key) const
void append(QList< T > &&value)
void reserve(qsizetype size)
ConstIterator
iterator begin()
const_iterator cbegin() const
const_iterator cend() const
Key key(const T &value, const Key &defaultKey) const
T value(const Key &key, const T &defaultValue) const
iterator begin()
const_iterator cbegin() const
const_iterator cend() const
iterator end()
iterator find(const Key &key, const T &value)
T value(const Key &key) const
Q_EMIT
void setObjectName(QAnyStringView name)
bool contains(const QSet< T > &other) const
iterator insert(const T &value)
bool isEmpty() const
CaseInsensitive
QFuture< ArgsType< Signal > > connect(Sender *sender, Signal signal)
void timeout()
void clear()
QString host(ComponentFormattingOptions options) const
bool isValid() const
QString password(ComponentFormattingOptions options) const
int port(int defaultPort) const
QString scheme() const
QString userName(ComponentFormattingOptions options) const
This file is part of the KDE documentation.
Documentation copyright © 1996-2025 The KDE developers.
Generated on Fri Jan 3 2025 11:56:12 by doxygen 1.12.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.