From 2c62b82ec495bac6b8ebca7e0c2b780814671575 Mon Sep 17 00:00:00 2001 From: q66 Date: Tue, 2 May 2017 21:42:19 +0200 Subject: [PATCH] future-like objects representing tasks in concurrency --- examples/concurrency.cc | 63 +++++++++++++--- ostd/concurrency.hh | 160 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 207 insertions(+), 16 deletions(-) diff --git a/examples/concurrency.cc b/examples/concurrency.cc index 769652b..e7b6f78 100644 --- a/examples/concurrency.cc +++ b/examples/concurrency.cc @@ -8,6 +8,12 @@ int main() { * task, which may or may not run in parallel with the other one depending * on the scheduler currently in use - several schedulers are shown */ + + /* this version uses Go-style channels to exchange data; multiple + * tasks can put data into channels, the channel itself is a thread + * safe queue; it goes the other way around too, multiple tasks can + * wait on a channel for some data to be received + */ auto foo = []() { auto arr = { 150, 38, 76, 25, 67, 18, -15, 215, 25, -10 }; @@ -20,16 +26,46 @@ int main() { int a = c.get(); int b = c.get(); + writefln(" %s + %s = %s", a, b, a + b); + }; + + /* this version uses future-style tid objects that represent the tasks + * themselves including the return value of the task; the return value + * can be retrieved using .get(), which will wait for the value to be + * stored unless it already is, becoming invalid after retrieval of + * the value; the tid also intercepts any possible exceptions thrown + * by the task and propagates them in .get(), allowing the user to + * perform safe exception handling across tasks + */ + auto bar = []() { + auto arr = { 150, 38, 76, 25, 67, 18, -15, 215, 25, -10 }; + + auto f = [](auto half) { + return foldl(half, 0); + }; + auto t1 = spawn(f, iter(arr).slice(0, arr.size() / 2)); + auto t2 = spawn(f, iter(arr).slice(arr.size() / 2)); + + int a = t1.get(); + int b = t2.get(); writefln("%s + %s = %s", a, b, a + b); }; + /* tries both examples above */ + auto 
baz = [&foo, &bar]() { + writeln(" testing channels..."); + foo(); + writeln(" testing futures..."); + bar(); + }; + /* using thread_scheduler results in an OS thread spawned per task, * implementing a 1:1 (kernel-level) scheduling - very expensive on * Windows, less expensive on Unix-likes (but more than coroutines) */ - thread_scheduler{}.start([&foo]() { + thread_scheduler{}.start([&baz]() { writeln("(1) 1:1 scheduler: starting..."); - foo(); + baz(); writeln("(1) 1:1 scheduler: finishing..."); }); writeln(); @@ -38,9 +74,9 @@ int main() { * per task, implementing N:1 (user-level) scheduling - very cheap * and portable everywhere but obviously limited to only one thread */ - simple_coroutine_scheduler{}.start([&foo]() { + simple_coroutine_scheduler{}.start([&baz]() { writeln("(2) N:1 scheduler: starting..."); - foo(); + baz(); writeln("(2) N:1 scheduler: finishing..."); }); writeln(); @@ -50,23 +86,32 @@ int main() { * a hybrid M:N approach - this benefits from multicore systems and * also is relatively cheap (you can create a big number of tasks) */ - coroutine_scheduler{}.start([&foo]() { + coroutine_scheduler{}.start([&baz]() { writeln("(3) M:N scheduler: starting..."); - foo(); + baz(); writeln("(3) M:N scheduler: finishing..."); }); } /* (1) 1:1 scheduler: starting... -356 + 233 = 589 + testing channels... + 356 + 233 = 589 + testing futures... + 356 + 233 = 589 (1) 1:1 scheduler: finishing... (2) N:1 scheduler: starting... -356 + 233 = 589 + testing channels... + 356 + 233 = 589 + testing futures... + 356 + 233 = 589 (2) N:1 scheduler: finishing... (3) M:N scheduler: starting... -356 + 233 = 589 + testing channels... + 356 + 233 = 589 + testing futures... + 356 + 233 = 589 (3) M:N scheduler: finishing... 
*/ diff --git a/ostd/concurrency.hh b/ostd/concurrency.hh index c247028..754fac7 100644 --- a/ostd/concurrency.hh +++ b/ostd/concurrency.hh @@ -53,6 +53,8 @@ #include #include #include +#include +#include #include "ostd/platform.hh" #include "ostd/coroutine.hh" @@ -65,6 +67,140 @@ namespace ostd { * @{ */ +struct scheduler; + +namespace detail { + template + struct tid_impl { + tid_impl() = delete; + + template + tid_impl(F &func): p_lock(), p_eptr(), p_cond(func()) {} + + T get() { + std::unique_lock l{p_lock}; + while (!p_stor) { + p_cond.wait(l); + } + if (p_eptr) { + std::rethrow_exception(std::exchange(p_eptr, nullptr)); + } + auto ret = std::move(p_stor); + if constexpr(!std::is_same_v) { + if constexpr(std::is_lvalue_reference_v) { + return **ret; + } else { + return std::move(*ret); + } + } + } + + void wait() const { + std::unique_lock l{p_lock}; + while (!p_stor) { + p_cond.wait(l); + } + } + + template + void set_value(F &func) { + { + std::lock_guard l{p_lock}; + try { + if constexpr(std::is_same_v) { + func(); + p_stor = true; + } else { + if constexpr(std::is_lvalue_reference_v) { + p_stor = &func(); + } else { + p_stor = std::move(func()); + } + } + } catch (...) { + p_eptr = std::current_exception(); + } + } + p_cond.notify_one(); + } + + private: + using storage = std::conditional_t< + std::is_same_v, + bool, + std::optional, + std::remove_reference_t *, + std::decay_t + >> + >; + + mutable std::mutex p_lock; + mutable std::exception_ptr p_eptr; + generic_condvar p_cond; + storage p_stor = storage{}; + }; +} + +/** @brief An object that defines a task. + * + * This is returned by ostd::spawn() and obviously scheduler::spawn() It + * represents a spawned task, allowing you to wait for the task to be done + * as well as retrieve its return value. It also allows sane cross-task + * exception handling, as any exception is saved inside the internal + * shared state and propagated on get(). 
+ * + * The internal shared state is reference counted, but it's only safe to + * call get() once from one instance. It cannot be constructed normally, + * only as a return value or a copy. + * + * The `T` template parameter is the type of the result. It can be `void`, + * in which case get() returns nothing, but can still propagate exceptions. + */ +template +struct tid { + friend struct scheduler; + + tid() = delete; + + tid(tid const &) = default; + tid(tid &&) = default; + tid &operator=(tid const &) = default; + tid &operator=(tid &&) = default; + + /** @brief Waits for the result and returns it. + * + * If `T` is void, this returns nothing. If an exception was thrown by + * the task, it will be rethrown here, allowing sane exception handling + * between different tasks and possibly threads. + * + * Effectively calls wait() before returning or throwing. + */ + T get() { + auto p = std::move(p_state); + return p->get(); + } + + /** @brief Checks if this `tid` points to a valid shared state. */ + bool valid() const { + return p_state; + } + + /** @brief Waits for the associated task to finish. + * + * The behavior is undefined if valid() is false. + */ + void wait() const { + p_state->wait(); + } + +private: + template + tid(F func): p_state(new detail::tid_impl{func}) {} + + std::shared_ptr> p_state; +}; + /** @brief A base interface for any scheduler. * * All schedulers derive from this. Its core interface is defined using @@ -192,14 +328,24 @@ public: * @see do_spawn(), ostd::spawn() */ template - void spawn(F &&func, A &&...args) { + tid> spawn(F func, A &&...args) { + tid> t{[this]() { + return make_condition(); + }}; + /* private shared state reference */ + auto st = t.p_state; if constexpr(sizeof...(A) == 0) { - do_spawn(func); + do_spawn([lfunc = std::move(func), lst = std::move(st)]() { + lst->set_value(lfunc); + }); } else { - do_spawn( - std::bind(std::forward(func), std::forward(args)...) 
- ); + do_spawn([lfunc = std::bind( + std::move(func), std::forward(args)... + ), lst = std::move(st)]() { + lst->set_value(lfunc); + }); } + return t; } /** @brief Creates a channel suitable for the scheduler. @@ -972,8 +1118,8 @@ using coroutine_scheduler = basic_coroutine_scheduler; * scheduler::spawn(). */ template -inline void spawn(F &&func, A &&...args) { - detail::current_scheduler->spawn( +inline tid> spawn(F &&func, A &&...args) { + return detail::current_scheduler->spawn( std::forward(func), std::forward(args)... ); }