ThreadWeaver

weaver.cpp
1/* -*- C++ -*-
2 This file implements the WeaverImpl class.
3
4 SPDX-FileCopyrightText: 2005-2013 Mirko Boehm <mirko@kde.org>
5
6 SPDX-License-Identifier: LGPL-2.0-or-later
7
8 $Id: WeaverImpl.cpp 30 2005-08-16 16:16:04Z mirko $
9*/
10
11#include "weaver.h"
12
13#include <QCoreApplication>
14#include <QDebug>
15#include <QMutex>
16#include <QDeadlineTimer>
17#include "debuggingaids.h"
18#include "destructedstate.h"
19#include "exception.h"
20#include "inconstructionstate.h"
21#include "job.h"
22#include "managedjobpointer.h"
23#include "queuepolicy.h"
24#include "shuttingdownstate.h"
25#include "state.h"
26#include "suspendedstate.h"
27#include "suspendingstate.h"
28#include "thread.h"
29#include "threadweaver.h"
30#include "weaver_p.h"
31#include "workinghardstate.h"
32
33using namespace ThreadWeaver;
34
35/** @brief Constructs a Weaver object. */
37 : QueueAPI(new Private::Weaver_Private(), parent)
38{
39 qRegisterMetaType<ThreadWeaver::JobPointer>("ThreadWeaver::JobPointer");
40 QMutexLocker l(d()->mutex);
41 Q_UNUSED(l);
42 // initialize state objects:
43 d()->states[InConstruction] = QSharedPointer<State>(new InConstructionState(this));
44 setState_p(InConstruction);
45 d()->states[WorkingHard] = QSharedPointer<State>(new WorkingHardState(this));
46 d()->states[Suspending] = QSharedPointer<State>(new SuspendingState(this));
47 d()->states[Suspended] = QSharedPointer<State>(new SuspendedState(this));
48 d()->states[ShuttingDown] = QSharedPointer<State>(new ShuttingDownState(this));
49 d()->states[Destructed] = QSharedPointer<State>(new DestructedState(this));
50
51 setState_p(WorkingHard);
52}
53
54/** @brief Destructs a Weaver object. */
56{
57 Q_ASSERT_X(state()->stateId() == Destructed, Q_FUNC_INFO, "shutDown() method was not called before Weaver destructor!");
58}
59
60/** @brief Enter Destructed state.
61 *
62 * Once this method returns, it is save to delete this object.
63 */
65{
66 state()->shutDown();
67}
68
69void Weaver::shutDown_p()
70{
71 // the constructor may only be called from the thread that owns this
72 // object (everything else would be what we professionals call "insane")
73
74 REQUIRE(QThread::currentThread() == thread());
75 TWDEBUG(3, "WeaverImpl::shutDown: destroying inventory.\n");
76 d()->semaphore.acquire(d()->createdThreads.loadAcquire());
77 finish();
78 suspend();
79 setState(ShuttingDown);
80 reschedule();
81 d()->jobFinished.wakeAll();
82
83 // problem: Some threads might not be asleep yet, just finding
84 // out if a job is available. Those threads will suspend
85 // waiting for their next job (a rare case, but not impossible).
86 // Therefore, if we encounter a thread that has not exited, we
87 // have to wake it again (which we do in the following for
88 // loop).
89
90 for (;;) {
91 Thread *th = nullptr;
92 {
93 QMutexLocker l(d()->mutex);
94 Q_UNUSED(l);
95 if (d()->inventory.isEmpty()) {
96 break;
97 }
98 th = d()->inventory.takeFirst();
99 }
100 if (!th->isFinished()) {
101 for (;;) {
102 Q_ASSERT(state()->stateId() == ShuttingDown);
103 reschedule();
104 if (th->wait(100)) {
105 break;
106 }
107 TWDEBUG(1,
108 "WeaverImpl::shutDown: thread %i did not exit as expected, "
109 "retrying.\n",
110 th->id());
111 }
112 }
114 delete th;
115 }
116 Q_ASSERT(d()->inventory.isEmpty());
117 TWDEBUG(3, "WeaverImpl::shutDown: done\n");
118 setState(Destructed); // Destructed ignores all calls into the queue API
119}
120
121/** @brief Set the Weaver state.
122 * @see StateId
123 * @see WeaverImplState
124 * @see State
125 */
126void Weaver::setState(StateId id)
127{
128 QMutexLocker l(d()->mutex);
129 Q_UNUSED(l);
130 setState_p(id);
131}
132
133void Weaver::setState_p(StateId id)
134{
135 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
136 State *newState = d()->states[id].data();
137 State *previous = d()->state.fetchAndStoreOrdered(newState);
138 if (previous == nullptr || previous->stateId() != id) {
139 newState->activated();
140 TWDEBUG(2, "WeaverImpl::setState: state changed to \"%s\".\n", newState->stateName().toLatin1().constData());
141 if (id == Suspended) {
142 Q_EMIT(suspended());
143 }
145 }
146}
147
148const State *Weaver::state() const
149{
150 return d()->state.loadAcquire();
151}
152
154{
155 return d()->state.loadAcquire();
156}
157
159{
160 Q_ASSERT_X(cap >= 0, "Weaver Impl", "Thread inventory size has to be larger than or equal to zero.");
161 QMutexLocker l(d()->mutex);
162 Q_UNUSED(l);
163 state()->setMaximumNumberOfThreads(cap);
164 reschedule();
165}
166
167void Weaver::setMaximumNumberOfThreads_p(int cap)
168{
169 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
170 const bool createInitialThread = (d()->inventoryMax == 0 && cap > 0);
171 d()->inventoryMax = cap;
174 }
175}
176
178{
179 QMutexLocker l(d()->mutex);
180 Q_UNUSED(l);
181 return state()->maximumNumberOfThreads();
182}
183
184int Weaver::maximumNumberOfThreads_p() const
185{
186 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
187 return d()->inventoryMax;
188}
189
191{
192 QMutexLocker l(d()->mutex);
193 Q_UNUSED(l);
194 return state()->currentNumberOfThreads();
195}
196
197int Weaver::currentNumberOfThreads_p() const
198{
199 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
200 return d()->inventory.count();
201}
202
204{
205 QMutexLocker l(d()->mutex);
206 Q_UNUSED(l);
207 state()->enqueue(jobs);
208}
209
210void Weaver::enqueue_p(const QList<JobPointer> &jobs)
211{
212 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
213 if (jobs.isEmpty()) {
214 return;
215 }
216 for (const JobPointer &job : jobs) {
217 if (job) {
218 Q_ASSERT(job->status() == Job::Status_New);
219 adjustInventory(jobs.size());
220 TWDEBUG(3, "WeaverImpl::enqueue: queueing job %p.\n", (void *)job.data());
221 job->aboutToBeQueued(this);
222 // find position for insertion:
223 int i = d()->assignments.size();
224 if (i > 0) {
225 while (i > 0 && d()->assignments.at(i - 1)->priority() < job->priority()) {
226 --i;
227 }
228 d()->assignments.insert(i, job);
229 } else {
230 d()->assignments.append(job);
231 }
232 job->setStatus(Job::Status_Queued);
233 reschedule();
234 }
235 }
236}
237
239{
240 QMutexLocker l(d()->mutex);
241 Q_UNUSED(l);
242 return state()->dequeue(job);
243}
244
245bool Weaver::dequeue_p(JobPointer job)
246{
247 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
248 int position = d()->assignments.indexOf(job);
249 if (position != -1) {
250 job->aboutToBeDequeued(this);
251 int newPosition = d()->assignments.indexOf(job);
252 JobPointer job = d()->assignments.takeAt(newPosition);
253 job->setStatus(Job::Status_New);
254 Q_ASSERT(!d()->assignments.contains(job));
255 TWDEBUG(3, "WeaverImpl::dequeue: job %p dequeued, %i jobs left.\n", (void *)job.data(), queueLength_p());
256 // from the queues point of view, a job is just as finished if it gets dequeued:
257 d()->jobFinished.wakeAll();
258 Q_ASSERT(!d()->assignments.contains(job));
259 return true;
260 } else {
261 TWDEBUG(3, "WeaverImpl::dequeue: job %p not found in queue.\n", (void *)job.data());
262 return false;
263 }
264}
265
267{
268 QMutexLocker l(d()->mutex);
269 Q_UNUSED(l);
270 state()->dequeue();
271}
272
273void Weaver::dequeue_p()
274{
275 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
276 TWDEBUG(3, "WeaverImpl::dequeue: dequeueing all jobs.\n");
277 for (int index = 0; index < d()->assignments.size(); ++index) {
278 d()->assignments.at(index)->aboutToBeDequeued(this);
279 }
280 d()->assignments.clear();
281 ENSURE(d()->assignments.isEmpty());
282}
283
285{
286 QMutexLocker l(d()->mutex);
287 Q_UNUSED(l);
288 state()->finish();
289}
290
291void Weaver::finish_p()
292{
293 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
294#ifdef QT_NO_DEBUG
295 const int MaxWaitMilliSeconds = 50;
296#else
297 const int MaxWaitMilliSeconds = 500;
298#endif
299 while (!isIdle_p()) {
300 Q_ASSERT_X(state()->stateId() == WorkingHard, Q_FUNC_INFO, qPrintable(state()->stateName()));
301 TWDEBUG(2, "WeaverImpl::finish: not done, waiting.\n");
302 if (d()->jobFinished.wait(d()->mutex, QDeadlineTimer(MaxWaitMilliSeconds)) == false) {
303 TWDEBUG(2, "WeaverImpl::finish: wait timed out, %i jobs left, waking threads.\n", queueLength_p());
304 reschedule();
305 }
306 }
307 TWDEBUG(2, "WeaverImpl::finish: done.\n\n\n");
308}
309
311{
312 // FIXME?
313 // QMutexLocker l(m_mutex); Q_UNUSED(l);
314 state()->suspend();
315}
316
317void Weaver::suspend_p()
318{
319 // FIXME ?
320}
321
323{
324 // FIXME?
325 // QMutexLocker l(m_mutex); Q_UNUSED(l);
326 state()->resume();
327}
328
329void Weaver::resume_p()
330{
331 // FIXME ?
332}
333
334bool Weaver::isEmpty() const
335{
336 QMutexLocker l(d()->mutex);
337 Q_UNUSED(l);
338 return state()->isEmpty();
339}
340
341bool Weaver::isEmpty_p() const
342{
343 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
344 return d()->assignments.isEmpty();
345}
346
347bool Weaver::isIdle() const
348{
349 QMutexLocker l(d()->mutex);
350 Q_UNUSED(l);
351 return state()->isIdle();
352}
353
354bool Weaver::isIdle_p() const
355{
356 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
357 return isEmpty_p() && d()->active == 0;
358}
359
361{
362 QMutexLocker l(d()->mutex);
363 Q_UNUSED(l);
364 return state()->queueLength();
365}
366
367int Weaver::queueLength_p() const
368{
369 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
370 return d()->assignments.count();
371}
372
374{
375 QMutexLocker l(d()->mutex);
376 Q_UNUSED(l);
377 return state()->requestAbort();
378}
379
381{
382 d()->jobAvailable.wakeAll();
383}
384
385void Weaver::requestAbort_p()
386{
387 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
388 for (int i = 0; i < d()->inventory.size(); ++i) {
389 d()->inventory[i]->requestAbort();
390 }
391}
392
393/** @brief Adjust the inventory size.
394 *
395 * Requires that the mutex is being held when called.
396 *
397 * This method creates threads on demand. Threads in the inventory
398 * are not created upon construction of the WeaverImpl object, but
399 * when jobs are queued. This avoids costly delays on the application
400 * startup time. Threads are created when the inventory size is under
401 * inventoryMin and new jobs are queued.
402 */
404{
405 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
406 // number of threads that can be created:
407 const int reserve = d()->inventoryMax - d()->inventory.count();
408
409 if (reserve > 0) {
410 for (int i = 0; i < qMin(reserve, numberOfNewJobs); ++i) {
412 th->moveToThread(th); // be sane from the start
413 d()->inventory.append(th);
414 th->start();
415 d()->createdThreads.ref();
416 TWDEBUG(2,
417 "WeaverImpl::adjustInventory: thread created, "
418 "%i threads in inventory.\n",
419 currentNumberOfThreads_p());
420 }
421 }
422}
423
424Private::Weaver_Private *Weaver::d()
425{
426 return reinterpret_cast<Private::Weaver_Private *>(QueueSignals::d());
427}
428
429const Private::Weaver_Private *Weaver::d() const
430{
431 return reinterpret_cast<const Private::Weaver_Private *>(QueueSignals::d());
432}
433
434/** @brief Factory method to create the threads.
435 *
436 * Overload in adapted Weaver implementations.
437 */
439{
440 return new Thread(this);
441}
442
443/** @brief Increment the count of active threads. */
448
449/** brief Decrement the count of active threads. */
451{
453 // the done job could have freed another set of jobs, and we do not know how
454 // many - therefore we need to wake all threads:
455 d()->jobFinished.wakeAll();
456}
457
458/** @brief Adjust active thread count.
459 *
460 * This is a helper function for incActiveThreadCount and decActiveThreadCount.
461 */
463{
464 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
465 d()->active += diff;
466 TWDEBUG(4,
467 "WeaverImpl::adjustActiveThreadCount: %i active threads (%i jobs"
468 " in queue).\n",
469 d()->active,
470 queueLength_p());
471
472 if (d()->assignments.isEmpty() && d()->active == 0) {
473 P_ASSERT(diff < 0); // cannot reach zero otherwise
474 Q_EMIT(finished());
475 }
476}
477
478/** @brief Returns the number of active threads.
479 *
480 * Threads are active if they process a job. Requires that the mutex is being held when called.
481 */
483{
484 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
485 return d()->active;
486}
487
488/** @brief Called from a new thread when entering the run method. */
490{
491 d()->semaphore.release(1);
493}
494
495/** @brief Take the first available job out of the queue and return it.
496 *
497 * The job will be removed from the queue (therefore, take). Only jobs that have no unresolved dependencies
498 * are considered available. If only jobs that depend on other unfinished jobs are in the queue, this method
499 * blocks on m_jobAvailable.
500 *
501 * This method will enter suspended state if the active thread count is now zero and
502 * suspendIfAllThreadsInactive is true.
503 * If justReturning is true, do not assign a new job, just process the completed previous one.
504 */
506{
507 QMutexLocker l(d()->mutex);
508 Q_UNUSED(l);
509 Q_ASSERT(threadWasBusy == false || (threadWasBusy == true && d()->active > 0));
510 TWDEBUG(3, "WeaverImpl::takeFirstAvailableJobOrWait: trying to assign new job to thread %i (%s state).\n", th->id(), qPrintable(state()->stateName()));
511 TWDEBUG(5,
512 "WeaverImpl::takeFirstAvailableJobOrWait: %i active threads, was busy: %s, suspend: %s, assign new job: %s.\n",
514 threadWasBusy ? "yes" : "no",
515 suspendIfInactive ? "yes" : "no",
516 !justReturning ? "yes" : "no");
517 d()->deleteExpiredThreads();
519
520 if (threadWasBusy) {
521 // cleanup and send events:
523 }
524 Q_ASSERT(d()->active >= 0);
525
526 if (suspendIfInactive && d()->active == 0 && state()->stateId() == Suspending) {
527 setState_p(Suspended);
528 return JobPointer();
529 }
530
531 if (state()->stateId() != WorkingHard || justReturning) {
532 return JobPointer();
533 }
534
535 if (state()->stateId() == WorkingHard && d()->inventory.size() > d()->inventoryMax) {
536 const int count = d()->inventory.removeAll(th);
537 Q_ASSERT(count == 1);
538 d()->expiredThreads.append(th);
539 throw AbortThread(QStringLiteral("Inventory size exceeded"));
540 }
541
542 JobPointer next;
543 for (int index = 0; index < d()->assignments.size(); ++index) {
544 const JobPointer &candidate = d()->assignments.at(index);
545 if (d()->canBeExecuted(candidate)) {
546 next = candidate;
547 d()->assignments.removeAt(index);
548 break;
549 }
550 }
551 if (next) {
553 TWDEBUG(3,
554 "WeaverImpl::takeFirstAvailableJobOrWait: job %p assigned to thread %i (%s state).\n",
555 next.data(),
556 th->id(),
557 qPrintable(state()->stateName()));
558 return next;
559 }
560
562 return JobPointer();
563}
564
565/** @brief Assign a job to the calling thread.
566 *
567 * This is supposed to be called from the Thread objects in the inventory. Do not call this method from
568 * your code.
569 * Returns 0 if the weaver is shutting down, telling the calling thread to finish and exit. If no jobs are
570 * available and shut down is not in progress, the calling thread is suspended until either condition is met.
571 * @param wasBusy True if the thread is returning from processing a job
572 */
574{
575 return state()->applyForWork(th, wasBusy);
576}
577
578/** @brief Wait for a job to become available. */
580{
581 state()->waitForAvailableJob(th);
582}
583
584/** @brief Blocks the calling thread until jobs can be assigned. */
591
592/** @brief Blocks the calling thread until jobs can be assigned.
593 *
594 * The mutex must be held when calling this method.
595 */
597{
598 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
599 TWDEBUG(4, "WeaverImpl::blockThreadUntilJobsAreBeingAssigned_locked: thread %i blocked (%s state).\n", th->id(), qPrintable(state()->stateName()));
601 d()->jobAvailable.wait(d()->mutex);
602 TWDEBUG(4, "WeaverImpl::blockThreadUntilJobsAreBeingAssigned_locked: thread %i resumed (%s state).\n", th->id(), qPrintable(state()->stateName()));
603}
604
605#include "moc_weaver.cpp"
DestructedState is only active after the thread have been destroyed by the destructor,...
InConstructionState handles the calls to the Weaver object until the constructor has finished.
bool isFinished() const override
Returns true if the jobs's execute method finished.
Definition job.cpp:188
int priority() const override
The queueing priority of the job.
Definition job.cpp:97
void requestAbort() override
Abort the execution of the job.
Definition job.cpp:118
Lambda is a template that takes any type on which operator() is available, and executes it in run().
Definition lambda.h:20
void finished()
Emitted when the Queue has completed all jobs currently queued.
void suspended()
The Queue has been suspended.
void stateChanged(ThreadWeaver::State *)
Emitted when the processing state of the Queue has changed.
ShuttingDownState is enabled when the Weaver destructor is entered.
We use a State pattern to handle the system state in ThreadWeaver.
Definition state.h:56
virtual StateId stateId() const =0
The state Id.
In SuspendedState, jobs are queued, but will not be executed.
SuspendingState is the state after suspend() has been called, but before all threads finished executi...
Thread represents a worker thread in a Queue's inventory.
Definition thread.h:28
int queueLength() const override
Returns the number of pending jobs.
Definition weaver.cpp:360
void enqueue(const QList< JobPointer > &jobs) override
Queue a vector of jobs.
Definition weaver.cpp:203
void finish() override
Finish all queued operations, then return.
Definition weaver.cpp:284
void decActiveThreadCount()
brief Decrement the count of active threads.
Definition weaver.cpp:450
void threadEnteredRun(Thread *thread)
Called from a new thread when entering the run method.
Definition weaver.cpp:489
virtual Thread * createThread()
Factory method to create the threads.
Definition weaver.cpp:438
bool isEmpty() const override
Is the queue empty? The queue is empty if no more jobs are queued.
Definition weaver.cpp:334
void incActiveThreadCount()
Increment the count of active threads.
Definition weaver.cpp:444
void adjustInventory(int noOfNewJobs)
Adjust the inventory size.
Definition weaver.cpp:403
void threadSuspended(ThreadWeaver::Thread *)
A thread has been suspended.
void blockThreadUntilJobsAreBeingAssigned_locked(Thread *th)
Blocks the calling thread until jobs can be assigned.
Definition weaver.cpp:596
void reschedule() override
Reschedule the jobs in the queue.
Definition weaver.cpp:380
void shutDown() override
Enter Destructed state.
Definition weaver.cpp:64
const State * state() const override
Return the state of the weaver object.
Definition weaver.cpp:148
void adjustActiveThreadCount(int diff)
Adjust active thread count.
Definition weaver.cpp:462
JobPointer takeFirstAvailableJobOrSuspendOrWait(Thread *th, bool threadWasBusy, bool suspendIfAllThreadsInactive, bool justReturning)
Take the first available job out of the queue and return it.
Definition weaver.cpp:505
~Weaver() override
Destructs a Weaver object.
Definition weaver.cpp:55
int activeThreadCount()
Returns the number of active threads.
Definition weaver.cpp:482
void dequeue() override
Remove all queued jobs.
Definition weaver.cpp:266
void waitForAvailableJob(Thread *th) override
Wait for a job to become available.
Definition weaver.cpp:579
int maximumNumberOfThreads() const override
Get the maximum number of threads this Weaver may start.
Definition weaver.cpp:177
void threadExited(ThreadWeaver::Thread *)
A thread has exited.
void setState(StateId)
Set the Weaver state.
Definition weaver.cpp:126
void suspend() override
Suspend job execution.
Definition weaver.cpp:310
int currentNumberOfThreads() const override
Returns the current number of threads in the inventory.
Definition weaver.cpp:190
void setMaximumNumberOfThreads(int cap) override
Set the maximum number of threads this Weaver object may start.
Definition weaver.cpp:158
Weaver(QObject *parent=nullptr)
Constructs a Weaver object.
Definition weaver.cpp:36
bool isIdle() const override
Is the weaver idle? The weaver is idle if no jobs are queued and no jobs are processed by the threads...
Definition weaver.cpp:347
void threadStarted(ThreadWeaver::Thread *)
A Thread has been created.
void blockThreadUntilJobsAreBeingAssigned(Thread *th)
Blocks the calling thread until jobs can be assigned.
Definition weaver.cpp:585
JobPointer applyForWork(Thread *thread, bool wasBusy) override
Assign a job to the calling thread.
Definition weaver.cpp:573
void resume() override
Resume job queueing.
Definition weaver.cpp:322
void requestAbort() override
Request aborts of the currently executed jobs.
Definition weaver.cpp:373
Q_EMITQ_EMIT
QThread * thread() const const
T * data() const const
QThread * currentThread()
This file is part of the KDE documentation.
Documentation copyright © 1996-2024 The KDE developers.
Generated on Tue Mar 26 2024 11:14:31 by doxygen 1.10.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.