KIO

scheduler.cpp
1/*
2 This file is part of the KDE libraries
3 SPDX-FileCopyrightText: 2000 Stephan Kulow <coolo@kde.org>
4 SPDX-FileCopyrightText: 2000 Waldo Bastian <bastian@kde.org>
5 SPDX-FileCopyrightText: 2009, 2010 Andreas Hartmetz <ahartmetz@gmail.com>
6
7 SPDX-License-Identifier: LGPL-2.0-only
8*/
9
10#include "scheduler.h"
11#include "scheduler_p.h"
12
13#include "job_p.h"
14#include "worker_p.h"
15#include "workerconfig.h"
16
17#include <kprotocolinfo.h>
18#include <kprotocolmanager.h>
19
20#ifdef WITH_QTDBUS
21#include <QDBusConnection>
22#include <QDBusMessage>
23#endif
24#include <QHash>
25#include <QThread>
26#include <QThreadStorage>
27
28// Workers may be idle for a certain time (3 minutes) before they are killed.
29static const int s_idleWorkerLifetime = 3 * 60;
30
31using namespace KIO;
32
33static inline Worker *jobSWorker(SimpleJob *job)
34{
35 return SimpleJobPrivate::get(job)->m_worker;
36}
37
38static inline int jobCommand(SimpleJob *job)
39{
40 return SimpleJobPrivate::get(job)->m_command;
41}
42
43static inline void startJob(SimpleJob *job, Worker *worker)
44{
45 SimpleJobPrivate::get(job)->start(worker);
46}
47
48class KIO::SchedulerPrivate
49{
50public:
51 SchedulerPrivate()
52 : q(new Scheduler())
53 {
54 }
55
56 ~SchedulerPrivate()
57 {
58 removeWorkerOnHold();
59 delete q;
60 q = nullptr;
61 qDeleteAll(m_protocols); // ~ProtoQueue will kill and delete all workers
62 }
63
64 SchedulerPrivate(const SchedulerPrivate &) = delete;
65 SchedulerPrivate &operator=(const SchedulerPrivate &) = delete;
66
67 Scheduler *q;
68
69 Worker *m_workerOnHold = nullptr;
70 QUrl m_urlOnHold;
71 bool m_ignoreConfigReparse = false;
72
73 void doJob(SimpleJob *job);
74 void setJobPriority(SimpleJob *job, int priority);
75 void cancelJob(SimpleJob *job);
76 void jobFinished(KIO::SimpleJob *job, KIO::Worker *worker);
77 void putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url);
78 void removeWorkerOnHold();
79 Worker *heldWorkerForJob(KIO::SimpleJob *job);
80 bool isWorkerOnHoldFor(const QUrl &url);
81 void updateInternalMetaData(SimpleJob *job);
82
83 MetaData metaDataFor(const QString &protocol, const QUrl &url);
84 void setupWorker(KIO::Worker *worker, const QUrl &url, const QString &protocol, bool newWorker, const KIO::MetaData *config = nullptr);
85
86 void slotWorkerDied(KIO::Worker *worker);
87
88#ifdef WITH_QTDBUS
89 void slotReparseSlaveConfiguration(const QString &, const QDBusMessage &);
90#endif
91
92 ProtoQueue *protoQ(const QString &protocol, const QString &host);
93
94private:
96};
97
99static SchedulerPrivate *schedulerPrivate()
100{
101 if (!s_storage.hasLocalData()) {
102 s_storage.setLocalData(new SchedulerPrivate);
103 }
104 return s_storage.localData();
105}
106
107Scheduler *Scheduler::self()
108{
109 return schedulerPrivate()->q;
110}
111
112SchedulerPrivate *Scheduler::d_func()
113{
114 return schedulerPrivate();
115}
116
117// static
118Scheduler *scheduler()
119{
120 return schedulerPrivate()->q;
121}
122
123////////////////////////////
124
125int SerialPicker::changedPrioritySerial(int oldSerial, int newPriority) const
126{
127 Q_ASSERT(newPriority >= -10 && newPriority <= 10);
128 newPriority = qBound(-10, newPriority, 10);
129 int unbiasedSerial = oldSerial % m_jobsPerPriority;
130 return unbiasedSerial + newPriority * m_jobsPerPriority;
131}
132
133WorkerManager::WorkerManager()
134{
135 m_grimTimer.setSingleShot(true);
136 connect(&m_grimTimer, &QTimer::timeout, this, &WorkerManager::grimReaper);
137}
138
139WorkerManager::~WorkerManager()
140{
141 grimReaper();
142}
143
144void WorkerManager::returnWorker(Worker *worker)
145{
146 Q_ASSERT(worker);
147 worker->setIdle();
148 m_idleWorkers.insert(worker->host(), worker);
149 scheduleGrimReaper();
150}
151
152Worker *WorkerManager::takeWorkerForJob(SimpleJob *job)
153{
154 Worker *worker = schedulerPrivate()->heldWorkerForJob(job);
155 if (worker) {
156 return worker;
157 }
158
159 QUrl url = SimpleJobPrivate::get(job)->m_url;
160 // TODO take port, username and password into account
161 QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.find(url.host());
162 if (it == m_idleWorkers.end()) {
163 it = m_idleWorkers.begin();
164 }
165 if (it == m_idleWorkers.end()) {
166 return nullptr;
167 }
168 worker = it.value();
169 m_idleWorkers.erase(it);
170 return worker;
171}
172
173bool WorkerManager::removeWorker(Worker *worker)
174{
175 // ### performance not so great
177 for (; it != m_idleWorkers.end(); ++it) {
178 if (it.value() == worker) {
179 m_idleWorkers.erase(it);
180 return true;
181 }
182 }
183 return false;
184}
185
186void WorkerManager::clear()
187{
188 m_idleWorkers.clear();
189}
190
191QList<Worker *> WorkerManager::allWorkers() const
192{
193 return m_idleWorkers.values();
194}
195
196void WorkerManager::scheduleGrimReaper()
197{
198 if (!m_grimTimer.isActive()) {
199 m_grimTimer.start((s_idleWorkerLifetime / 2) * 1000);
200 }
201}
202
203// private slot
204void WorkerManager::grimReaper()
205{
207 while (it != m_idleWorkers.end()) {
208 Worker *worker = it.value();
209 if (worker->idleTime() >= s_idleWorkerLifetime) {
210 it = m_idleWorkers.erase(it);
211 if (worker->job()) {
212 // qDebug() << "Idle worker" << worker << "still has job" << worker->job();
213 }
214 // avoid invoking slotWorkerDied() because its cleanup services are not needed
215 worker->kill();
216 } else {
217 ++it;
218 }
219 }
220 if (!m_idleWorkers.isEmpty()) {
221 scheduleGrimReaper();
222 }
223}
224
225int HostQueue::lowestSerial() const
226{
228 if (first != m_queuedJobs.constEnd()) {
229 return first.key();
230 }
231 return SerialPicker::maxSerial;
232}
233
234void HostQueue::queueJob(SimpleJob *job)
235{
236 const int serial = SimpleJobPrivate::get(job)->m_schedSerial;
237 Q_ASSERT(serial != 0);
238 Q_ASSERT(!m_queuedJobs.contains(serial));
239 Q_ASSERT(!m_runningJobs.contains(job));
240 m_queuedJobs.insert(serial, job);
241}
242
243SimpleJob *HostQueue::takeFirstInQueue()
244{
245 Q_ASSERT(!m_queuedJobs.isEmpty());
246 QMap<int, SimpleJob *>::iterator first = m_queuedJobs.begin();
247 SimpleJob *job = first.value();
248 m_queuedJobs.erase(first);
249 m_runningJobs.insert(job);
250 return job;
251}
252
253bool HostQueue::removeJob(SimpleJob *job)
254{
255 const int serial = SimpleJobPrivate::get(job)->m_schedSerial;
256 if (m_runningJobs.remove(job)) {
257 Q_ASSERT(!m_queuedJobs.contains(serial));
258 return true;
259 }
260 if (m_queuedJobs.remove(serial)) {
261 return true;
262 }
263 return false;
264}
265
266QList<Worker *> HostQueue::allWorkers() const
267{
268 QList<Worker *> ret;
269 ret.reserve(m_runningJobs.size());
270 for (SimpleJob *job : m_runningJobs) {
271 Worker *worker = jobSWorker(job);
272 Q_ASSERT(worker);
273 ret.append(worker);
274 }
275 return ret;
276}
277
278static void ensureNoDuplicates(QMap<int, HostQueue *> *queuesBySerial)
279{
280 Q_UNUSED(queuesBySerial);
281#ifdef SCHEDULER_DEBUG
282 // a host queue may *never* be in queuesBySerial twice.
284 auto it = queuesBySerial->cbegin();
285 for (; it != queuesBySerial->cend(); ++it) {
286 Q_ASSERT(!seen.contains(it.value()));
287 seen.insert(it.value());
288 }
289#endif
290}
291
292static void verifyRunningJobsCount(QHash<QString, HostQueue> *queues, int runningJobsCount)
293{
294 Q_UNUSED(queues);
295 Q_UNUSED(runningJobsCount);
296#ifdef SCHEDULER_DEBUG
297 int realRunningJobsCount = 0;
298 auto it = queues->cbegin();
299 for (; it != queues->cend(); ++it) {
300 realRunningJobsCount += it.value().runningJobsCount();
301 }
302 Q_ASSERT(realRunningJobsCount == runningJobsCount);
303
304 // ...and of course we may never run the same job twice!
305 QSet<SimpleJob *> seenJobs;
306 auto it2 = queues->cbegin();
307 for (; it2 != queues->cend(); ++it2) {
308 for (SimpleJob *job : it2.value().runningJobs()) {
309 Q_ASSERT(!seenJobs.contains(job));
310 seenJobs.insert(job);
311 }
312 }
313#endif
314}
315
316ProtoQueue::ProtoQueue(int maxWorkers, int maxWorkersPerHost)
317 : m_maxConnectionsPerHost(maxWorkersPerHost ? maxWorkersPerHost : maxWorkers)
318 , m_maxConnectionsTotal(qMax(maxWorkers, maxWorkersPerHost))
319 , m_runningJobsCount(0)
320
321{
322 /*qDebug() << "m_maxConnectionsTotal:" << m_maxConnectionsTotal
323 << "m_maxConnectionsPerHost:" << m_maxConnectionsPerHost;*/
324 Q_ASSERT(m_maxConnectionsPerHost >= 1);
325 Q_ASSERT(maxWorkers >= maxWorkersPerHost);
326 m_startJobTimer.setSingleShot(true);
327 connect(&m_startJobTimer, &QTimer::timeout, this, &ProtoQueue::startAJob);
328}
329
330ProtoQueue::~ProtoQueue()
331{
332 // Gather list of all workers first
333 const QList<Worker *> workers = allWorkers();
334 // Clear the idle workers in the manager to avoid dangling pointers
335 m_workerManager.clear();
336 for (Worker *worker : workers) {
337 // kill the worker process and remove the interface in our process
338 worker->kill();
339 }
340}
341
342void ProtoQueue::queueJob(SimpleJob *job)
343{
344 QString hostname = SimpleJobPrivate::get(job)->m_url.host();
345 HostQueue &hq = m_queuesByHostname[hostname];
346 const int prevLowestSerial = hq.lowestSerial();
347 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost);
348
349 // nevert insert a job twice
350 Q_ASSERT(SimpleJobPrivate::get(job)->m_schedSerial == 0);
351 SimpleJobPrivate::get(job)->m_schedSerial = m_serialPicker.next();
352
353 const bool wasQueueEmpty = hq.isQueueEmpty();
354 hq.queueJob(job);
355 // note that HostQueue::queueJob() into an empty queue changes its lowestSerial() too...
356 // the queue's lowest serial job may have changed, so update the ordered list of queues.
357 // however, we ignore all jobs that would cause more connections to a host than allowed.
358 if (prevLowestSerial != hq.lowestSerial()) {
359 if (hq.runningJobsCount() < m_maxConnectionsPerHost) {
360 // if the connection limit didn't keep the HQ unscheduled it must have been lack of jobs
361 if (m_queuesBySerial.remove(prevLowestSerial) == 0) {
362 Q_UNUSED(wasQueueEmpty);
363 Q_ASSERT(wasQueueEmpty);
364 }
365 m_queuesBySerial.insert(hq.lowestSerial(), &hq);
366 } else {
367#ifdef SCHEDULER_DEBUG
368 // ### this assertion may fail if the limits were modified at runtime!
369 // if the per-host connection limit is already reached the host queue's lowest serial
370 // should not be queued.
371 Q_ASSERT(!m_queuesBySerial.contains(prevLowestSerial));
372#endif
373 }
374 }
375 // just in case; startAJob() will refuse to start a job if it shouldn't.
376 m_startJobTimer.start();
377
378 ensureNoDuplicates(&m_queuesBySerial);
379}
380
381void ProtoQueue::changeJobPriority(SimpleJob *job, int newPrio)
382{
383 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job);
384 QHash<QString, HostQueue>::Iterator it = m_queuesByHostname.find(jobPriv->m_url.host());
385 if (it == m_queuesByHostname.end()) {
386 return;
387 }
388 HostQueue &hq = it.value();
389 const int prevLowestSerial = hq.lowestSerial();
390 if (hq.isJobRunning(job) || !hq.removeJob(job)) {
391 return;
392 }
393 jobPriv->m_schedSerial = m_serialPicker.changedPrioritySerial(jobPriv->m_schedSerial, newPrio);
394 hq.queueJob(job);
395 const bool needReinsert = hq.lowestSerial() != prevLowestSerial;
396 // the host queue might be absent from m_queuesBySerial because the connections per host limit
397 // for that host has been reached.
398 if (needReinsert && m_queuesBySerial.remove(prevLowestSerial)) {
399 m_queuesBySerial.insert(hq.lowestSerial(), &hq);
400 }
401 ensureNoDuplicates(&m_queuesBySerial);
402}
403
404void ProtoQueue::removeJob(SimpleJob *job)
405{
406 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job);
407 HostQueue &hq = m_queuesByHostname[jobPriv->m_url.host()];
408 const int prevLowestSerial = hq.lowestSerial();
409 const int prevRunningJobs = hq.runningJobsCount();
410
411 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost);
412
413 if (hq.removeJob(job)) {
414 if (hq.lowestSerial() != prevLowestSerial) {
415 // we have dequeued the not yet running job with the lowest serial
416 Q_ASSERT(!jobPriv->m_worker);
417 Q_ASSERT(prevRunningJobs == hq.runningJobsCount());
418 if (m_queuesBySerial.remove(prevLowestSerial) == 0) {
419 // make sure that the queue was not scheduled for a good reason
420 Q_ASSERT(hq.runningJobsCount() == m_maxConnectionsPerHost);
421 }
422 } else {
423 if (prevRunningJobs != hq.runningJobsCount()) {
424 // we have dequeued a previously running job
425 Q_ASSERT(prevRunningJobs - 1 == hq.runningJobsCount());
426 m_runningJobsCount--;
427 Q_ASSERT(m_runningJobsCount >= 0);
428 }
429 }
430 if (!hq.isQueueEmpty() && hq.runningJobsCount() < m_maxConnectionsPerHost) {
431 // this may be a no-op, but it's faster than first checking if it's already in.
432 m_queuesBySerial.insert(hq.lowestSerial(), &hq);
433 }
434
435 if (hq.isEmpty()) {
436 // no queued jobs, no running jobs. this destroys hq from above.
437 m_queuesByHostname.remove(jobPriv->m_url.host());
438 }
439
440 if (jobPriv->m_worker && jobPriv->m_worker->isAlive()) {
441 m_workerManager.returnWorker(jobPriv->m_worker);
442 }
443 // just in case; startAJob() will refuse to start a job if it shouldn't.
444 m_startJobTimer.start();
445 }
446
447 ensureNoDuplicates(&m_queuesBySerial);
448}
449
450Worker *ProtoQueue::createWorker(const QString &protocol, SimpleJob *job, const QUrl &url)
451{
452 int error;
453 QString errortext;
454 Worker *worker = Worker::createWorker(protocol, url, error, errortext);
455 if (worker) {
456 connect(worker, &Worker::workerDied, scheduler(), [](KIO::Worker *worker) {
457 schedulerPrivate()->slotWorkerDied(worker);
458 });
459 } else {
460 qCWarning(KIO_CORE) << "couldn't create worker:" << errortext;
461 if (job) {
462 job->slotError(error, errortext);
463 }
464 }
465 return worker;
466}
467
468bool ProtoQueue::removeWorker(KIO::Worker *worker)
469{
470 const bool removed = m_workerManager.removeWorker(worker);
471 return removed;
472}
473
474QList<Worker *> ProtoQueue::allWorkers() const
475{
476 QList<Worker *> ret(m_workerManager.allWorkers());
477 auto it = m_queuesByHostname.cbegin();
478 for (; it != m_queuesByHostname.cend(); ++it) {
479 ret.append(it.value().allWorkers());
480 }
481
482 return ret;
483}
484
485// private slot
486void ProtoQueue::startAJob()
487{
488 ensureNoDuplicates(&m_queuesBySerial);
489 verifyRunningJobsCount(&m_queuesByHostname, m_runningJobsCount);
490
491#ifdef SCHEDULER_DEBUG
492 // qDebug() << "m_runningJobsCount:" << m_runningJobsCount;
493 auto it = m_queuesByHostname.cbegin();
494 for (; it != m_queuesByHostname.cend(); ++it) {
495 const QList<KIO::SimpleJob *> list = it.value().runningJobs();
496 for (SimpleJob *job : list) {
497 // qDebug() << SimpleJobPrivate::get(job)->m_url;
498 }
499 }
500#endif
501 if (m_runningJobsCount >= m_maxConnectionsTotal) {
502#ifdef SCHEDULER_DEBUG
503 // qDebug() << "not starting any jobs because maxConnectionsTotal has been reached.";
504#endif
505 return;
506 }
507
508 QMap<int, HostQueue *>::iterator first = m_queuesBySerial.begin();
509 if (first != m_queuesBySerial.end()) {
510 // pick a job and maintain the queue invariant: lower serials first
511 HostQueue *hq = first.value();
512 const int prevLowestSerial = first.key();
513 Q_UNUSED(prevLowestSerial);
514 Q_ASSERT(hq->lowestSerial() == prevLowestSerial);
515 // the following assertions should hold due to queueJob(), takeFirstInQueue() and
516 // removeJob() being correct
517 Q_ASSERT(hq->runningJobsCount() < m_maxConnectionsPerHost);
518 SimpleJob *startingJob = hq->takeFirstInQueue();
519 Q_ASSERT(hq->runningJobsCount() <= m_maxConnectionsPerHost);
520 Q_ASSERT(hq->lowestSerial() != prevLowestSerial);
521
522 m_queuesBySerial.erase(first);
523 // we've increased hq's runningJobsCount() by calling nexStartingJob()
524 // so we need to check again.
525 if (!hq->isQueueEmpty() && hq->runningJobsCount() < m_maxConnectionsPerHost) {
526 m_queuesBySerial.insert(hq->lowestSerial(), hq);
527 }
528
529 // always increase m_runningJobsCount because it's correct if there is a worker and if there
530 // is no worker, removeJob() will balance the number again. removeJob() would decrease the
531 // number too much otherwise.
532 // Note that createWorker() can call slotError() on a job which in turn calls removeJob(),
533 // so increase the count here already.
534 m_runningJobsCount++;
535
536 bool isNewWorker = false;
537 Worker *worker = m_workerManager.takeWorkerForJob(startingJob);
538 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(startingJob);
539 if (!worker) {
540 isNewWorker = true;
541 worker = createWorker(jobPriv->m_protocol, startingJob, jobPriv->m_url);
542 }
543
544 if (worker) {
545 jobPriv->m_worker = worker;
546 schedulerPrivate()->setupWorker(worker, jobPriv->m_url, jobPriv->m_protocol, isNewWorker);
547 startJob(startingJob, worker);
548 } else {
549 // dispose of our records about the job and mark the job as unknown
550 // (to prevent crashes later)
551 // note that the job's slotError() can have called removeJob() first, so check that
552 // it's not a ghost job with null serial already.
553 if (jobPriv->m_schedSerial) {
554 removeJob(startingJob);
555 jobPriv->m_schedSerial = 0;
556 }
557 }
558 } else {
559#ifdef SCHEDULER_DEBUG
560 // qDebug() << "not starting any jobs because there is no queued job.";
561#endif
562 }
563
564 if (!m_queuesBySerial.isEmpty()) {
565 m_startJobTimer.start();
566 }
567}
568
569Scheduler::Scheduler()
570{
571 setObjectName(QStringLiteral("scheduler"));
572
573#ifdef WITH_QTDBUS
574 const QString dbusPath = QStringLiteral("/KIO/Scheduler");
575 const QString dbusInterface = QStringLiteral("org.kde.KIO.Scheduler");
577 // Not needed, right? We just want to emit two signals.
578 // dbus.registerObject(dbusPath, this, QDBusConnection::ExportScriptableSlots |
579 // QDBusConnection::ExportScriptableSignals);
580 dbus.connect(QString(),
581 dbusPath,
582 dbusInterface,
583 QStringLiteral("reparseSlaveConfiguration"),
584 this,
585 SLOT(slotReparseSlaveConfiguration(QString, QDBusMessage)));
586#endif
587}
588
589Scheduler::~Scheduler()
590{
591}
592
593void Scheduler::doJob(SimpleJob *job)
594{
595 schedulerPrivate()->doJob(job);
596}
597
598// static
599void Scheduler::cancelJob(SimpleJob *job)
600{
601 schedulerPrivate()->cancelJob(job);
602}
603
604void Scheduler::jobFinished(KIO::SimpleJob *job, KIO::Worker *worker)
605{
606 schedulerPrivate()->jobFinished(job, worker);
607}
608
609void Scheduler::putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url)
610{
611 schedulerPrivate()->putWorkerOnHold(job, url);
612}
613
614void Scheduler::removeWorkerOnHold()
615{
616 schedulerPrivate()->removeWorkerOnHold();
617}
618
619bool Scheduler::isWorkerOnHoldFor(const QUrl &url)
620{
621 return schedulerPrivate()->isWorkerOnHoldFor(url);
622}
623
624void Scheduler::updateInternalMetaData(SimpleJob *job)
625{
626 schedulerPrivate()->updateInternalMetaData(job);
627}
628
629void Scheduler::emitReparseSlaveConfiguration()
630{
631#ifdef WITH_QTDBUS
632 // Do it immediately in this process, otherwise we might send a request before reparsing
633 // (e.g. when changing useragent in the plugin)
634 schedulerPrivate()->slotReparseSlaveConfiguration(QString(), QDBusMessage());
635#endif
636
637 schedulerPrivate()->m_ignoreConfigReparse = true;
638 Q_EMIT self()->reparseSlaveConfiguration(QString());
639}
640
641#ifdef WITH_QTDBUS
642void SchedulerPrivate::slotReparseSlaveConfiguration(const QString &proto, const QDBusMessage &)
643{
644 if (m_ignoreConfigReparse) {
645 // qDebug() << "Ignoring signal sent by myself";
646 m_ignoreConfigReparse = false;
647 return;
648 }
649
650 // qDebug() << "proto=" << proto;
652 WorkerConfig::self()->reset();
653
654 QHash<QString, ProtoQueue *>::ConstIterator it = proto.isEmpty() ? m_protocols.constBegin() : m_protocols.constFind(proto);
656
657 // not found?
658 if (it == endIt) {
659 return;
660 }
661
662 if (!proto.isEmpty()) {
663 endIt = it;
664 ++endIt;
665 }
666
667 for (; it != endIt; ++it) {
668 const QList<KIO::Worker *> list = it.value()->allWorkers();
669 for (Worker *worker : list) {
670 worker->send(CMD_REPARSECONFIGURATION);
671 worker->resetHost();
672 }
673 }
674}
675#endif
676
677void SchedulerPrivate::doJob(SimpleJob *job)
678{
679 // qDebug() << job;
680 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
681 jobPriv->m_protocol = job->url().scheme();
682
683 ProtoQueue *proto = protoQ(jobPriv->m_protocol, job->url().host());
684 proto->queueJob(job);
685}
686
687void SchedulerPrivate::setJobPriority(SimpleJob *job, int priority)
688{
689 // qDebug() << job << priority;
690 const QString protocol = SimpleJobPrivate::get(job)->m_protocol;
691 if (!protocol.isEmpty()) {
692 ProtoQueue *proto = protoQ(protocol, job->url().host());
693 proto->changeJobPriority(job, priority);
694 }
695}
696
697void SchedulerPrivate::cancelJob(SimpleJob *job)
698{
699 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
700 // this method is called all over the place in job.cpp, so just do this check here to avoid
701 // much boilerplate in job code.
702 if (jobPriv->m_schedSerial == 0) {
703 // qDebug() << "Doing nothing because I don't know job" << job;
704 return;
705 }
706 Worker *worker = jobSWorker(job);
707 // qDebug() << job << worker;
708 jobFinished(job, worker);
709 if (worker) {
710 ProtoQueue *pq = m_protocols.value(jobPriv->m_protocol);
711 if (pq) {
712 pq->removeWorker(worker);
713 }
714 worker->kill(); // don't use worker after this!
715 }
716}
717
718void SchedulerPrivate::jobFinished(SimpleJob *job, Worker *worker)
719{
720 // qDebug() << job << worker;
721 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
722
723 // make sure that we knew about the job!
724 Q_ASSERT(jobPriv->m_schedSerial);
725
726 ProtoQueue *pq = m_protocols.value(jobPriv->m_protocol);
727 if (pq) {
728 pq->removeJob(job);
729 }
730
731 if (worker) {
732 // If we have internal meta-data, tell existing KIO workers to reload
733 // their configuration.
734 if (jobPriv->m_internalMetaData.count()) {
735 // qDebug() << "Updating KIO workers with new internal metadata information";
736 ProtoQueue *queue = m_protocols.value(worker->protocol());
737 if (queue) {
738 const QList<Worker *> workers = queue->allWorkers();
739 for (auto *runningWorker : workers) {
740 if (worker->host() == runningWorker->host()) {
741 worker->setConfig(metaDataFor(worker->protocol(), job->url()));
742 /*qDebug() << "Updated configuration of" << worker->protocol()
743 << "KIO worker, pid=" << worker->worker_pid();*/
744 }
745 }
746 }
747 }
748 worker->setJob(nullptr);
749 worker->disconnect(job);
750 }
751 jobPriv->m_schedSerial = 0; // this marks the job as unscheduled again
752 jobPriv->m_worker = nullptr;
753 // Clear the values in the internal metadata container since they have
754 // already been taken care of above...
755 jobPriv->m_internalMetaData.clear();
756}
757
758MetaData SchedulerPrivate::metaDataFor(const QString &protocol, const QUrl &url)
759{
760 const QString host = url.host();
761 MetaData configData = WorkerConfig::self()->configData(protocol, host);
762
763 return configData;
764}
765
766void SchedulerPrivate::setupWorker(KIO::Worker *worker, const QUrl &url, const QString &protocol, bool newWorker, const KIO::MetaData *config)
767{
768 int port = url.port();
769 if (port == -1) { // no port is -1 in QUrl, but in kde3 we used 0 and the KIO workers assume that.
770 port = 0;
771 }
772 const QString host = url.host();
773 const QString user = url.userName();
774 const QString passwd = url.password();
775
776 if (newWorker || worker->host() != host || worker->port() != port || worker->user() != user || worker->passwd() != passwd) {
777 MetaData configData = metaDataFor(protocol, url);
778 if (config) {
779 configData += *config;
780 }
781
782 worker->setConfig(configData);
783 worker->setProtocol(url.scheme());
784 worker->setHost(host, port, user, passwd);
785 }
786}
787
788void SchedulerPrivate::slotWorkerDied(KIO::Worker *worker)
789{
790 // qDebug() << worker;
791 Q_ASSERT(worker);
792 Q_ASSERT(!worker->isAlive());
793 ProtoQueue *pq = m_protocols.value(worker->protocol());
794 if (pq) {
795 if (worker->job()) {
796 pq->removeJob(worker->job());
797 }
798 // in case this was a connected worker...
799 pq->removeWorker(worker);
800 }
801 if (worker == m_workerOnHold) {
802 m_workerOnHold = nullptr;
803 m_urlOnHold.clear();
804 }
805 // can't use worker->deref() here because we need to use deleteLater
806 worker->aboutToDelete();
807 worker->deleteLater();
808}
809
810void SchedulerPrivate::putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url)
811{
812 Worker *worker = jobSWorker(job);
813 // qDebug() << job << url << worker;
814 worker->disconnect(job);
815 // prevent the fake death of the worker from trying to kill the job again;
816 // cf. Worker::hold(const QUrl &url) called in SchedulerPrivate::publishWorkerOnHold().
817 worker->setJob(nullptr);
818 SimpleJobPrivate::get(job)->m_worker = nullptr;
819
820 if (m_workerOnHold) {
821 m_workerOnHold->kill();
822 }
823 m_workerOnHold = worker;
824 m_urlOnHold = url;
825 m_workerOnHold->suspend();
826}
827
828bool SchedulerPrivate::isWorkerOnHoldFor(const QUrl &url)
829{
830 if (url.isValid() && m_urlOnHold.isValid() && url == m_urlOnHold) {
831 return true;
832 }
833
834 return false;
835}
836
837Worker *SchedulerPrivate::heldWorkerForJob(SimpleJob *job)
838{
839 Worker *worker = nullptr;
840 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
841
842 if (m_workerOnHold) {
843 // Make sure that the job wants to do a GET or a POST, and with no offset
844 const int cmd = jobPriv->m_command;
845 bool canJobReuse = (cmd == CMD_GET);
846
847 if (KIO::TransferJob *tJob = qobject_cast<KIO::TransferJob *>(job)) {
848 canJobReuse = (canJobReuse || cmd == CMD_SPECIAL);
849 if (canJobReuse) {
850 KIO::MetaData outgoing = tJob->outgoingMetaData();
851 const QString resume = outgoing.value(QStringLiteral("resume"));
852 const QString rangeStart = outgoing.value(QStringLiteral("range-start"));
853 // qDebug() << "Resume metadata is" << resume;
854 canJobReuse = (resume.isEmpty() || resume == QLatin1Char('0')) && (rangeStart.isEmpty() || rangeStart == QLatin1Char('0'));
855 }
856 }
857
858 if (job->url() == m_urlOnHold) {
859 if (canJobReuse) {
860 // qDebug() << "HOLD: Reusing held worker (" << m_workerOnHold << ")";
861 worker = m_workerOnHold;
862 } else {
863 // qDebug() << "HOLD: Discarding held worker (" << m_workerOnHold << ")";
864 m_workerOnHold->kill();
865 }
866 m_workerOnHold = nullptr;
867 m_urlOnHold.clear();
868 }
869 }
870
871 return worker;
872}
873
874void SchedulerPrivate::removeWorkerOnHold()
875{
876 // qDebug() << m_workerOnHold;
877 if (m_workerOnHold) {
878 m_workerOnHold->kill();
879 }
880 m_workerOnHold = nullptr;
881 m_urlOnHold.clear();
882}
883
884ProtoQueue *SchedulerPrivate::protoQ(const QString &protocol, const QString &host)
885{
886 ProtoQueue *pq = m_protocols.value(protocol, nullptr);
887 if (!pq) {
888 // qDebug() << "creating ProtoQueue instance for" << protocol;
889
890 const int maxWorkers = KProtocolInfo::maxWorkers(protocol);
891 int maxWorkersPerHost = -1;
892 if (!host.isEmpty()) {
893 bool ok = false;
894 const int value = WorkerConfig::self()->configData(protocol, host, QStringLiteral("MaxConnections")).toInt(&ok);
895 if (ok) {
896 maxWorkersPerHost = value;
897 }
898 }
899 if (maxWorkersPerHost == -1) {
900 maxWorkersPerHost = KProtocolInfo::maxWorkersPerHost(protocol);
901 }
902 // Never allow maxWorkersPerHost to exceed maxWorkers.
903 pq = new ProtoQueue(maxWorkers, qMin(maxWorkers, maxWorkersPerHost));
904 m_protocols.insert(protocol, pq);
905 }
906 return pq;
907}
908
909void SchedulerPrivate::updateInternalMetaData(SimpleJob *job)
910{
911 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
912 // Preserve all internal meta-data so they can be sent back to the
913 // KIO workers as needed...
914 const QUrl jobUrl = job->url();
915
916 const QLatin1String currHostToken("{internal~currenthost}");
917 const QLatin1String allHostsToken("{internal~allhosts}");
918 // qDebug() << job << jobPriv->m_internalMetaData;
919 QMapIterator<QString, QString> it(jobPriv->m_internalMetaData);
920 while (it.hasNext()) {
921 it.next();
922 if (it.key().startsWith(currHostToken, Qt::CaseInsensitive)) {
923 WorkerConfig::self()->setConfigData(jobUrl.scheme(), jobUrl.host(), it.key().mid(currHostToken.size()), it.value());
924 } else if (it.key().startsWith(allHostsToken, Qt::CaseInsensitive)) {
925 WorkerConfig::self()->setConfigData(jobUrl.scheme(), QString(), it.key().mid(allHostsToken.size()), it.value());
926 }
927 }
928}
929
930#include "moc_scheduler.cpp"
931#include "moc_scheduler_p.cpp"
MetaData is a simple map of key/value strings.
A simple job (one url and one command).
void slotError(int, const QString &)
const QUrl & url() const
Returns the SimpleJob's URL.
Definition simplejob.cpp:70
The transfer job pumps data into and/or out of a KIO worker.
static int maxWorkersPerHost(const QString &protocol)
Returns the limit on the number of KIO workers for this protocol per host.
static int maxWorkers(const QString &protocol)
Returns the soft limit on the number of KIO workers for this protocol.
static void reparseConfiguration()
Force a reload of the general config file of KIO workers ( kioslaverc).
A namespace for KIO globals.
void error(QWidget *parent, const QString &text, const QString &title, const KGuiItem &buttonOk, Options options=Notify)
KIOCORE_EXPORT QStringList list(const QString &fileClass)
Returns a list of directories associated with this file-class.
NETWORKMANAGERQT_EXPORT QString hostname()
bool connect(const QString &service, const QString &path, const QString &interface, const QString &name, QObject *receiver, const char *slot)
QDBusConnection sessionBus()
const_iterator cbegin() const const
const_iterator cend() const const
const_iterator constBegin() const const
const_iterator constEnd() const const
const_iterator constFind(const Key &key) const const
iterator insert(const Key &key, const T &value)
Key key(const T &value) const const
T value(const Key &key) const const
void append(QList< T > &&value)
void reserve(qsizetype size)
iterator begin()
const_iterator cbegin() const const
const_iterator cend() const const
const_iterator constBegin() const const
Key key(const T &value, const Key &defaultKey) const const
T value(const Key &key, const T &defaultValue) const const
iterator begin()
iterator end()
iterator find(const Key &key, const T &value)
T value(const Key &key) const const
Q_EMITQ_EMIT
void setObjectName(QAnyStringView name)
bool contains(const QSet< T > &other) const const
iterator insert(const T &value)
bool isEmpty() const const
CaseInsensitive
QFuture< ArgsType< Signal > > connect(Sender *sender, Signal signal)
void timeout()
void clear()
QString host(ComponentFormattingOptions options) const const
bool isValid() const const
QString password(ComponentFormattingOptions options) const const
int port(int defaultPort) const const
QString scheme() const const
QString userName(ComponentFormattingOptions options) const const
This file is part of the KDE documentation.
Documentation copyright © 1996-2024 The KDE developers.
Generated on Fri Jul 26 2024 11:54:08 by doxygen 1.11.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.