ThreadWeaver

queue.cpp
1 /* -*- C++ -*-
2  The Queue class in ThreadWeaver.
3 
4  SPDX-FileCopyrightText: 2005-2013 Mirko Boehm <[email protected]>
5 
6  SPDX-License-Identifier: LGPL-2.0-or-later
7 */
8 
9 #include <QCoreApplication>
10 #include <QMutex>
11 #include <QVector>
12 
13 #include "queue.h"
14 #include "weaver.h"
15 
16 using namespace ThreadWeaver;
17 
18 namespace
19 {
20 static Queue::GlobalQueueFactory *globalQueueFactory;
21 }
22 
23 class Q_DECL_HIDDEN Queue::Private
24 {
25 public:
26  Private(Queue *q, QueueSignals *queue)
27  : implementation(queue)
28  {
29  Q_ASSERT_X(qApp != nullptr, Q_FUNC_INFO, "Cannot create global ThreadWeaver instance before QApplication!");
30  Q_ASSERT(queue);
31  queue->setParent(q);
32  q->connect(implementation, SIGNAL(finished()), SIGNAL(finished()));
33  q->connect(implementation, SIGNAL(suspended()), SIGNAL(suspended()));
34  }
35 
36  QueueSignals *implementation;
37  void init(QueueSignals *implementation);
38 };
39 
40 /** @brief Construct a Queue. */
42  : QueueSignals(parent)
43  , d(new Private(this, new Weaver))
44 {
45 }
46 
47 /** @brief Construct a Queue, specifying the QueueSignals implementation to use.
48  *
49  * The QueueSignals instance is usually a Weaver object, which may be customized for specific
50  * application needs. The Weaver instance will take ownership of the implementation object and
51  * deletes it when destructed.
52  * @see Weaver
53  * @see GlobalQueueFactory
54  */
55 Queue::Queue(QueueSignals *implementation, QObject *parent)
56  : QueueSignals(parent)
57  , d(new Private(this, implementation))
58 {
59 }
60 
61 /** @brief Destruct the Queue object.
62  *
63  * If the queue is not already in Destructed state, the destructor will call shutDown() to make sure
64  * enqueued jobs are completed and the queue is idle.
65  * The queue implementation will be destroyed.
66  * @see shutDown()
67  * @see ThreadWeaver::Destructed
68  */
70 {
71  if (d->implementation->state()->stateId() != Destructed) {
72  d->implementation->shutDown();
73  }
74  delete d->implementation;
75  delete d;
76 }
77 
78 /** @brief Create a QueueStream to enqueue jobs into this queue. */
80 {
81  return QueueStream(this);
82 }
83 
85 {
86  d->implementation->shutDown();
87 }
88 
89 /** @brief Set the factory object that will create the global queue.
90  *
91  * Once set, the global queue factory will be deleted when the global ThreadWeaver pool is deleted.
92  * The factory object needs to be set before the global ThreadWeaver pool is instantiated. Call this
93  * method before Q(Core)Application is constructed. */
95 {
96  if (globalQueueFactory) {
97  delete globalQueueFactory;
98  }
99  globalQueueFactory = factory;
100 }
101 
102 const State *Queue::state() const
103 {
104  return d->implementation->state();
105 }
106 
107 namespace
108 {
109 class StaticThreadWeaverInstanceGuard : public QObject
110 {
111  Q_OBJECT
112 public:
113  explicit StaticThreadWeaverInstanceGuard(QAtomicPointer<Queue> &instance, QCoreApplication *app)
114  : QObject(app)
115  , instance_(instance)
116  {
117  Q_ASSERT_X(app != nullptr, Q_FUNC_INFO, "Calling ThreadWeaver::Weaver::instance() requires a QCoreApplication!");
118  QObject *impl = instance.loadRelaxed()->findChild<QueueSignals *>();
119  Q_ASSERT(impl);
120  impl->setObjectName(QStringLiteral("GlobalQueue"));
121  qAddPostRoutine(shutDownGlobalQueue);
122  }
123 
124  ~StaticThreadWeaverInstanceGuard() override
125  {
126  instance_.fetchAndStoreOrdered(nullptr);
127  delete globalQueueFactory;
128  globalQueueFactory = nullptr;
129  }
130 
131 private:
132  static void shutDownGlobalQueue()
133  {
135  Q_ASSERT(Queue::instance()->state()->stateId() == Destructed);
136  }
137 
138  QAtomicPointer<Queue> &instance_;
139 };
140 
141 }
142 
143 /** @brief Access the application-global Queue.
144  *
145  * In some cases, the global queue is sufficient for the applications purpose. The global queue will only be
146  * created if this method is actually called in the lifetime of the application.
147  *
148  * The Q(Core)Application object must exist when instance() is called for the first time.
149  * The global queue will be destroyed when Q(Core)Application is destructed. After that, the instance() method
150  * returns zero.
151  */
153 {
154  static QAtomicPointer<Queue> s_instance(globalQueueFactory ? globalQueueFactory->create(qApp) : new Queue(qApp));
155  // Order is of importance here:
156  // When s_instanceGuard is destructed (first, before s_instance), it sets the value of s_instance to zero. Next, qApp will delete
157  // the object s_instance pointed to.
158  static StaticThreadWeaverInstanceGuard *s_instanceGuard = new StaticThreadWeaverInstanceGuard(s_instance, qApp);
159  Q_UNUSED(s_instanceGuard);
160  Q_ASSERT_X(s_instance.loadRelaxed() == nullptr //
161  || s_instance.loadRelaxed()->thread() == QCoreApplication::instance()->thread(),
162  Q_FUNC_INFO,
163  "The global ThreadWeaver queue needs to be instantiated (accessed first) from the main thread!");
164  return s_instance.loadAcquire();
165 }
166 
168 {
169  d->implementation->enqueue(jobs);
170 }
171 
172 void Queue::enqueue(const JobPointer &job)
173 {
174  enqueue(QVector<JobPointer>() << job);
175 }
176 
177 bool Queue::dequeue(const JobPointer &job)
178 {
179  return d->implementation->dequeue(job);
180 }
181 
183 {
184  return d->implementation->dequeue();
185 }
186 
188 {
189  return d->implementation->finish();
190 }
191 
193 {
194  return d->implementation->suspend();
195 }
196 
198 {
199  return d->implementation->resume();
200 }
201 
202 bool Queue::isEmpty() const
203 {
204  return d->implementation->isEmpty();
205 }
206 
207 bool Queue::isIdle() const
208 {
209  return d->implementation->isIdle();
210 }
211 
213 {
214  return d->implementation->queueLength();
215 }
216 
218 {
219  d->implementation->setMaximumNumberOfThreads(cap);
220 }
221 
223 {
224  return d->implementation->currentNumberOfThreads();
225 }
226 
228 {
229  return d->implementation->maximumNumberOfThreads();
230 }
231 
233 {
234  d->implementation->requestAbort();
235 }
236 
238 {
239  d->implementation->reschedule();
240 }
241 
242 #include "queue.moc"
void resume() override
Resume job queueing.
Definition: queue.cpp:197
QueueStream stream()
Create a QueueStream to enqueue jobs into this queue.
Definition: queue.cpp:79
static ThreadWeaver::Queue * instance()
Access the application-global Queue.
Definition: queue.cpp:152
Interface for the global queue factory.
Definition: queue.h:69
const State * state() const override
Return the state of the weaver object.
Definition: queue.cpp:102
T * loadAcquire() const
Queue(QObject *parent=nullptr)
Construct a Queue.
Definition: queue.cpp:41
QCA_EXPORT void init()
T * loadRelaxed() const
void reschedule() override
Reschedule the jobs in the queue.
Definition: queue.cpp:237
QueueStream implements a stream based API to access ThreadWeaver queues.
Definition: queuestream.h:21
QMetaObject::Connection connect(const QObject *sender, const char *signal, const QObject *receiver, const char *method, Qt::ConnectionType type)
QThread * thread() const
int currentNumberOfThreads() const override
Returns the current number of threads in the inventory.
Definition: queue.cpp:222
A Weaver manages worker threads.
Definition: weaver.h:34
void enqueue(const QVector< JobPointer > &jobs) override
Queue a vector of jobs.
Definition: queue.cpp:167
void suspend() override
Suspend job execution.
Definition: queue.cpp:192
QCoreApplication * instance()
void requestAbort() override
Request aborts of the currently executed jobs.
Definition: queue.cpp:232
QueueSignals declares the Qt signals shared by the Queue and Weaver classes.
Definition: queuesignals.h:23
We use a State pattern to handle the system state in ThreadWeaver.
Definition: state.h:55
~Queue() override
Destruct the Queue object.
Definition: queue.cpp:69
void shutDown() override
Shut down the queue.
Definition: queue.cpp:84
void setMaximumNumberOfThreads(int cap) override
Set the maximum number of threads this Weaver object may start.
Definition: queue.cpp:217
bool isEmpty() const override
Is the queue empty? The queue is empty if no more jobs are queued.
Definition: queue.cpp:202
void finish() override
Finish all queued operations, then return.
Definition: queue.cpp:187
void dequeue() override
Remove all queued jobs.
Definition: queue.cpp:182
void setObjectName(const QString &name)
void setParent(QObject *parent)
Queue implements a ThreadWeaver job queue.
Definition: queue.h:35
int queueLength() const override
Returns the number of pending jobs.
Definition: queue.cpp:212
int maximumNumberOfThreads() const override
Get the maximum number of threads this Weaver may start.
Definition: queue.cpp:227
bool isIdle() const override
Is the weaver idle? The weaver is idle if no jobs are queued and no jobs are processed by the threads...
Definition: queue.cpp:207
static void setGlobalQueueFactory(GlobalQueueFactory *factory)
Set the factory object that will create the global queue.
Definition: queue.cpp:94
This file is part of the KDE documentation.
Documentation copyright © 1996-2023 The KDE developers.
Generated on Tue Feb 7 2023 04:09:38 by doxygen 1.8.17 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.