KASync

job_impl.h
1 /*
2  SPDX-FileCopyrightText: 2014-2015 Daniel Vrátil <[email protected]>
3  SPDX-FileCopyrightText: 2015-2016 Daniel Vrátil <[email protected]>
4  SPDX-FileCopyrightText: 2016 Christian Mollekopf <[email protected]>
5 
6  SPDX-License-Identifier: LGPL-2.0-or-later
7 */
8 
9 #ifndef KASYNC_JOB_IMPL_H
10 #define KASYNC_JOB_IMPL_H
11 
12 #include "async.h"
13 #include "traits_p.h"
14 
15 #include <QTimer>
16 
17 //@cond PRIVATE
18 
19 namespace KAsync
20 {
21 
22 template<typename Out, typename ... In>
23 template<typename ... InOther>
24 Job<Out, In ...>::operator std::conditional_t<std::is_void<OutType>::value, IncompleteType, Job<void>> ()
25 {
26  return thenImpl<void, InOther ...>({JobContinuation<void, InOther ...>([](InOther ...){ return KAsync::null<void>(); })}, {});
27 }
28 
29 template<typename Out, typename ... In>
30 template<typename OutOther, typename ... InOther>
31 Job<OutOther, In ...> Job<Out, In ...>::thenImpl(Private::ContinuationHolder<OutOther, InOther ...> workHelper,
32  Private::ExecutionFlag execFlag) const
33 {
34  thenInvariants<InOther ...>();
35  return Job<OutOther, In ...>(QSharedPointer<Private::Executor<OutOther, InOther ...>>::create(
36  std::forward<Private::ContinuationHolder<OutOther, InOther ...>>(workHelper), mExecutor, execFlag));
37 }
38 
39 template<typename Out, typename ... In>
40 template<typename OutOther, typename ... InOther>
41 Job<OutOther, In ...> Job<Out, In ...>::then(const Job<OutOther, InOther ...> &job) const
42 {
43  thenInvariants<InOther ...>();
44  auto executor = job.mExecutor;
45  executor->prepend(mExecutor);
46  return Job<OutOther, In ...>(executor);
47 }
48 
49 template<typename Out, typename ... In>
50 Job<Out, In ...> Job<Out, In ...>::onError(SyncErrorContinuation<void> &&errorFunc) const
51 {
52  return Job<Out, In...>(QSharedPointer<Private::Executor<Out, Out>>::create(
53  // Extra indirection to allow propagating the result of a previous future when no
54  // error occurs
55  Private::ContinuationHolder<Out, Out>([errorFunc = std::move(errorFunc)](const Error &error, const Out &val) {
56  errorFunc(error);
57  return val;
58  }), mExecutor, Private::ExecutionFlag::ErrorCase));
59 }
60 
61 template<> // Specialize for void jobs
62 inline Job<void> Job<void>::onError(SyncErrorContinuation<void> &&errorFunc) const
63 {
64  return Job<void>(QSharedPointer<Private::Executor<void>>::create(
65  Private::ContinuationHolder<void>(std::forward<SyncErrorContinuation<void>>(errorFunc)),
66  mExecutor, Private::ExecutionFlag::ErrorCase));
67 }
68 
69 template<typename Out, typename ... In>
70 template<typename FirstIn>
72 {
73  // Inject a fake sync executor that will return the initial value
74  Private::ExecutorBasePtr first = mExecutor;
75  while (first->mPrev) {
76  first = first->mPrev;
77  }
78 
79  first->mPrev = QSharedPointer<Private::Executor<FirstIn>>::create(
80  Private::ContinuationHolder<FirstIn>([val = std::move(in)](Future<FirstIn> &future) {
81  future.setResult(val);
82  }));
83 
84  auto result = exec();
85  // Remove the injected executor
86  first->mPrev.reset();
87  return result;
88 }
89 
90 template<typename Out, typename ... In>
92 {
93  Private::ExecutionPtr execution = mExecutor->exec(mExecutor, Private::ExecutionContext::Ptr::create());
94  KAsync::Future<Out> result = *execution->result<Out>();
95 
96  return result;
97 }
98 
99 template<typename Out, typename ... In>
100 Job<Out, In ...>::Job(Private::ExecutorBasePtr executor)
101  : JobBase(executor)
102 {}
103 
104 template<typename Out, typename ... In>
105 Job<Out, In ...>::Job(JobContinuation<Out, In ...> &&func)
106  : JobBase(new Private::Executor<Out, In ...>(std::forward<JobContinuation<Out, In ...>>(func), {}))
107 {
108  qWarning() << "Creating job job";
109  static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed.");
110 }
111 
112 template<typename Out, typename ... In>
113 template<typename OutOther>
114 void Job<Out, In ...>::eachInvariants() const
115 {
116  static_assert(traits::isContainer<Out>::value,
117  "The 'Each' task can only be connected to a job that returns a list or an array.");
118  static_assert(std::is_void<OutOther>::value || traits::isContainer<OutOther>::value,
119  "The result type of 'Each' task must be void, a list or an array.");
120 }
121 
122 template<typename Out, typename ... In>
123 template<typename InOtherFirst, typename ... InOtherTail>
124 void Job<Out, In ...>::thenInvariants() const
125 {
126  static_assert(!std::is_void<Out>::value && (std::is_convertible<Out, InOtherFirst>::value || std::is_base_of<Out, InOtherFirst>::value),
127  "The return type of previous task must be compatible with input type of this task");
128 }
129 
130 template<typename Out, typename ... In>
131 template<typename ... InOther>
132 auto Job<Out, In ...>::thenInvariants() const -> std::enable_if_t<(sizeof...(InOther) == 0)>
133 {
134 }
135 
136 template<template<typename> class Container>
137 KAsync::Job<void> waitForCompletion(Container<KAsync::Future<void>> &futures)
138 {
139  struct Context {
140  void removeWatcher(KAsync::FutureWatcher<void> *w)
141  {
142  pending.erase(std::remove_if(pending.begin(), pending.end(), [w](const auto &watcher) {
143  return w == watcher.get();
144  }));
145  }
146 
147  std::vector<std::unique_ptr<KAsync::FutureWatcher<void>>> pending;
148  };
149 
150  return start<Context *>([]() {
151  return new Context();
152  })
153  .template then<Context*, Context*>([futures](Context *context, KAsync::Future<Context *> &future) {
154  for (KAsync::Future<void> subFuture : futures) {
155  if (subFuture.isFinished()) {
156  continue;
157  }
158  // FIXME bind lifetime all watcher to future (repectively the main job
159  auto watcher = std::make_unique<KAsync::FutureWatcher<void>>();
161  [&future, watcher = watcher.get(), context]() {
162  context->removeWatcher(watcher);
163  if (context->pending.empty()) {
164  future.setResult(context);
165  }
166  });
167  watcher->setFuture(subFuture);
168  context->pending.push_back(std::move(watcher));
169  }
170  if (context->pending.empty()) {
171  future.setResult(context);
172  }
173  })
174  .template then<void, Context*>([](Context *context) {
175  delete context;
176  });
177  // .finally<void>([context]() { delete context; });
178 }
179 
180 template<typename List, typename ValueType>
181 Job<void, List> forEach(KAsync::Job<void, ValueType> job)
182 {
183  auto cont = [job] (const List &values) mutable {
186  for (const auto &v : values) {
187  auto future = job
188  .template then<void>([error] (const KAsync::Error &e) {
189  if (e && !*error) {
190  //TODO ideally we would aggregate the errors instead of just using the first one
191  *error = e;
192  }
193  })
194  .exec(v);
195  list.push_back(future);
196  }
197  return waitForCompletion(list)
198  .then<void>([error](KAsync::Future<void> &future) {
199  if (*error) {
200  future.setError(*error);
201  } else {
202  future.setFinished();
203  }
204  });
205  };
206  return Job<void, List>(QSharedPointer<Private::Executor<void, List>>::create(
207  Private::ContinuationHolder<void, List>(JobContinuation<void, List>(std::move(cont))), nullptr, Private::ExecutionFlag::GoodCase));
208 }
209 
210 
211 template<typename List, typename ValueType>
212 Job<void, List> serialForEach(KAsync::Job<void, ValueType> job)
213 {
214  auto cont = [job] (const List &values) mutable {
216  auto serialJob = KAsync::null<void>();
217  for (const auto &value : values) {
218  serialJob = serialJob.then<void>([value, job, error](KAsync::Future<void> &future) {
219  job.template then<void>([&future, error] (const KAsync::Error &e) {
220  if (e && !*error) {
221  //TODO ideally we would aggregate the errors instead of just using the first one
222  *error = e;
223  }
224  future.setFinished();
225  })
226  .exec(value);
227  });
228  }
229  return serialJob
230  .then<void>([error](KAsync::Future<void> &future) {
231  if (*error) {
232  future.setError(*error);
233  } else {
234  future.setFinished();
235  }
236  });
237  };
238  return Job<void, List>(QSharedPointer<Private::Executor<void, List>>::create(
239  Private::ContinuationHolder<void, List>(JobContinuation<void, List>(std::move(cont))), nullptr, Private::ExecutionFlag::GoodCase));
240 }
241 
242 template<typename List, typename ValueType>
243 Job<void, List> forEach(JobContinuation<void, ValueType> &&func)
244 {
245  return forEach<List, ValueType>(KAsync::start<void, ValueType>(std::forward<JobContinuation<void, ValueType>>(func)));
246 }
247 
248 template<typename List, typename ValueType>
249 Job<void, List> serialForEach(JobContinuation<void, ValueType> &&func)
250 {
251  return serialForEach<List, ValueType>(KAsync::start<void, ValueType>(std::forward<JobContinuation<void, ValueType>>(func)));
252 }
253 
254 template<typename Out>
255 Job<Out> null()
256 {
257  return KAsync::start<Out>(
259  future.setFinished();
260  });
261 }
262 
263 template<typename Out>
264 Job<Out> value(Out v)
265 {
266  return KAsync::start<Out>(
267  [val = std::move(v)](KAsync::Future<Out> &future) {
268  future.setResult(val);
269  });
270 }
271 
272 template<typename Out>
273 Job<Out> error(int errorCode, const QString &errorMessage)
274 {
275  return error<Out>({errorCode, errorMessage});
276 }
277 
278 template<typename Out>
279 Job<Out> error(const char *message)
280 {
281  return error<Out>(Error(message));
282 }
283 
284 template<typename Out>
285 Job<Out> error(const Error &error)
286 {
287  return KAsync::start<Out>(
288  [error](KAsync::Future<Out> &future) {
289  future.setError(error);
290  });
291 }
292 
293 inline Job<void> doWhile(const Job<ControlFlowFlag> &body)
294 {
295  return KAsync::start<void>([body] (KAsync::Future<void> &future) {
296  auto job = body.then<void, ControlFlowFlag>([&future, body](const KAsync::Error &error, ControlFlowFlag flag) {
297  if (error) {
298  future.setError(error);
299  future.setFinished();
300  } else if (flag == ControlFlowFlag::Continue) {
301  doWhile(body).then<void>([&future](const KAsync::Error &error) {
302  if (error) {
303  future.setError(error);
304  }
305  future.setFinished();
306  }).exec();
307  } else {
308  future.setFinished();
309  }
310  }).exec();
311  });
312 }
313 
314 inline Job<void> doWhile(const JobContinuation<ControlFlowFlag> &body)
315 {
316  return doWhile(KAsync::start<ControlFlowFlag>([body] {
317  return body();
318  }));
319 }
320 
321 inline Job<void> wait(int delay)
322 {
323  return KAsync::start<void>([delay](KAsync::Future<void> &future) {
324  QTimer::singleShot(delay, [&future]() {
325  future.setFinished();
326  });
327  });
328 }
329 } // namespace KAsync
330 
331 //@endcond
332 
333 #endif // KASYNC_JOB_IMPL_H
QSharedPointer< T > create(Args &&...args)
An Asynchronous job.
Definition: async.h:67
void setFuture(const KAsync::Future< T > &future)
Set future to watch.
Definition: future.h:467
STL namespace.
KGuiItem cont()
QVector< V > values(const QMultiHash< K, V > &c)
Job< Out, In... > onError(SyncErrorContinuation< void > &&errorFunc) const
Shorthand for a job that receives the error only.
void error(QWidget *parent, const QString &text, const QString &caption=QString(), Options options=Notify)
const QList< QKeySequence > & forward()
Job< OutOther, In... > then(const Job< OutOther, InOther... > &job) const
A continuation.
KAsync::Future< Out > exec()
Starts execution of the job chain.
Future is a promise that is used by Job to deliver result of an asynchronous execution.
Definition: future.h:185
Definition: async.h:59
void push_back(const T &value)
KAsync::Future< T > future() const
Returns currently watched future.
Definition: future.h:475
A specialization of Future<T> for tasks that have no (void) result.
Definition: future.h:375
QMetaObject::Connection connect(const QObject *sender, const char *signal, const QObject *receiver, const char *method, Qt::ConnectionType type)
KIOFILEWIDGETS_EXPORT QStringList list(const QString &fileClass)
The FutureWatcher allows monitoring of Job results using signals and slots.
Definition: future.h:445
This file is part of the KDE documentation.
Documentation copyright © 1996-2021 The KDE developers.
Generated on Sun Apr 11 2021 23:07:13 by doxygen 1.8.11 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.