ThreadWeaver

job.cpp
1/* -*- C++ -*-
2 This file implements the Job class.
3
4 SPDX-FileCopyrightText: 2004-2013 Mirko Boehm <mirko@kde.org>
5
6 SPDX-License-Identifier: LGPL-2.0-or-later
7
8 $Id: Job.cpp 20 2005-08-08 21:02:51Z mirko $
9*/
10
11#include "job.h"
12#include "job_p.h"
13
14#include "debuggingaids.h"
15#include "thread.h"
16#include <QAtomicInt>
17#include <QAtomicPointer>
18#include <QList>
19#include <QMutex>
20
21#include "dependencypolicy.h"
22#include "exception.h"
23#include "executewrapper_p.h"
24#include "executor_p.h"
25#include "managedjobpointer.h"
26#include "queuepolicy.h"
27
28namespace ThreadWeaver
29{
31 : d_(new Private::Job_Private())
32{
33#if !defined(NDEBUG)
34 d()->debugExecuteWrapper.wrap(setExecutor(&(d()->debugExecuteWrapper)));
35#endif
36 d()->status.storeRelease(Status_New);
37}
38
39Job::Job(Private::Job_Private *d__)
40 : d_(d__)
41{
42#if !defined(NDEBUG)
43 d()->debugExecuteWrapper.wrap(setExecutor(&(d()->debugExecuteWrapper)));
44#endif
45 d()->status.storeRelease(Status_New);
46}
47
49{
50 for (int index = 0; index < d()->queuePolicies.size(); ++index) {
51 d()->queuePolicies.at(index)->destructed(this);
52 }
53 delete d_;
54}
55
56void Job::execute(const JobPointer &self, Thread *th)
57{
58 Executor *executor = d()->executor.loadAcquire();
59 Q_ASSERT(executor); // may never be unset!
60 Q_ASSERT(self);
61 executor->begin(self, th);
62 self->setStatus(Status_Running);
63 try {
64 executor->execute(self, th);
65 if (self->status() == Status_Running) {
66 self->setStatus(Status_Success);
67 }
68 } catch (JobAborted &) {
69 self->setStatus(Status_Aborted);
70 } catch (JobFailed &) {
71 self->setStatus(Status_Failed);
72 } catch (AbortThread &) {
73 throw;
74 } catch (...) {
75 TWDEBUG(0, "Uncaught exception in Job %p, aborting.", self.data());
76 throw;
77 }
78 Q_ASSERT(self->status() > Status_Running);
79 executor->end(self, th);
80}
81
83{
84 execute(ManagedJobPointer<Job>(this), nullptr);
85}
86
87Executor *Job::setExecutor(Executor *executor)
88{
89 return d()->executor.fetchAndStoreOrdered(executor == nullptr ? &Private::defaultExecutor : executor);
90}
91
92Executor *Job::executor() const
93{
94 return d()->executor.loadAcquire();
95}
96
97int Job::priority() const
98{
99 return 0;
100}
101
102void Job::setStatus(JobInterface::Status status)
103{
104 d()->status.storeRelease(status);
105}
106
107JobInterface::Status Job::status() const
108{
109 // since status is set only through setStatus, this should be safe:
110 return static_cast<Status>(d()->status.loadAcquire());
111}
112
113bool Job::success() const
114{
115 return d()->status.loadAcquire() == Status_Success;
116}
117
119{
120 d()->shouldAbort = true;
121}
122
124{
125}
126
128{
129 d()->handleFinish(job);
130 d()->freeQueuePolicyResources(job);
131}
132
133void Job::aboutToBeQueued(QueueAPI *api)
134{
135 QMutexLocker l(mutex());
136 Q_UNUSED(l);
138}
139
141{
142}
143
144void Job::aboutToBeDequeued(QueueAPI *api)
145{
146 QMutexLocker l(mutex());
147 Q_UNUSED(l);
149}
150
152{
153}
154
156{
157 Q_ASSERT(!mutex()->tryLock());
158 if (!d()->queuePolicies.contains(policy)) {
159 d()->queuePolicies.append(policy);
160 }
161}
162
164{
165 Q_ASSERT(!mutex()->tryLock());
166 int index = d()->queuePolicies.indexOf(policy);
167 if (index != -1) {
168 d()->queuePolicies.removeAt(index);
169 }
170}
171
173{
174 Q_ASSERT(!mutex()->tryLock());
175 return d()->queuePolicies;
176}
177
178Private::Job_Private *Job::d()
179{
180 return d_;
181}
182
183const Private::Job_Private *Job::d() const
184{
185 return d_;
186}
187
188bool Job::isFinished() const
189{
190 const Status s = status();
191 return s == Status_Success || s == Status_Failed || s == Status_Aborted;
192}
193
195{
196 return &(d()->mutex);
197}
198
200{
201 return d()->shouldAbort;
202}
203
204void Job::onFinish(const std::function<void(const JobInterface &job)> &lambda)
205{
206 QMutexLocker l(mutex());
207 d()->finishHandlers << lambda;
208}
209
210}
211
212#include "managedjobpointer.h"
void defaultBegin(const JobPointer &job, Thread *thread) override
Perform standard tasks before starting the execution of a job.
Definition job.cpp:123
void blockingExecute() override
Perform the job synchronously in the current thread.
Definition job.cpp:82
bool shouldAbort() const
Whether Job should abort itself.
Definition job.cpp:199
void onFinish(const std::function< void(const JobInterface &job)> &lambda)
Add handler that will be invoked once job has finished.
Definition job.cpp:204
void aboutToBeDequeued(QueueAPI *api) override
This Job is about the be dequeued from the weaver's job queue.
Definition job.cpp:144
Executor * setExecutor(Executor *executor) override
Set the Executor object that is supposed to run the job.
Definition job.cpp:87
void assignQueuePolicy(QueuePolicy *) override
Assign a queue policy.
Definition job.cpp:155
void removeQueuePolicy(QueuePolicy *) override
Remove a queue policy from this job.
Definition job.cpp:163
bool isFinished() const override
Returns true if the jobs's execute method finished.
Definition job.cpp:188
QMutex * mutex() const override
The mutex used to protect this job.
Definition job.cpp:194
void aboutToBeDequeued_locked(QueueAPI *api) override
Called from aboutToBeDequeued() while the mutex is being held.
Definition job.cpp:151
void setStatus(Status) override
Set the status of the Job.
Definition job.cpp:102
Job()
Construct a Job.
Definition job.cpp:30
int priority() const override
The queueing priority of the job.
Definition job.cpp:97
void aboutToBeQueued(QueueAPI *api) override
The job is about to be added to the weaver's job queue.
Definition job.cpp:133
~Job() override
Destructor.
Definition job.cpp:48
void execute(const JobPointer &job, Thread *) override
Perform the job.
Definition job.cpp:56
bool success() const override
Return whether the Job finished successfully or not.
Definition job.cpp:113
Executor * executor() const override
Returns the executor currently set on the Job.
Definition job.cpp:92
void aboutToBeQueued_locked(QueueAPI *api) override
Called from aboutToBeQueued() while the mutex is being held.
Definition job.cpp:140
Status status() const override
The status of the job.
Definition job.cpp:107
QList< QueuePolicy * > queuePolicies() const override
Return the queue policies assigned to this Job.
Definition job.cpp:172
void requestAbort() override
Abort the execution of the job.
Definition job.cpp:118
void defaultEnd(const JobPointer &job, Thread *thread) override
Perform standard task after the execution of a job.
Definition job.cpp:127
QueuePolicy is an interface for customizations of the queueing behaviour of jobs.
Definition queuepolicy.h:39
Thread represents a worker thread in a Queue's inventory.
Definition thread.h:28
Q_SCRIPTABLE CaptureState status()
T * data() const const
This file is part of the KDE documentation.
Documentation copyright © 1996-2024 The KDE developers.
Generated on Fri Nov 8 2024 11:58:32 by doxygen 1.12.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.