KASync

job_impl.h
1 /*
2  * Copyright 2014 - 2015 Daniel Vrátil <[email protected]>
3  * Copyright 2015 - 2016 Daniel Vrátil <[email protected]>
4  * Copyright 2016 Christian Mollekopf <mollek[email protected]>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public License as
8  * published by the Free Software Foundation; either version 2 of
9  * the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public License
17  * along with this library. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #ifndef KASYNC_JOB_IMPL_H
21 #define KASYNC_JOB_IMPL_H
22 
23 #include "async.h"
24 #include "traits_p.h"
25 
26 #include <QTimer>
27 
28 //@cond PRIVATE
29 
30 namespace KAsync
31 {
32 
33 template<typename Out, typename ... In>
34 template<typename ... InOther>
35 Job<Out, In ...>::operator std::conditional_t<std::is_void<OutType>::value, IncompleteType, Job<void>> ()
36 {
37  return thenImpl<void, InOther ...>({JobContinuation<void, InOther ...>([](InOther ...){ return KAsync::null<void>(); })}, {});
38 }
39 
40 template<typename Out, typename ... In>
41 template<typename OutOther, typename ... InOther>
42 Job<OutOther, In ...> Job<Out, In ...>::thenImpl(Private::ContinuationHolder<OutOther, InOther ...> workHelper,
43  Private::ExecutionFlag execFlag) const
44 {
45  thenInvariants<InOther ...>();
46  return Job<OutOther, In ...>(QSharedPointer<Private::Executor<OutOther, InOther ...>>::create(
47  std::forward<Private::ContinuationHolder<OutOther, InOther ...>>(workHelper), mExecutor, execFlag));
48 }
49 
50 template<typename Out, typename ... In>
51 template<typename OutOther, typename ... InOther>
52 Job<OutOther, In ...> Job<Out, In ...>::then(const Job<OutOther, InOther ...> &job) const
53 {
54  thenInvariants<InOther ...>();
55  auto executor = job.mExecutor;
56  executor->prepend(mExecutor);
57  return Job<OutOther, In ...>(executor);
58 }
59 
60 template<typename Out, typename ... In>
61 Job<Out, In ...> Job<Out, In ...>::onError(SyncErrorContinuation<void> &&errorFunc) const
62 {
63  return Job<Out, In...>(QSharedPointer<Private::Executor<Out, Out>>::create(
64  // Extra indirection to allow propagating the result of a previous future when no
65  // error occurs
66  Private::ContinuationHolder<Out, Out>([errorFunc = std::move(errorFunc)](const Error &error, const Out &val) {
67  errorFunc(error);
68  return val;
69  }), mExecutor, Private::ExecutionFlag::ErrorCase));
70 }
71 
72 template<> // Specialize for void jobs
73 inline Job<void> Job<void>::onError(SyncErrorContinuation<void> &&errorFunc) const
74 {
75  return Job<void>(QSharedPointer<Private::Executor<void>>::create(
76  Private::ContinuationHolder<void>(std::forward<SyncErrorContinuation<void>>(errorFunc)),
77  mExecutor, Private::ExecutionFlag::ErrorCase));
78 }
79 
80 template<typename Out, typename ... In>
81 template<typename FirstIn>
83 {
84  // Inject a fake sync executor that will return the initial value
85  Private::ExecutorBasePtr first = mExecutor;
86  while (first->mPrev) {
87  first = first->mPrev;
88  }
89 
90  first->mPrev = QSharedPointer<Private::Executor<FirstIn>>::create(
91  Private::ContinuationHolder<FirstIn>([val = std::move(in)](Future<FirstIn> &future) {
92  future.setResult(val);
93  }));
94 
95  auto result = exec();
96  // Remove the injected executor
97  first->mPrev.reset();
98  return result;
99 }
100 
101 template<typename Out, typename ... In>
103 {
104  Private::ExecutionPtr execution = mExecutor->exec(mExecutor, Private::ExecutionContext::Ptr::create());
105  KAsync::Future<Out> result = *execution->result<Out>();
106 
107  return result;
108 }
109 
110 template<typename Out, typename ... In>
111 Job<Out, In ...>::Job(Private::ExecutorBasePtr executor)
112  : JobBase(executor)
113 {}
114 
115 template<typename Out, typename ... In>
116 Job<Out, In ...>::Job(JobContinuation<Out, In ...> &&func)
117  : JobBase(new Private::Executor<Out, In ...>(std::forward<JobContinuation<Out, In ...>>(func), {}))
118 {
119  qWarning() << "Creating job job";
120  static_assert(sizeof...(In) <= 1, "Only one or zero input parameters are allowed.");
121 }
122 
123 template<typename Out, typename ... In>
124 template<typename OutOther>
125 void Job<Out, In ...>::eachInvariants() const
126 {
127  static_assert(traits::isContainer<Out>::value,
128  "The 'Each' task can only be connected to a job that returns a list or an array.");
129  static_assert(std::is_void<OutOther>::value || traits::isContainer<OutOther>::value,
130  "The result type of 'Each' task must be void, a list or an array.");
131 }
132 
133 template<typename Out, typename ... In>
134 template<typename InOtherFirst, typename ... InOtherTail>
135 void Job<Out, In ...>::thenInvariants() const
136 {
137  static_assert(!std::is_void<Out>::value && (std::is_convertible<Out, InOtherFirst>::value || std::is_base_of<Out, InOtherFirst>::value),
138  "The return type of previous task must be compatible with input type of this task");
139 }
140 
141 template<typename Out, typename ... In>
142 template<typename ... InOther>
143 auto Job<Out, In ...>::thenInvariants() const -> std::enable_if_t<(sizeof...(InOther) == 0)>
144 {
145 }
146 
147 template<template<typename> class Container>
148 KAsync::Job<void> waitForCompletion(Container<KAsync::Future<void>> &futures)
149 {
150  struct Context {
151  void removeWatcher(KAsync::FutureWatcher<void> *w)
152  {
153  pending.erase(std::remove_if(pending.begin(), pending.end(), [w](const auto &watcher) {
154  return w == watcher.get();
155  }));
156  }
157 
158  std::vector<std::unique_ptr<KAsync::FutureWatcher<void>>> pending;
159  };
160 
161  return start<Context *>([]() {
162  return new Context();
163  })
164  .template then<Context*, Context*>([futures](Context *context, KAsync::Future<Context *> &future) {
165  for (KAsync::Future<void> subFuture : futures) {
166  if (subFuture.isFinished()) {
167  continue;
168  }
169  // FIXME bind lifetime all watcher to future (repectively the main job
170  auto watcher = std::make_unique<KAsync::FutureWatcher<void>>();
172  [&future, watcher = watcher.get(), context]() {
173  context->removeWatcher(watcher);
174  if (context->pending.empty()) {
175  future.setResult(context);
176  }
177  });
178  watcher->setFuture(subFuture);
179  context->pending.push_back(std::move(watcher));
180  }
181  if (context->pending.empty()) {
182  future.setResult(context);
183  }
184  })
185  .template then<void, Context*>([](Context *context) {
186  delete context;
187  });
188  // .finally<void>([context]() { delete context; });
189 }
190 
191 template<typename List, typename ValueType>
192 Job<void, List> forEach(KAsync::Job<void, ValueType> job)
193 {
194  auto cont = [job] (const List &values) mutable {
197  for (const auto &v : values) {
198  auto future = job
199  .template then<void>([error] (const KAsync::Error &e) {
200  if (e && !*error) {
201  //TODO ideally we would aggregate the errors instead of just using the first one
202  *error = e;
203  }
204  })
205  .exec(v);
206  list.push_back(future);
207  }
208  return waitForCompletion(list)
209  .then<void>([error](KAsync::Future<void> &future) {
210  if (*error) {
211  future.setError(*error);
212  } else {
213  future.setFinished();
214  }
215  });
216  };
217  return Job<void, List>(QSharedPointer<Private::Executor<void, List>>::create(
218  Private::ContinuationHolder<void, List>(JobContinuation<void, List>(std::move(cont))), nullptr, Private::ExecutionFlag::GoodCase));
219 }
220 
221 
222 template<typename List, typename ValueType>
223 Job<void, List> serialForEach(KAsync::Job<void, ValueType> job)
224 {
225  auto cont = [job] (const List &values) mutable {
227  auto serialJob = KAsync::null<void>();
228  for (const auto &value : values) {
229  serialJob = serialJob.then<void>([value, job, error](KAsync::Future<void> &future) {
230  job.template then<void>([&future, error] (const KAsync::Error &e) {
231  if (e && !*error) {
232  //TODO ideally we would aggregate the errors instead of just using the first one
233  *error = e;
234  }
235  future.setFinished();
236  })
237  .exec(value);
238  });
239  }
240  return serialJob
241  .then<void>([error](KAsync::Future<void> &future) {
242  if (*error) {
243  future.setError(*error);
244  } else {
245  future.setFinished();
246  }
247  });
248  };
249  return Job<void, List>(QSharedPointer<Private::Executor<void, List>>::create(
250  Private::ContinuationHolder<void, List>(JobContinuation<void, List>(std::move(cont))), nullptr, Private::ExecutionFlag::GoodCase));
251 }
252 
253 template<typename List, typename ValueType>
254 Job<void, List> forEach(JobContinuation<void, ValueType> &&func)
255 {
256  return forEach<List, ValueType>(KAsync::start<void, ValueType>(std::forward<JobContinuation<void, ValueType>>(func)));
257 }
258 
259 template<typename List, typename ValueType>
260 Job<void, List> serialForEach(JobContinuation<void, ValueType> &&func)
261 {
262  return serialForEach<List, ValueType>(KAsync::start<void, ValueType>(std::forward<JobContinuation<void, ValueType>>(func)));
263 }
264 
265 template<typename Out>
266 Job<Out> null()
267 {
268  return KAsync::start<Out>(
270  future.setFinished();
271  });
272 }
273 
274 template<typename Out>
275 Job<Out> value(Out v)
276 {
277  return KAsync::start<Out>(
278  [val = std::move(v)](KAsync::Future<Out> &future) {
279  future.setResult(val);
280  });
281 }
282 
283 template<typename Out>
284 Job<Out> error(int errorCode, const QString &errorMessage)
285 {
286  return error<Out>({errorCode, errorMessage});
287 }
288 
289 template<typename Out>
290 Job<Out> error(const char *message)
291 {
292  return error<Out>(Error(message));
293 }
294 
295 template<typename Out>
296 Job<Out> error(const Error &error)
297 {
298  return KAsync::start<Out>(
299  [error](KAsync::Future<Out> &future) {
300  future.setError(error);
301  });
302 }
303 
304 inline Job<void> doWhile(const Job<ControlFlowFlag> &body)
305 {
306  return KAsync::start<void>([body] (KAsync::Future<void> &future) {
307  auto job = body.then<void, ControlFlowFlag>([&future, body](const KAsync::Error &error, ControlFlowFlag flag) {
308  if (error) {
309  future.setError(error);
310  future.setFinished();
311  } else if (flag == ControlFlowFlag::Continue) {
312  doWhile(body).then<void>([&future](const KAsync::Error &error) {
313  if (error) {
314  future.setError(error);
315  }
316  future.setFinished();
317  }).exec();
318  } else {
319  future.setFinished();
320  }
321  }).exec();
322  });
323 }
324 
325 inline Job<void> doWhile(const JobContinuation<ControlFlowFlag> &body)
326 {
327  return doWhile(KAsync::start<ControlFlowFlag>([body] {
328  return body();
329  }));
330 }
331 
332 inline Job<void> wait(int delay)
333 {
334  return KAsync::start<void>([delay](KAsync::Future<void> &future) {
335  QTimer::singleShot(delay, [&future]() {
336  future.setFinished();
337  });
338  });
339 }
340 } // namespace KAsync
341 
342 //@endcond
343 
344 #endif // KASYNC_JOB_IMPL_H
QSharedPointer< T > create(Args &&...args)
An Asynchronous job.
Definition: async.h:76
void setFuture(const KAsync::Future< T > &future)
Set future to watch.
Definition: future.h:478
STL namespace.
KGuiItem cont()
QVector< V > values(const QMultiHash< K, V > &c)
Job< Out, In... > onError(SyncErrorContinuation< void > &&errorFunc) const
Shorthand for a job that receives the error only.
void error(QWidget *parent, const QString &text, const QString &caption=QString(), Options options=Notify)
const QList< QKeySequence > & forward()
Job< OutOther, In... > then(const Job< OutOther, InOther... > &job) const
A continuation.
KAsync::Future< Out > exec()
Starts execution of the job chain.
Future is a promise that is used by Job to deliver result of an asynchronous execution.
Definition: future.h:196
Definition: async.h:68
void push_back(const T &value)
KAsync::Future< T > future() const
Returns currently watched future.
Definition: future.h:486
A specialization of Future<T> for tasks that have no (void) result.
Definition: future.h:386
QMetaObject::Connection connect(const QObject *sender, const char *signal, const QObject *receiver, const char *method, Qt::ConnectionType type)
KIOFILEWIDGETS_EXPORT QStringList list(const QString &fileClass)
The FutureWatcher allows monitoring of Job results using signals and slots.
Definition: future.h:456
This file is part of the KDE documentation.
Documentation copyright © 1996-2020 The KDE developers.
Generated on Wed Jul 1 2020 23:04:36 by doxygen 1.8.11 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.