ThreadWeaver

job.cpp
1 /* -*- C++ -*-
2  This file implements the Job class.
3 
4  SPDX-FileCopyrightText: 2004-2013 Mirko Boehm <mirko@kde.org>
5 
6  SPDX-License-Identifier: LGPL-2.0-or-later
7 
8  $Id: Job.cpp 20 2005-08-08 21:02:51Z mirko $
9 */
10 
11 #include "job.h"
12 #include "job_p.h"
13 
14 #include "debuggingaids.h"
15 #include "thread.h"
16 #include <QAtomicInt>
17 #include <QAtomicPointer>
18 #include <QList>
19 #include <QMutex>
20 
21 #include "dependencypolicy.h"
22 #include "exception.h"
23 #include "executewrapper_p.h"
24 #include "executor_p.h"
25 #include "managedjobpointer.h"
26 #include "queuepolicy.h"
27 
28 namespace ThreadWeaver
29 {
31  : d_(new Private::Job_Private())
32 {
33 #if !defined(NDEBUG)
34  d()->debugExecuteWrapper.wrap(setExecutor(&(d()->debugExecuteWrapper)));
35 #endif
36  d()->status.storeRelease(Status_New);
37 }
38 
39 Job::Job(Private::Job_Private *d__)
40  : d_(d__)
41 {
42 #if !defined(NDEBUG)
43  d()->debugExecuteWrapper.wrap(setExecutor(&(d()->debugExecuteWrapper)));
44 #endif
45  d()->status.storeRelease(Status_New);
46 }
47 
49 {
50  for (int index = 0; index < d()->queuePolicies.size(); ++index) {
51  d()->queuePolicies.at(index)->destructed(this);
52  }
53  delete d_;
54 }
55 
56 void Job::execute(const JobPointer &self, Thread *th)
57 {
58  Executor *executor = d()->executor.loadAcquire();
59  Q_ASSERT(executor); // may never be unset!
60  Q_ASSERT(self);
61  executor->begin(self, th);
62  self->setStatus(Status_Running);
63  try {
64  executor->execute(self, th);
65  if (self->status() == Status_Running) {
66  self->setStatus(Status_Success);
67  }
68  } catch (JobAborted &) {
69  self->setStatus(Status_Aborted);
70  } catch (JobFailed &) {
71  self->setStatus(Status_Failed);
72  } catch (AbortThread &) {
73  throw;
74  } catch (...) {
75  TWDEBUG(0, "Uncaught exception in Job %p, aborting.", self.data());
76  throw;
77  }
78  Q_ASSERT(self->status() > Status_Running);
79  executor->end(self, th);
80 }
81 
83 {
84  execute(ManagedJobPointer<Job>(this), nullptr);
85 }
86 
87 Executor *Job::setExecutor(Executor *executor)
88 {
89  return d()->executor.fetchAndStoreOrdered(executor == nullptr ? &Private::defaultExecutor : executor);
90 }
91 
92 Executor *Job::executor() const
93 {
94  return d()->executor.loadAcquire();
95 }
96 
97 int Job::priority() const
98 {
99  return 0;
100 }
101 
102 void Job::setStatus(JobInterface::Status status)
103 {
104  d()->status.storeRelease(status);
105 }
106 
107 JobInterface::Status Job::status() const
108 {
109  // since status is set only through setStatus, this should be safe:
110  return static_cast<Status>(d()->status.loadAcquire());
111 }
112 
113 bool Job::success() const
114 {
115  return d()->status.loadAcquire() == Status_Success;
116 }
117 
119 {
120  d()->shouldAbort = true;
121 }
122 
124 {
125 }
126 
127 void Job::defaultEnd(const JobPointer &job, Thread *)
128 {
129  d()->handleFinish(job);
130  d()->freeQueuePolicyResources(job);
131 }
132 
133 void Job::aboutToBeQueued(QueueAPI *api)
134 {
135  QMutexLocker l(mutex());
136  Q_UNUSED(l);
138 }
139 
141 {
142 }
143 
144 void Job::aboutToBeDequeued(QueueAPI *api)
145 {
146  QMutexLocker l(mutex());
147  Q_UNUSED(l);
149 }
150 
152 {
153 }
154 
156 {
157  Q_ASSERT(!mutex()->tryLock());
158  if (!d()->queuePolicies.contains(policy)) {
159  d()->queuePolicies.append(policy);
160  }
161 }
162 
164 {
165  Q_ASSERT(!mutex()->tryLock());
166  int index = d()->queuePolicies.indexOf(policy);
167  if (index != -1) {
168  d()->queuePolicies.removeAt(index);
169  }
170 }
171 
173 {
174  Q_ASSERT(!mutex()->tryLock());
175  return d()->queuePolicies;
176 }
177 
178 Private::Job_Private *Job::d()
179 {
180  return d_;
181 }
182 
183 const Private::Job_Private *Job::d() const
184 {
185  return d_;
186 }
187 
188 bool Job::isFinished() const
189 {
190  const Status s = status();
191  return s == Status_Success || s == Status_Failed || s == Status_Aborted;
192 }
193 
195 {
196  return &(d()->mutex);
197 }
198 
199 bool Job::shouldAbort() const
200 {
201  return d()->shouldAbort;
202 }
203 
204 void Job::onFinish(const std::function<void(const JobInterface &job)> &lambda)
205 {
206  QMutexLocker l(mutex());
207  d()->finishHandlers << lambda;
208 }
209 
210 }
211 
212 #include "managedjobpointer.h"
void execute(const JobPointer &job, Thread *) override
Perform the job.
Definition: job.cpp:56
int priority() const override
The queueing priority of the job.
Definition: job.cpp:97
void onFinish(const std::function< void(const JobInterface &job)> &lambda)
Add handler that will be invoked once job has finished.
Definition: job.cpp:204
void defaultBegin(const JobPointer &job, Thread *thread) override
Perform standard tasks before starting the execution of a job.
Definition: job.cpp:123
void aboutToBeDequeued_locked(QueueAPI *api) override
Called from aboutToBeDequeued() while the mutex is being held.
Definition: job.cpp:151
Executor * setExecutor(Executor *executor) override
Set the Executor object that is supposed to run the job.
Definition: job.cpp:87
void removeQueuePolicy(QueuePolicy *) override
Remove a queue policy from this job.
Definition: job.cpp:163
QList< QueuePolicy * > queuePolicies() const override
Return the queue policies assigned to this Job.
Definition: job.cpp:172
void defaultEnd(const JobPointer &job, Thread *thread) override
Perform standard task after the execution of a job.
Definition: job.cpp:127
Q_SCRIPTABLE CaptureState status()
~Job() override
Destructor.
Definition: job.cpp:48
bool shouldAbort() const
Whether Job should abort itself.
Definition: job.cpp:199
void aboutToBeQueued(QueueAPI *api) override
The job is about to be added to the weaver's job queue.
Definition: job.cpp:133
void setStatus(Status) override
Set the status of the Job.
Definition: job.cpp:102
void aboutToBeDequeued(QueueAPI *api) override
This Job is about to be dequeued from the weaver's job queue.
Definition: job.cpp:144
void requestAbort() override
Abort the execution of the job.
Definition: job.cpp:118
Status status() const override
The status of the job.
Definition: job.cpp:107
void blockingExecute() override
Perform the job synchronously in the current thread.
Definition: job.cpp:82
bool isFinished() const override
Returns true if the job's execute method finished.
Definition: job.cpp:188
QueuePolicy is an interface for customizations of the queueing behaviour of jobs.
Definition: queuepolicy.h:38
Thread represents a worker thread in a Queue's inventory.
Definition: thread.h:27
void aboutToBeQueued_locked(QueueAPI *api) override
Called from aboutToBeQueued() while the mutex is being held.
Definition: job.cpp:140
Executor * executor() const override
Returns the executor currently set on the Job.
Definition: job.cpp:92
void assignQueuePolicy(QueuePolicy *) override
Assign a queue policy.
Definition: job.cpp:155
bool success() const override
Return whether the Job finished successfully or not.
Definition: job.cpp:113
QMutex * mutex() const override
The mutex used to protect this job.
Definition: job.cpp:194
Job()
Construct a Job.
Definition: job.cpp:30
This file is part of the KDE documentation.
Documentation copyright © 1996-2024 The KDE developers.
Generated on Thu Feb 15 2024 04:07:51 by doxygen 1.8.17 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.