future-like objects representing tasks in concurrency

master
Daniel Kolesa 2017-05-02 21:42:19 +02:00
parent 5a09b58613
commit 2c62b82ec4
2 changed files with 207 additions and 16 deletions

View File

@ -8,6 +8,12 @@ int main() {
* task, which may or may not run in parallel with the other one depending
* on the scheduler currently in use - several schedulers are shown
*/
/* this version uses Go-style channels to exchange data; multiple
* tasks can put data into channels, the channel itself is a thread
* safe queue; it goes the other way around too, multiple tasks can
* wait on a channel for some data to be received
*/
auto foo = []() {
auto arr = { 150, 38, 76, 25, 67, 18, -15, 215, 25, -10 };
@ -20,16 +26,46 @@ int main() {
int a = c.get();
int b = c.get();
writefln(" %s + %s = %s", a, b, a + b);
};
/* this version uses future-style tid objects that represent the tasks
* themselves including the return value of the task; the return value
* can be retrieved using .get(), which will wait for the value to be
* stored unless it already is, becoming invalid after retrieval of
* the value; the tid also intercepts any possible exceptions thrown
* by the task and propagates them in .get(), allowing the user to
* perform safe exception handling across tasks
*/
auto bar = []() {
    auto arr = { 150, 38, 76, 25, 67, 18, -15, 215, 25, -10 };
    /* sums whichever half of the sequence it is handed */
    auto sum_half = [](auto half) {
        return foldl(half, 0);
    };
    auto mid = arr.size() / 2;
    /* one task per half; each tid carries the half's sum */
    auto first = spawn(sum_half, iter(arr).slice(0, mid));
    auto second = spawn(sum_half, iter(arr).slice(mid));
    int lhs = first.get();
    int rhs = second.get();
    writefln("%s + %s = %s", lhs, rhs, lhs + rhs);
};
/* tries both examples above */
auto baz = [&foo, &bar]() {
    writeln(" testing channels...");
    foo();
    writeln(" testing futures...");
    /* fixed: this used to call foo() a second time, so the
     * future-based example (bar) was never actually exercised
     */
    bar();
};
/* using thread_scheduler results in an OS thread spawned per task,
* implementing a 1:1 (kernel-level) scheduling - very expensive on
* Windows, less expensive on Unix-likes (but more than coroutines)
*/
thread_scheduler{}.start([&foo]() {
thread_scheduler{}.start([&baz]() {
writeln("(1) 1:1 scheduler: starting...");
foo();
baz();
writeln("(1) 1:1 scheduler: finishing...");
});
writeln();
@ -38,9 +74,9 @@ int main() {
* per task, implementing N:1 (user-level) scheduling - very cheap
* and portable everywhere but obviously limited to only one thread
*/
simple_coroutine_scheduler{}.start([&foo]() {
simple_coroutine_scheduler{}.start([&baz]() {
writeln("(2) N:1 scheduler: starting...");
foo();
baz();
writeln("(2) N:1 scheduler: finishing...");
});
writeln();
@ -50,23 +86,32 @@ int main() {
* a hybrid M:N approach - this benefits from multicore systems and
* also is relatively cheap (you can create a big number of tasks)
*/
coroutine_scheduler{}.start([&foo]() {
coroutine_scheduler{}.start([&baz]() {
writeln("(3) M:N scheduler: starting...");
foo();
baz();
writeln("(3) M:N scheduler: finishing...");
});
}
/*
(1) 1:1 scheduler: starting...
356 + 233 = 589
testing channels...
356 + 233 = 589
testing futures...
356 + 233 = 589
(1) 1:1 scheduler: finishing...
(2) N:1 scheduler: starting...
356 + 233 = 589
testing channels...
356 + 233 = 589
testing futures...
356 + 233 = 589
(2) N:1 scheduler: finishing...
(3) M:N scheduler: starting...
356 + 233 = 589
testing channels...
356 + 233 = 589
testing futures...
356 + 233 = 589
(3) M:N scheduler: finishing...
*/

View File

@ -53,6 +53,8 @@
#include <utility>
#include <memory>
#include <stdexcept>
#include <exception>
#include <type_traits>
#include "ostd/platform.hh"
#include "ostd/coroutine.hh"
@ -65,6 +67,140 @@ namespace ostd {
* @{
*/
struct scheduler;
namespace detail {
    /* Shared state behind a tid: holds the task's result (or the
     * exception it threw) and the synchronization needed to wait
     * for it. One producer (set_value) and one consumer (get).
     */
    template<typename T>
    struct tid_impl {
        tid_impl() = delete;

        /* the callable is invoked once to produce the condition
         * variable used to signal completion, letting the active
         * scheduler pick a condvar type matching its threading model
         */
        template<typename F>
        tid_impl(F &func): p_lock(), p_eptr(), p_cond(func()) {}

        /* Waits until the task completes, then either rethrows the
         * stored exception or hands out the stored value. For lvalue
         * reference T the stored pointer is dereferenced; otherwise
         * the value is moved out; for void T nothing is returned.
         */
        T get() {
            std::unique_lock<std::mutex> l{p_lock};
            /* a stored exception also means completion: waiting on
             * p_stor alone would deadlock when the task has thrown,
             * as set_value() never fills the storage in that case
             */
            while (!p_stor && !p_eptr) {
                p_cond.wait(l);
            }
            if (p_eptr) {
                std::rethrow_exception(std::exchange(p_eptr, nullptr));
            }
            auto ret = std::move(p_stor);
            if constexpr(!std::is_same_v<T, void>) {
                if constexpr(std::is_lvalue_reference_v<T>) {
                    return **ret;
                } else {
                    return std::move(*ret);
                }
            }
        }

        /* Waits for the task to finish (normally or by throwing)
         * without consuming the stored result or exception.
         */
        void wait() const {
            std::unique_lock<std::mutex> l{p_lock};
            /* same predicate as get(): an exception counts as done */
            while (!p_stor && !p_eptr) {
                p_cond.wait(l);
            }
        }

        /* Runs the task and publishes its result - or the exception
         * it threw - under the lock, then wakes a waiter. The waiter
         * is notified even on exception so it never blocks forever.
         */
        template<typename F>
        void set_value(F &func) {
            {
                std::lock_guard<std::mutex> l{p_lock};
                try {
                    if constexpr(std::is_same_v<T, void>) {
                        func();
                        p_stor = true;
                    } else {
                        if constexpr(std::is_lvalue_reference_v<T>) {
                            p_stor = &func();
                        } else {
                            p_stor = std::move(func());
                        }
                    }
                } catch (...) {
                    p_eptr = std::current_exception();
                }
            }
            /* NOTE(review): only one consumer per shared state is
             * supported, hence notify_one - confirm generic_condvar
             * grows a notify_all if that contract ever changes
             */
            p_cond.notify_one();
        }
    private:
        /* void tasks only need a completion flag; reference results
         * are stored as pointers, everything else by (decayed) value
         */
        using storage = std::conditional_t<
            std::is_same_v<T, void>,
            bool,
            std::optional<std::conditional_t<
                std::is_lvalue_reference_v<T>,
                std::remove_reference_t<T> *,
                std::decay_t<T>
            >>
        >;
        mutable std::mutex p_lock;
        mutable std::exception_ptr p_eptr;
        /* mutable so the const wait() can use it; generic_condvar's
         * wait is presumably non-const - confirm against its decl
         */
        mutable generic_condvar p_cond;
        storage p_stor = storage{};
    };
}
/** @brief An object that defines a task.
*
* This is returned by ostd::spawn() and obviously scheduler::spawn() It
* represents a spawned task, allowing you to wait for the task to be done
* as well as retrieve its return value. It also allows sane cross-task
* exception handling, as any exception is saved inside the internal
* shared state and propagated on get().
*
* The internal shared state is reference counted, but it's only safe to
* call get() once from one instance. It cannot be constructed normally,
* only as a return value or a copy.
*
* The `T` template parameter is the type of the result. It can be `void`,
* in which case get() returns nothing, but can still propagate exceptions.
*/
template<typename T>
struct tid {
friend struct scheduler;
tid() = delete;
tid(tid const &) = default;
tid(tid &&) = default;
tid &operator=(tid const &) = default;
tid &operator=(tid &&) = default;
/** @brief Waits for the result and returns it.
*
* If `T` is void, this does not return. If an exception was thrown by
* the task, it will be rethrown here, allowing sane exception handling
* between different tasks and possibly threads.
*
* Effectively calls wait() before returning or throwing.
*/
T get() {
auto p = std::move(p_state);
return p->get();
}
/** @brief Checks if this `tid` points to a valid shared state. */
bool valid() const {
return p_state;
}
/** @brief Waits for the associated task to finish.
*
* The behavior is undefined if valid() is true.
*/
void wait() const {
p_state->wait();
}
private:
template<typename F>
tid(F func): p_state(new detail::tid_impl<T>{func}) {}
std::shared_ptr<detail::tid_impl<T>> p_state;
};
/** @brief A base interface for any scheduler.
*
* All schedulers derive from this. Its core interface is defined using
@ -192,14 +328,24 @@ public:
* @see do_spawn(), ostd::spawn()
*/
template<typename F, typename ...A>
void spawn(F &&func, A &&...args) {
tid<std::result_of_t<F(A...)>> spawn(F func, A &&...args) {
tid<std::result_of_t<F(A...)>> t{[this]() {
return make_condition();
}};
/* private shared state reference */
auto st = t.p_state;
if constexpr(sizeof...(A) == 0) {
do_spawn(func);
do_spawn([lfunc = std::move(func), lst = std::move(st)]() {
lst->set_value(lfunc);
});
} else {
do_spawn(
std::bind(std::forward<F>(func), std::forward<A>(args)...)
);
do_spawn([lfunc = std::bind(
std::move(func), std::forward<A>(args)...
), lst = std::move(st)]() {
lst->set_value(lfunc);
});
}
return t;
}
/** @brief Creates a channel suitable for the scheduler.
@ -972,8 +1118,8 @@ using coroutine_scheduler = basic_coroutine_scheduler<stack_pool>;
* scheduler::spawn().
*/
template<typename F, typename ...A>
inline void spawn(F &&func, A &&...args) {
detail::current_scheduler->spawn(
inline tid<std::result_of_t<F(A...)>> spawn(F &&func, A &&...args) {
return detail::current_scheduler->spawn(
std::forward<F>(func), std::forward<A>(args)...
);
}