KIO

scheduler.cpp
1/*
2 This file is part of the KDE libraries
3 SPDX-FileCopyrightText: 2000 Stephan Kulow <coolo@kde.org>
4 SPDX-FileCopyrightText: 2000 Waldo Bastian <bastian@kde.org>
5 SPDX-FileCopyrightText: 2009, 2010 Andreas Hartmetz <ahartmetz@gmail.com>
6
7 SPDX-License-Identifier: LGPL-2.0-only
8*/
9
10#include "scheduler.h"
11#include "scheduler_p.h"
12
13#include "connection_p.h"
14#include "job_p.h"
15#include "kprotocolmanager_p.h"
16#include "sessiondata_p.h"
17#include "worker_p.h"
18#include "workerconfig.h"
19
20#include <kprotocolinfo.h>
21#include <kprotocolmanager.h>
22
23#ifdef WITH_QTDBUS
24#include <QDBusConnection>
25#include <QDBusMessage>
26#endif
27#include <QHash>
28#include <QThread>
29#include <QThreadStorage>
30
31// Workers may be idle for a certain time (3 minutes) before they are killed.
32static const int s_idleWorkerLifetime = 3 * 60;
33
34using namespace KIO;
35
36static inline Worker *jobSWorker(SimpleJob *job)
37{
38 return SimpleJobPrivate::get(job)->m_worker;
39}
40
41static inline int jobCommand(SimpleJob *job)
42{
43 return SimpleJobPrivate::get(job)->m_command;
44}
45
46static inline void startJob(SimpleJob *job, Worker *worker)
47{
48 SimpleJobPrivate::get(job)->start(worker);
49}
50
51class KIO::SchedulerPrivate
52{
53public:
54 SchedulerPrivate()
55 : q(new Scheduler())
56 {
57 }
58
59 ~SchedulerPrivate()
60 {
61 removeWorkerOnHold();
62 delete q;
63 q = nullptr;
64 qDeleteAll(m_protocols); // ~ProtoQueue will kill and delete all workers
65 }
66
67 SchedulerPrivate(const SchedulerPrivate &) = delete;
68 SchedulerPrivate &operator=(const SchedulerPrivate &) = delete;
69
70 Scheduler *q;
71
72 Worker *m_workerOnHold = nullptr;
73 QUrl m_urlOnHold;
74 bool m_ignoreConfigReparse = false;
75
76 SessionData sessionData;
77
78 void doJob(SimpleJob *job);
79 void setJobPriority(SimpleJob *job, int priority);
80 void cancelJob(SimpleJob *job);
81 void jobFinished(KIO::SimpleJob *job, KIO::Worker *worker);
82 void putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url);
83 void removeWorkerOnHold();
84 Worker *heldWorkerForJob(KIO::SimpleJob *job);
85 bool isWorkerOnHoldFor(const QUrl &url);
86 void updateInternalMetaData(SimpleJob *job);
87
88 MetaData metaDataFor(const QString &protocol, const QStringList &proxyList, const QUrl &url);
89 void setupWorker(KIO::Worker *worker,
90 const QUrl &url,
91 const QString &protocol,
92 const QStringList &proxyList,
93 bool newWorker,
94 const KIO::MetaData *config = nullptr);
95
96 void slotWorkerDied(KIO::Worker *worker);
97
98#ifdef WITH_QTDBUS
99 void slotReparseSlaveConfiguration(const QString &, const QDBusMessage &);
100#endif
101
102 ProtoQueue *protoQ(const QString &protocol, const QString &host);
103
104private:
106};
107
109static SchedulerPrivate *schedulerPrivate()
110{
111 if (!s_storage.hasLocalData()) {
112 s_storage.setLocalData(new SchedulerPrivate);
113 }
114 return s_storage.localData();
115}
116
117Scheduler *Scheduler::self()
118{
119 return schedulerPrivate()->q;
120}
121
122SchedulerPrivate *Scheduler::d_func()
123{
124 return schedulerPrivate();
125}
126
127// static
128Scheduler *scheduler()
129{
130 return schedulerPrivate()->q;
131}
132
133////////////////////////////
134
135int SerialPicker::changedPrioritySerial(int oldSerial, int newPriority) const
136{
137 Q_ASSERT(newPriority >= -10 && newPriority <= 10);
138 newPriority = qBound(-10, newPriority, 10);
139 int unbiasedSerial = oldSerial % m_jobsPerPriority;
140 return unbiasedSerial + newPriority * m_jobsPerPriority;
141}
142
143WorkerManager::WorkerManager()
144{
145 m_grimTimer.setSingleShot(true);
146 connect(&m_grimTimer, &QTimer::timeout, this, &WorkerManager::grimReaper);
147}
148
149WorkerManager::~WorkerManager()
150{
151 grimReaper();
152}
153
154void WorkerManager::returnWorker(Worker *worker)
155{
156 Q_ASSERT(worker);
157 worker->setIdle();
158 m_idleWorkers.insert(worker->host(), worker);
159 scheduleGrimReaper();
160}
161
162Worker *WorkerManager::takeWorkerForJob(SimpleJob *job)
163{
164 Worker *worker = schedulerPrivate()->heldWorkerForJob(job);
165 if (worker) {
166 return worker;
167 }
168
169 QUrl url = SimpleJobPrivate::get(job)->m_url;
170 // TODO take port, username and password into account
171 QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.find(url.host());
172 if (it == m_idleWorkers.end()) {
173 it = m_idleWorkers.begin();
174 }
175 if (it == m_idleWorkers.end()) {
176 return nullptr;
177 }
178 worker = it.value();
179 m_idleWorkers.erase(it);
180 return worker;
181}
182
183bool WorkerManager::removeWorker(Worker *worker)
184{
185 // ### performance not so great
187 for (; it != m_idleWorkers.end(); ++it) {
188 if (it.value() == worker) {
189 m_idleWorkers.erase(it);
190 return true;
191 }
192 }
193 return false;
194}
195
196void WorkerManager::clear()
197{
198 m_idleWorkers.clear();
199}
200
201QList<Worker *> WorkerManager::allWorkers() const
202{
203 return m_idleWorkers.values();
204}
205
206void WorkerManager::scheduleGrimReaper()
207{
208 if (!m_grimTimer.isActive()) {
209 m_grimTimer.start((s_idleWorkerLifetime / 2) * 1000);
210 }
211}
212
213// private slot
214void WorkerManager::grimReaper()
215{
217 while (it != m_idleWorkers.end()) {
218 Worker *worker = it.value();
219 if (worker->idleTime() >= s_idleWorkerLifetime) {
220 it = m_idleWorkers.erase(it);
221 if (worker->job()) {
222 // qDebug() << "Idle worker" << worker << "still has job" << worker->job();
223 }
224 // avoid invoking slotWorkerDied() because its cleanup services are not needed
225 worker->kill();
226 } else {
227 ++it;
228 }
229 }
230 if (!m_idleWorkers.isEmpty()) {
231 scheduleGrimReaper();
232 }
233}
234
235int HostQueue::lowestSerial() const
236{
238 if (first != m_queuedJobs.constEnd()) {
239 return first.key();
240 }
241 return SerialPicker::maxSerial;
242}
243
244void HostQueue::queueJob(SimpleJob *job)
245{
246 const int serial = SimpleJobPrivate::get(job)->m_schedSerial;
247 Q_ASSERT(serial != 0);
248 Q_ASSERT(!m_queuedJobs.contains(serial));
249 Q_ASSERT(!m_runningJobs.contains(job));
250 m_queuedJobs.insert(serial, job);
251}
252
253SimpleJob *HostQueue::takeFirstInQueue()
254{
255 Q_ASSERT(!m_queuedJobs.isEmpty());
256 QMap<int, SimpleJob *>::iterator first = m_queuedJobs.begin();
257 SimpleJob *job = first.value();
258 m_queuedJobs.erase(first);
259 m_runningJobs.insert(job);
260 return job;
261}
262
263bool HostQueue::removeJob(SimpleJob *job)
264{
265 const int serial = SimpleJobPrivate::get(job)->m_schedSerial;
266 if (m_runningJobs.remove(job)) {
267 Q_ASSERT(!m_queuedJobs.contains(serial));
268 return true;
269 }
270 if (m_queuedJobs.remove(serial)) {
271 return true;
272 }
273 return false;
274}
275
276QList<Worker *> HostQueue::allWorkers() const
277{
278 QList<Worker *> ret;
279 ret.reserve(m_runningJobs.size());
280 for (SimpleJob *job : m_runningJobs) {
281 Worker *worker = jobSWorker(job);
282 Q_ASSERT(worker);
283 ret.append(worker);
284 }
285 return ret;
286}
287
288static void ensureNoDuplicates(QMap<int, HostQueue *> *queuesBySerial)
289{
290 Q_UNUSED(queuesBySerial);
291#ifdef SCHEDULER_DEBUG
292 // a host queue may *never* be in queuesBySerial twice.
294 auto it = queuesBySerial->cbegin();
295 for (; it != queuesBySerial->cend(); ++it) {
296 Q_ASSERT(!seen.contains(it.value()));
297 seen.insert(it.value());
298 }
299#endif
300}
301
302static void verifyRunningJobsCount(QHash<QString, HostQueue> *queues, int runningJobsCount)
303{
304 Q_UNUSED(queues);
305 Q_UNUSED(runningJobsCount);
306#ifdef SCHEDULER_DEBUG
307 int realRunningJobsCount = 0;
308 auto it = queues->cbegin();
309 for (; it != queues->cend(); ++it) {
310 realRunningJobsCount += it.value().runningJobsCount();
311 }
312 Q_ASSERT(realRunningJobsCount == runningJobsCount);
313
314 // ...and of course we may never run the same job twice!
315 QSet<SimpleJob *> seenJobs;
316 auto it2 = queues->cbegin();
317 for (; it2 != queues->cend(); ++it2) {
318 for (SimpleJob *job : it2.value().runningJobs()) {
319 Q_ASSERT(!seenJobs.contains(job));
320 seenJobs.insert(job);
321 }
322 }
323#endif
324}
325
326ProtoQueue::ProtoQueue(int maxWorkers, int maxWorkersPerHost)
327 : m_maxConnectionsPerHost(maxWorkersPerHost ? maxWorkersPerHost : maxWorkers)
328 , m_maxConnectionsTotal(qMax(maxWorkers, maxWorkersPerHost))
329 , m_runningJobsCount(0)
330
331{
332 /*qDebug() << "m_maxConnectionsTotal:" << m_maxConnectionsTotal
333 << "m_maxConnectionsPerHost:" << m_maxConnectionsPerHost;*/
334 Q_ASSERT(m_maxConnectionsPerHost >= 1);
335 Q_ASSERT(maxWorkers >= maxWorkersPerHost);
336 m_startJobTimer.setSingleShot(true);
337 connect(&m_startJobTimer, &QTimer::timeout, this, &ProtoQueue::startAJob);
338}
339
340ProtoQueue::~ProtoQueue()
341{
342 // Gather list of all workers first
343 const QList<Worker *> workers = allWorkers();
344 // Clear the idle workers in the manager to avoid dangling pointers
345 m_workerManager.clear();
346 for (Worker *worker : workers) {
347 // kill the worker process and remove the interface in our process
348 worker->kill();
349 }
350}
351
// Queues @p job for scheduling.
// The job gets a fresh serial from m_serialPicker and is added to the host
// queue for its URL's host (operator[] creates that host queue on first use).
// If this changed the host queue's lowest serial and the host is still below
// its per-host connection limit, the host queue is re-keyed in
// m_queuesBySerial. Finally the start timer is (re)started so that startAJob()
// gets a chance to run.
352void ProtoQueue::queueJob(SimpleJob *job)
353{
354 QString hostname = SimpleJobPrivate::get(job)->m_url.host();
355 HostQueue &hq = m_queuesByHostname[hostname];
356 const int prevLowestSerial = hq.lowestSerial();
357 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost);
358
359 // never insert a job twice
360 Q_ASSERT(SimpleJobPrivate::get(job)->m_schedSerial == 0);
361 SimpleJobPrivate::get(job)->m_schedSerial = m_serialPicker.next();
362
363 const bool wasQueueEmpty = hq.isQueueEmpty();
364 hq.queueJob(job);
365 // note that HostQueue::queueJob() into an empty queue changes its lowestSerial() too...
366 // the queue's lowest serial job may have changed, so update the ordered list of queues.
367 // however, we ignore all jobs that would cause more connections to a host than allowed.
368 if (prevLowestSerial != hq.lowestSerial()) {
369 if (hq.runningJobsCount() < m_maxConnectionsPerHost) {
370 // if the connection limit didn't keep the HQ unscheduled it must have been lack of jobs
371 if (m_queuesBySerial.remove(prevLowestSerial) == 0) {
372 Q_UNUSED(wasQueueEmpty);
373 Q_ASSERT(wasQueueEmpty);
374 }
375 m_queuesBySerial.insert(hq.lowestSerial(), &hq);
376 } else {
377#ifdef SCHEDULER_DEBUG
378 // ### this assertion may fail if the limits were modified at runtime!
379 // if the per-host connection limit is already reached the host queue's lowest serial
380 // should not be queued.
381 Q_ASSERT(!m_queuesBySerial.contains(prevLowestSerial));
382#endif
383 }
384 }
385 // just in case; startAJob() will refuse to start a job if it shouldn't.
386 m_startJobTimer.start();
387
388 ensureNoDuplicates(&m_queuesBySerial);
389}
390
391void ProtoQueue::changeJobPriority(SimpleJob *job, int newPrio)
392{
393 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job);
394 QHash<QString, HostQueue>::Iterator it = m_queuesByHostname.find(jobPriv->m_url.host());
395 if (it == m_queuesByHostname.end()) {
396 return;
397 }
398 HostQueue &hq = it.value();
399 const int prevLowestSerial = hq.lowestSerial();
400 if (hq.isJobRunning(job) || !hq.removeJob(job)) {
401 return;
402 }
403 jobPriv->m_schedSerial = m_serialPicker.changedPrioritySerial(jobPriv->m_schedSerial, newPrio);
404 hq.queueJob(job);
405 const bool needReinsert = hq.lowestSerial() != prevLowestSerial;
406 // the host queue might be absent from m_queuesBySerial because the connections per host limit
407 // for that host has been reached.
408 if (needReinsert && m_queuesBySerial.remove(prevLowestSerial)) {
409 m_queuesBySerial.insert(hq.lowestSerial(), &hq);
410 }
411 ensureNoDuplicates(&m_queuesBySerial);
412}
413
414void ProtoQueue::removeJob(SimpleJob *job)
415{
416 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job);
417 HostQueue &hq = m_queuesByHostname[jobPriv->m_url.host()];
418 const int prevLowestSerial = hq.lowestSerial();
419 const int prevRunningJobs = hq.runningJobsCount();
420
421 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost);
422
423 if (hq.removeJob(job)) {
424 if (hq.lowestSerial() != prevLowestSerial) {
425 // we have dequeued the not yet running job with the lowest serial
426 Q_ASSERT(!jobPriv->m_worker);
427 Q_ASSERT(prevRunningJobs == hq.runningJobsCount());
428 if (m_queuesBySerial.remove(prevLowestSerial) == 0) {
429 // make sure that the queue was not scheduled for a good reason
430 Q_ASSERT(hq.runningJobsCount() == m_maxConnectionsPerHost);
431 }
432 } else {
433 if (prevRunningJobs != hq.runningJobsCount()) {
434 // we have dequeued a previously running job
435 Q_ASSERT(prevRunningJobs - 1 == hq.runningJobsCount());
436 m_runningJobsCount--;
437 Q_ASSERT(m_runningJobsCount >= 0);
438 }
439 }
440 if (!hq.isQueueEmpty() && hq.runningJobsCount() < m_maxConnectionsPerHost) {
441 // this may be a no-op, but it's faster than first checking if it's already in.
442 m_queuesBySerial.insert(hq.lowestSerial(), &hq);
443 }
444
445 if (hq.isEmpty()) {
446 // no queued jobs, no running jobs. this destroys hq from above.
447 m_queuesByHostname.remove(jobPriv->m_url.host());
448 }
449
450 if (jobPriv->m_worker && jobPriv->m_worker->isAlive()) {
451 m_workerManager.returnWorker(jobPriv->m_worker);
452 }
453 // just in case; startAJob() will refuse to start a job if it shouldn't.
454 m_startJobTimer.start();
455 }
456
457 ensureNoDuplicates(&m_queuesBySerial);
458}
459
460Worker *ProtoQueue::createWorker(const QString &protocol, SimpleJob *job, const QUrl &url)
461{
462 int error;
463 QString errortext;
464 Worker *worker = Worker::createWorker(protocol, url, error, errortext);
465 if (worker) {
466 connect(worker, &Worker::workerDied, scheduler(), [](KIO::Worker *worker) {
467 schedulerPrivate()->slotWorkerDied(worker);
468 });
469 } else {
470 qCWarning(KIO_CORE) << "couldn't create worker:" << errortext;
471 if (job) {
472 job->slotError(error, errortext);
473 }
474 }
475 return worker;
476}
477
478bool ProtoQueue::removeWorker(KIO::Worker *worker)
479{
480 const bool removed = m_workerManager.removeWorker(worker);
481 return removed;
482}
483
484QList<Worker *> ProtoQueue::allWorkers() const
485{
486 QList<Worker *> ret(m_workerManager.allWorkers());
487 auto it = m_queuesByHostname.cbegin();
488 for (; it != m_queuesByHostname.cend(); ++it) {
489 ret.append(it.value().allWorkers());
490 }
491
492 return ret;
493}
494
495// private slot
// Starts at most one queued job per invocation.
// Returns early when the total number of running jobs has reached
// m_maxConnectionsTotal. Otherwise the host queue with the lowest serial is
// taken from m_queuesBySerial, its first queued job is moved to the running
// set, and the host queue is re-inserted if it still has queued jobs and spare
// per-host capacity. A worker is then taken from the worker manager or newly
// created; with a worker the job is started, without one the job is removed
// from the scheduler again. The timer is re-armed while host queues remain
// in m_queuesBySerial.
496void ProtoQueue::startAJob()
497{
498 ensureNoDuplicates(&m_queuesBySerial);
499 verifyRunningJobsCount(&m_queuesByHostname, m_runningJobsCount);
500
501#ifdef SCHEDULER_DEBUG
502 // qDebug() << "m_runningJobsCount:" << m_runningJobsCount;
503 auto it = m_queuesByHostname.cbegin();
504 for (; it != m_queuesByHostname.cend(); ++it) {
505 const QList<KIO::SimpleJob *> list = it.value().runningJobs();
506 for (SimpleJob *job : list) {
507 // qDebug() << SimpleJobPrivate::get(job)->m_url;
508 }
509 }
510#endif
511 if (m_runningJobsCount >= m_maxConnectionsTotal) {
512#ifdef SCHEDULER_DEBUG
513 // qDebug() << "not starting any jobs because maxConnectionsTotal has been reached.";
514#endif
515 return;
516 }
517
518 QMap<int, HostQueue *>::iterator first = m_queuesBySerial.begin();
519 if (first != m_queuesBySerial.end()) {
520 // pick a job and maintain the queue invariant: lower serials first
521 HostQueue *hq = first.value();
522 const int prevLowestSerial = first.key();
523 Q_UNUSED(prevLowestSerial);
524 Q_ASSERT(hq->lowestSerial() == prevLowestSerial);
525 // the following assertions should hold due to queueJob(), takeFirstInQueue() and
526 // removeJob() being correct
527 Q_ASSERT(hq->runningJobsCount() < m_maxConnectionsPerHost);
528 SimpleJob *startingJob = hq->takeFirstInQueue();
529 Q_ASSERT(hq->runningJobsCount() <= m_maxConnectionsPerHost);
530 Q_ASSERT(hq->lowestSerial() != prevLowestSerial);
531
532 m_queuesBySerial.erase(first);
533 // we've increased hq's runningJobsCount() by calling takeFirstInQueue()
534 // so we need to check again.
535 if (!hq->isQueueEmpty() && hq->runningJobsCount() < m_maxConnectionsPerHost) {
536 m_queuesBySerial.insert(hq->lowestSerial(), hq);
537 }
538
539 // always increase m_runningJobsCount because it's correct if there is a worker and if there
540 // is no worker, removeJob() will balance the number again. removeJob() would decrease the
541 // number too much otherwise.
542 // Note that createWorker() can call slotError() on a job which in turn calls removeJob(),
543 // so increase the count here already.
544 m_runningJobsCount++;
545
546 bool isNewWorker = false;
547 Worker *worker = m_workerManager.takeWorkerForJob(startingJob);
548 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(startingJob);
549 if (!worker) {
550 isNewWorker = true;
551 worker = createWorker(jobPriv->m_protocol, startingJob, jobPriv->m_url);
552 }
553
554 if (worker) {
555 jobPriv->m_worker = worker;
556 schedulerPrivate()->setupWorker(worker, jobPriv->m_url, jobPriv->m_protocol, jobPriv->m_proxyList, isNewWorker);
557 startJob(startingJob, worker);
558 } else {
559 // dispose of our records about the job and mark the job as unknown
560 // (to prevent crashes later)
561 // note that the job's slotError() can have called removeJob() first, so check that
562 // it's not a ghost job with null serial already.
563 if (jobPriv->m_schedSerial) {
564 removeJob(startingJob);
565 jobPriv->m_schedSerial = 0;
566 }
567 }
568 } else {
569#ifdef SCHEDULER_DEBUG
570 // qDebug() << "not starting any jobs because there is no queued job.";
571#endif
572 }
573
574 if (!m_queuesBySerial.isEmpty()) {
575 m_startJobTimer.start();
576 }
577}
578
579Scheduler::Scheduler()
580{
581 setObjectName(QStringLiteral("scheduler"));
582
583#ifdef WITH_QTDBUS
584 const QString dbusPath = QStringLiteral("/KIO/Scheduler");
585 const QString dbusInterface = QStringLiteral("org.kde.KIO.Scheduler");
587 // Not needed, right? We just want to emit two signals.
588 // dbus.registerObject(dbusPath, this, QDBusConnection::ExportScriptableSlots |
589 // QDBusConnection::ExportScriptableSignals);
590 dbus.connect(QString(),
591 dbusPath,
592 dbusInterface,
593 QStringLiteral("reparseSlaveConfiguration"),
594 this,
595 SLOT(slotReparseSlaveConfiguration(QString, QDBusMessage)));
596#endif
597}
598
599Scheduler::~Scheduler()
600{
601}
602
603void Scheduler::doJob(SimpleJob *job)
604{
605 schedulerPrivate()->doJob(job);
606}
607
608// static
609void Scheduler::cancelJob(SimpleJob *job)
610{
611 schedulerPrivate()->cancelJob(job);
612}
613
614void Scheduler::jobFinished(KIO::SimpleJob *job, KIO::Worker *worker)
615{
616 schedulerPrivate()->jobFinished(job, worker);
617}
618
619void Scheduler::putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url)
620{
621 schedulerPrivate()->putWorkerOnHold(job, url);
622}
623
624void Scheduler::removeWorkerOnHold()
625{
626 schedulerPrivate()->removeWorkerOnHold();
627}
628
629bool Scheduler::isWorkerOnHoldFor(const QUrl &url)
630{
631 return schedulerPrivate()->isWorkerOnHoldFor(url);
632}
633
634void Scheduler::updateInternalMetaData(SimpleJob *job)
635{
636 schedulerPrivate()->updateInternalMetaData(job);
637}
638
639void Scheduler::emitReparseSlaveConfiguration()
640{
641#ifdef WITH_QTDBUS
642 // Do it immediately in this process, otherwise we might send a request before reparsing
643 // (e.g. when changing useragent in the plugin)
644 schedulerPrivate()->slotReparseSlaveConfiguration(QString(), QDBusMessage());
645#endif
646
647 schedulerPrivate()->m_ignoreConfigReparse = true;
648 Q_EMIT self()->reparseSlaveConfiguration(QString());
649}
650
#ifdef WITH_QTDBUS
// Reloads the configuration and tells workers to do the same.
// @param proto protocol whose workers are notified; empty means all protocols.
// A call is skipped once when m_ignoreConfigReparse is set (the signal was
// sent by this process, see Scheduler::emitReparseSlaveConfiguration()).
void SchedulerPrivate::slotReparseSlaveConfiguration(const QString &proto, const QDBusMessage &)
{
    if (m_ignoreConfigReparse) {
        // qDebug() << "Ignoring signal sent by myself";
        m_ignoreConfigReparse = false;
        return;
    }

    // qDebug() << "proto=" << proto;
    // NOTE(review): this call was on a line missing from the listing; it is
    // restored from the file's cross-reference list - confirm against upstream.
    KProtocolManager::reparseConfiguration();
    WorkerConfig::self()->reset();
    sessionData.reset();

    QHash<QString, ProtoQueue *>::ConstIterator it = proto.isEmpty() ? m_protocols.constBegin() : m_protocols.constFind(proto);
    // The declaration of `endIt` was missing from the listing; restored here.
    QHash<QString, ProtoQueue *>::ConstIterator endIt = m_protocols.constEnd();

    // not found?
    if (it == endIt) {
        return;
    }

    if (!proto.isEmpty()) {
        // a single protocol was requested: restrict the range to that entry
        endIt = it;
        ++endIt;
    }

    for (; it != endIt; ++it) {
        const QList<KIO::Worker *> list = it.value()->allWorkers();
        for (Worker *worker : list) {
            worker->send(CMD_REPARSECONFIGURATION);
            worker->resetHost();
        }
    }
}
#endif
687
688void SchedulerPrivate::doJob(SimpleJob *job)
689{
690 // qDebug() << job;
691 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
692 jobPriv->m_proxyList.clear();
693 jobPriv->m_protocol = KProtocolManagerPrivate::workerProtocol(job->url(), jobPriv->m_proxyList);
694
695 ProtoQueue *proto = protoQ(jobPriv->m_protocol, job->url().host());
696 proto->queueJob(job);
697}
698
699void SchedulerPrivate::setJobPriority(SimpleJob *job, int priority)
700{
701 // qDebug() << job << priority;
702 const QString protocol = SimpleJobPrivate::get(job)->m_protocol;
703 if (!protocol.isEmpty()) {
704 ProtoQueue *proto = protoQ(protocol, job->url().host());
705 proto->changeJobPriority(job, priority);
706 }
707}
708
709void SchedulerPrivate::cancelJob(SimpleJob *job)
710{
711 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
712 // this method is called all over the place in job.cpp, so just do this check here to avoid
713 // much boilerplate in job code.
714 if (jobPriv->m_schedSerial == 0) {
715 // qDebug() << "Doing nothing because I don't know job" << job;
716 return;
717 }
718 Worker *worker = jobSWorker(job);
719 // qDebug() << job << worker;
720 jobFinished(job, worker);
721 if (worker) {
722 ProtoQueue *pq = m_protocols.value(jobPriv->m_protocol);
723 if (pq) {
724 pq->removeWorker(worker);
725 }
726 worker->kill(); // don't use worker after this!
727 }
728}
729
730void SchedulerPrivate::jobFinished(SimpleJob *job, Worker *worker)
731{
732 // qDebug() << job << worker;
733 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
734
735 // make sure that we knew about the job!
736 Q_ASSERT(jobPriv->m_schedSerial);
737
738 ProtoQueue *pq = m_protocols.value(jobPriv->m_protocol);
739 if (pq) {
740 pq->removeJob(job);
741 }
742
743 if (worker) {
744 // If we have internal meta-data, tell existing KIO workers to reload
745 // their configuration.
746 if (jobPriv->m_internalMetaData.count()) {
747 // qDebug() << "Updating KIO workers with new internal metadata information";
748 ProtoQueue *queue = m_protocols.value(worker->protocol());
749 if (queue) {
750 const QList<Worker *> workers = queue->allWorkers();
751 for (auto *runningWorker : workers) {
752 if (worker->host() == runningWorker->host()) {
753 worker->setConfig(metaDataFor(worker->protocol(), jobPriv->m_proxyList, job->url()));
754 /*qDebug() << "Updated configuration of" << worker->protocol()
755 << "KIO worker, pid=" << worker->worker_pid();*/
756 }
757 }
758 }
759 }
760 worker->setJob(nullptr);
761 worker->disconnect(job);
762 }
763 jobPriv->m_schedSerial = 0; // this marks the job as unscheduled again
764 jobPriv->m_worker = nullptr;
765 // Clear the values in the internal metadata container since they have
766 // already been taken care of above...
767 jobPriv->m_internalMetaData.clear();
768}
769
770MetaData SchedulerPrivate::metaDataFor(const QString &protocol, const QStringList &proxyList, const QUrl &url)
771{
772 const QString host = url.host();
773 MetaData configData = WorkerConfig::self()->configData(protocol, host);
774 sessionData.configDataFor(configData, protocol, host);
775 if (proxyList.isEmpty()) {
776 configData.remove(QStringLiteral("UseProxy"));
777 configData.remove(QStringLiteral("ProxyUrls"));
778 } else {
779 configData[QStringLiteral("UseProxy")] = proxyList.first();
780 configData[QStringLiteral("ProxyUrls")] = proxyList.join(QLatin1Char(','));
781 }
782
783 return configData;
784}
785
786void SchedulerPrivate::setupWorker(KIO::Worker *worker,
787 const QUrl &url,
788 const QString &protocol,
789 const QStringList &proxyList,
790 bool newWorker,
791 const KIO::MetaData *config)
792{
793 int port = url.port();
794 if (port == -1) { // no port is -1 in QUrl, but in kde3 we used 0 and the KIO workers assume that.
795 port = 0;
796 }
797 const QString host = url.host();
798 const QString user = url.userName();
799 const QString passwd = url.password();
800
801 if (newWorker || worker->host() != host || worker->port() != port || worker->user() != user || worker->passwd() != passwd) {
802 MetaData configData = metaDataFor(protocol, proxyList, url);
803 if (config) {
804 configData += *config;
805 }
806
807 worker->setConfig(configData);
808 worker->setProtocol(url.scheme());
809 worker->setHost(host, port, user, passwd);
810 }
811}
812
813void SchedulerPrivate::slotWorkerDied(KIO::Worker *worker)
814{
815 // qDebug() << worker;
816 Q_ASSERT(worker);
817 Q_ASSERT(!worker->isAlive());
818 ProtoQueue *pq = m_protocols.value(worker->protocol());
819 if (pq) {
820 if (worker->job()) {
821 pq->removeJob(worker->job());
822 }
823 // in case this was a connected worker...
824 pq->removeWorker(worker);
825 }
826 if (worker == m_workerOnHold) {
827 m_workerOnHold = nullptr;
828 m_urlOnHold.clear();
829 }
830 // can't use worker->deref() here because we need to use deleteLater
831 worker->aboutToDelete();
832 worker->deleteLater();
833}
834
835void SchedulerPrivate::putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url)
836{
837 Worker *worker = jobSWorker(job);
838 // qDebug() << job << url << worker;
839 worker->disconnect(job);
840 // prevent the fake death of the worker from trying to kill the job again;
841 // cf. Worker::hold(const QUrl &url) called in SchedulerPrivate::publishWorkerOnHold().
842 worker->setJob(nullptr);
843 SimpleJobPrivate::get(job)->m_worker = nullptr;
844
845 if (m_workerOnHold) {
846 m_workerOnHold->kill();
847 }
848 m_workerOnHold = worker;
849 m_urlOnHold = url;
850 m_workerOnHold->suspend();
851}
852
853bool SchedulerPrivate::isWorkerOnHoldFor(const QUrl &url)
854{
855 if (url.isValid() && m_urlOnHold.isValid() && url == m_urlOnHold) {
856 return true;
857 }
858
859 return false;
860}
861
862Worker *SchedulerPrivate::heldWorkerForJob(SimpleJob *job)
863{
864 Worker *worker = nullptr;
865 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
866
867 if (m_workerOnHold) {
868 // Make sure that the job wants to do a GET or a POST, and with no offset
869 const int cmd = jobPriv->m_command;
870 bool canJobReuse = (cmd == CMD_GET);
871
872 if (KIO::TransferJob *tJob = qobject_cast<KIO::TransferJob *>(job)) {
873 canJobReuse = (canJobReuse || cmd == CMD_SPECIAL);
874 if (canJobReuse) {
875 KIO::MetaData outgoing = tJob->outgoingMetaData();
876 const QString resume = outgoing.value(QStringLiteral("resume"));
877 const QString rangeStart = outgoing.value(QStringLiteral("range-start"));
878 // qDebug() << "Resume metadata is" << resume;
879 canJobReuse = (resume.isEmpty() || resume == QLatin1Char('0')) && (rangeStart.isEmpty() || rangeStart == QLatin1Char('0'));
880 }
881 }
882
883 if (job->url() == m_urlOnHold) {
884 if (canJobReuse) {
885 // qDebug() << "HOLD: Reusing held worker (" << m_workerOnHold << ")";
886 worker = m_workerOnHold;
887 } else {
888 // qDebug() << "HOLD: Discarding held worker (" << m_workerOnHold << ")";
889 m_workerOnHold->kill();
890 }
891 m_workerOnHold = nullptr;
892 m_urlOnHold.clear();
893 }
894 }
895
896 return worker;
897}
898
899void SchedulerPrivate::removeWorkerOnHold()
900{
901 // qDebug() << m_workerOnHold;
902 if (m_workerOnHold) {
903 m_workerOnHold->kill();
904 }
905 m_workerOnHold = nullptr;
906 m_urlOnHold.clear();
907}
908
909ProtoQueue *SchedulerPrivate::protoQ(const QString &protocol, const QString &host)
910{
911 ProtoQueue *pq = m_protocols.value(protocol, nullptr);
912 if (!pq) {
913 // qDebug() << "creating ProtoQueue instance for" << protocol;
914
915 const int maxWorkers = KProtocolInfo::maxWorkers(protocol);
916 int maxWorkersPerHost = -1;
917 if (!host.isEmpty()) {
918 bool ok = false;
919 const int value = WorkerConfig::self()->configData(protocol, host, QStringLiteral("MaxConnections")).toInt(&ok);
920 if (ok) {
921 maxWorkersPerHost = value;
922 }
923 }
924 if (maxWorkersPerHost == -1) {
925 maxWorkersPerHost = KProtocolInfo::maxWorkersPerHost(protocol);
926 }
927 // Never allow maxWorkersPerHost to exceed maxWorkers.
928 pq = new ProtoQueue(maxWorkers, qMin(maxWorkers, maxWorkersPerHost));
929 m_protocols.insert(protocol, pq);
930 }
931 return pq;
932}
933
934void SchedulerPrivate::updateInternalMetaData(SimpleJob *job)
935{
936 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
937 // Preserve all internal meta-data so they can be sent back to the
938 // KIO workers as needed...
939 const QUrl jobUrl = job->url();
940
941 const QLatin1String currHostToken("{internal~currenthost}");
942 const QLatin1String allHostsToken("{internal~allhosts}");
943 // qDebug() << job << jobPriv->m_internalMetaData;
944 QMapIterator<QString, QString> it(jobPriv->m_internalMetaData);
945 while (it.hasNext()) {
946 it.next();
947 if (it.key().startsWith(currHostToken, Qt::CaseInsensitive)) {
948 WorkerConfig::self()->setConfigData(jobUrl.scheme(), jobUrl.host(), it.key().mid(currHostToken.size()), it.value());
949 } else if (it.key().startsWith(allHostsToken, Qt::CaseInsensitive)) {
950 WorkerConfig::self()->setConfigData(jobUrl.scheme(), QString(), it.key().mid(allHostsToken.size()), it.value());
951 }
952 }
953}
954
955#include "moc_scheduler.cpp"
956#include "moc_scheduler_p.cpp"
MetaData is a simple map of key/value strings.
Definition metadata.h:23
A simple job (one url and one command).
Definition simplejob.h:27
void slotError(int, const QString &)
const QUrl & url() const
Returns the SimpleJob's URL.
Definition simplejob.cpp:70
The transfer job pumps data into and/or out of a KIO worker.
Definition transferjob.h:26
static int maxWorkersPerHost(const QString &protocol)
Returns the limit on the number of KIO workers for this protocol per host.
static int maxWorkers(const QString &protocol)
Returns the soft limit on the number of KIO workers for this protocol.
static void reparseConfiguration()
Force a reload of the general config file of KIO workers ( kioslaverc).
A namespace for KIO globals.
void error(QWidget *parent, const QString &text, const QString &title, const KGuiItem &buttonOk, Options options=Notify)
KIOCORE_EXPORT QStringList list(const QString &fileClass)
Returns a list of directories associated with this file-class.
NETWORKMANAGERQT_EXPORT QString hostname()
bool connect(const QString &service, const QString &path, const QString &interface, const QString &name, QObject *receiver, const char *slot)
QDBusConnection sessionBus()
const_iterator cbegin() const const
const_iterator cend() const const
const_iterator constBegin() const const
const_iterator constEnd() const const
const_iterator constFind(const Key &key) const const
iterator insert(const Key &key, const T &value)
Key key(const T &value) const const
T value(const Key &key) const const
void append(QList< T > &&value)
T & first()
bool isEmpty() const const
void reserve(qsizetype size)
iterator begin()
const_iterator cbegin() const const
const_iterator cend() const const
const_iterator constBegin() const const
Key key(const T &value, const Key &defaultKey) const const
size_type remove(const Key &key)
T value(const Key &key, const T &defaultValue) const const
iterator begin()
iterator end()
iterator find(const Key &key, const T &value)
T value(const Key &key) const const
Q_EMIT
void setObjectName(QAnyStringView name)
bool contains(const QSet< T > &other) const const
iterator insert(const T &value)
bool isEmpty() const const
QString join(QChar separator) const const
CaseInsensitive
QFuture< ArgsType< Signal > > connect(Sender *sender, Signal signal)
void timeout()
void clear()
QString host(ComponentFormattingOptions options) const const
bool isValid() const const
QString password(ComponentFormattingOptions options) const const
int port(int defaultPort) const const
QString scheme() const const
QString userName(ComponentFormattingOptions options) const const
This file is part of the KDE documentation.
Documentation copyright © 1996-2024 The KDE developers.
Generated on Fri Jun 14 2024 11:51:27 by doxygen 1.10.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.