document concurrency

master
Daniel Kolesa 2017-03-27 21:49:55 +02:00
parent cca5a12755
commit ca40fb1e0c
2 changed files with 450 additions and 13 deletions

View File

@ -1,6 +1,15 @@
/* Channels provide the messaging system used for OctaSTD concurrency.
/** @addtogroup Concurrency
* @{
*/
/** @file channel.hh
*
* This file is part of OctaSTD. See COPYING.md for futher information.
* @brief Thread-safe queue for cross-task data transfer.
*
* This file implements channels, a kind of thread-safe queue that can be
* used to send and receive values across tasks safely.
*
* @copyright See COPYING.md in the project tree for further information.
*/
#ifndef OSTD_CHANNEL_HH
@ -18,36 +27,134 @@
namespace ostd {
/** @addtogroup Concurrency
* @{
*/
/** Thrown when manipulating a channel that has been closed. */
struct channel_error: std::logic_error {
using std::logic_error::logic_error;
};
/** @brief A thread-safe message queue.
*
* A channel is a kind of message queue (FIFO) that is properly synchronized.
* It can be used standalone or it can be used as a part of OctaSTD's
* concurrency system.
*
* It stores its internal state in a reference counted manner, so multiple
* channel instances can reference the same internal state. The internal
* state is freed as soon as the last channel instance referencing it is
* destroyed.
*
* @tparam T The type of the values in the queue.
*/
template<typename T>
struct channel {
/* default ctor works for default C */
/** @brief Constructs a default channel.
*
* This uses `std::condition_variable` as its internal condition type,
* so it will work with standard threads (raw or when used with C++'s
* async APIs). You can also use channels with ostd's concurrency system
* though - see make_channel<T>() and channel(F).
*/
channel(): p_state(new impl) {}
/* constructing using a function object, keep in mind that condvars are
* not copy or move constructible, so the func has to work in a way that
* elides copying and moving (by directly returning the type ctor call)
/** @brief Constructs a channel with a custom condition variable type.
*
* Channels use #ostd::generic_condvar to store their condvars internally,
* so you can provide a custom type as well. This comes in handy for
* example when doing custom scheduling.
*
* The condvar cannot be passed directly though, as it's not required to
* be copy or move constructible, so it's passed in through a function
* instead - the function is meant to return the condvar.
*
* However, typically you won't be using this directly, as you're meant
* to use the higher level concurrency system, which lready provides the
* make_channel<T>() function.
*
* @param[in] func A function that returns the desired condvar.
*/
template<typename F>
channel(F func): p_state(new impl{func}) {}
/** @brief Creates a new reference to the channel.
*
* This does not copy per se, as channels store a refcounted internal
* state. It increments the reference count on the state and creates
* a new channel instance that references this state.
*
* If you don't want to increment and instead you want to transfer the
* reference to a new instance, use channel(channel &&).
*
* @see channel(channel &&), operator=(channel const &)
*/
channel(channel const &) = default;
/** @brief Moves the internal state reference to a new channel instance.
*
* This is like channel(channel const &) except it does not increment
* the reference; it moves the reference to a new container instead,
* leaving the other one uninitialized. You cannot use the channel
* the reference was moved from anymore afterwards.
*
* @see channel(channel const &), operator=(channel &&)
*/
channel(channel &&) = default;
/** @see channel(channel const &) */
channel &operator=(channel const &) = default;
/** @see channel(channel &&) */
channel &operator=(channel &&) = default;
/** @brief Inserts a copy of a value into the queue.
*
* This will insert a copy of @p val into the queue and notify any
* single task waiting on the queue (if present, see get()) that the
* queue is ready.
*
* @param[in] val The value to insert.
*
* @throws #ostd::channel_error when the channel is closed.
*
* @see put(T &&), get(), try_get(), close(), is_closed()
*/
void put(T const &val) {
p_state->put(val);
}
/** @brief Moves a value into the queue.
*
* Same as put(T const &), except it moves the value.
*
* @param[in] val The value to insert.
*
* @throws #ostd::channel_error when the channel is closed.
*
* @see put(T const &)
*/
void put(T &&val) {
p_state->put(std::move(val));
}
/** @brief Waits for a value and returns it.
*
* If the queue is empty at the time of the call, this will block the
* calling task and wait until there is a value in the queue. Once there
* is a value, it pops it out of the queue and returns it. The value is
* moved out of the queue.
*
* If you don't want to wait and want to just check if there is something
* in the queue already, use try_get(T &).
*
* @returns The first inserted value in the queue.
*
* @throws #ostd::channel_error when the channel is closed.
*
* @see try_get(T &), put(T const &), close(), is_closed()
*/
T get() {
T ret;
/* guaranteed to return true if at all */
@ -55,15 +162,33 @@ struct channel {
return ret;
}
/** @brief Gets a value from the queue if there is one.
*
* If a value is present in the queue, writes it into @p val and returns
* `true`. Otherwise returns `false`. See get() for a waiting version.
*
* @returns `true` if a value was retrieved and `false` otherwise.
*
* @throws #ostd::channel_error when the channel is closed.
*
* @see try_get(T &), put(T const &), close(), is_closed()
*/
bool try_get(T &val) {
return p_state->get(val, false);
}
bool is_closed() const {
/** @brief Checks if the channel is closed.
*
* A channel is only called after explicitly calling close().
*
* @returns `true` if closed, `false` otherwise.
*/
bool is_closed() const noexcept {
return p_state->is_closed();
}
void close() {
/** Closes the channel. No effect if already closed. */
void close() noexcept {
p_state->close();
}
@ -105,12 +230,12 @@ private:
return true;
}
bool is_closed() const {
bool is_closed() const noexcept {
std::lock_guard<std::mutex> l{p_lock};
return p_closed;
}
void close() {
void close() noexcept {
{
std::lock_guard<std::mutex> l{p_lock};
p_closed = true;
@ -128,6 +253,10 @@ private:
std::shared_ptr<impl> p_state;
};
/** @} */
} /* namespace ostd */
#endif
/** @} */

View File

@ -1,6 +1,43 @@
/* Concurrency module with custom scheduler support.
/** @defgroup Concurrency
*
* This file is part of OctaSTD. See COPYING.md for futher information.
* @brief Concurrent task execution support.
*
* OctaSTD provides an elaborate concurrency system that covers multiple
* schedulers with different characteristics as well as different ways to
* pass data between tasks.
*
* It implements 1:1 (tasks are OS threads), N:1 (tasks are lightweight
* threads running on a single thread) and M:N (tasks are lightweight
* threads running on a fixed number of OS threads) scheduling approaches.
*
* The system is flexible and extensible; it can be used as a base for higher
* level systems, and its individual components are independently usable.
*
* Typical usage of the concurrency system is as follows:
*
* ~~~{.cc}
* #include <ostd/concurrency.hh>
*
* int main() {
* ostd::coroutine_scheduler{}.start([]() {
* // the rest of main follows
* });
* }
* ~~~
*
* See the examples provided with the library for further information.
*
* @{
*/
/** @file concurrency.hh
*
* @brief Different concurrent schedulers and related APIs.
*
* This file implements several schedulers as well as APIs to spawn
* tasks, coroutines and channels that utilize the current scheduler's API.
*
* @copyright See COPYING.md in the project tree for further information.
*/
#ifndef OSTD_CONCURRENCY_HH
@ -20,6 +57,23 @@
namespace ostd {
/** @addtogroup Concurrency
* @{
*/
/** @brief A base interface for any scheduler.
*
* All schedulers derive from this. Its core interface is defined using
* pure virtual functions, which the schedulers are supposed to implement.
*
* Every scheduler its supposed to implement its own method `start`, which
* will take a function, arguments and will return any value returned from
* the given function. The given function will be used as the very first
* task (the main task) which typically replaces your `main` function.
*
* The `start` method will also set the internal current scheduler pointer
* so that APIs such as ostd::spawn(F &&, A &&...) can work.
*/
struct scheduler {
private:
struct stack_allocator {
@ -38,26 +92,109 @@ private:
scheduler *p_sched;
};
public:
protected:
/** Does nothing, this base class is empty. */
scheduler() {}
public:
/** A scheduler is not copy constructible. */
scheduler(scheduler const &) = delete;
/** A scheduler is not move constructible. */
scheduler(scheduler &&) = delete;
/** A scheduler is not copy assignable. */
scheduler &operator=(scheduler const &) = delete;
/** A scheduler is not move assignable. */
scheduler &operator=(scheduler &&) = delete;
/** @brief Spawns a task.
*
* Spawns a task and schedules it for execution. This is a low level
* interface function. Typically you will want ostd::spawn(F &&, A &&...).
* The detailed behavior of the function is completely scheduler dependent.
*
* @see ostd::spawn(F &&, A &&...), spawn(F &&, A &&...)
*/
virtual void spawn(std::function<void()>) = 0;
/** @brief Tells the scheduler to re-schedule the current task.
*
* In #ostd::thread_scheduler, this is just a hint, as it uses OS threading
* facilities. In coroutine based schedulers, this will typically suspend
* the currently running task and re-schedule it for later.
*
* @see ostd::yield()
*/
virtual void yield() noexcept = 0;
/** @brief Creates a condition variable using #ostd::generic_condvar.
*
* A scheduler might be using a custom condition variable type depending
* on how its tasks are implemented. Other data structures might want to
* use these condition variables to synchronize (see make_channel() for
* an example).
*
* @see make_channel(), make_coroutine(F &&), make_generator(F &&)
*/
virtual generic_condvar make_condition() = 0;
/** @brief Allocates a stack suitable for a coroutine.
*
* If the scheduler uses coroutine based tasks, this allows us to
* create coroutines and generators that use the same stacks as tasks.
* This has benefits particularly when a pooled stack allocator is in
* use for the tasks.
*
* Using get_stack_allocator() you can create an actual stack allocator
* usable with coroutines that uses this set of methods.
*
* @see deallocate_stack(stack_context &), reserve_stacks(size_t),
* get_stack_allocator()
*/
virtual stack_context allocate_stack() = 0;
/** @brief Deallocates a stack allocated with allocate_stack().
*
* @see allocate_stack(), reserve_stacks(size_t), get_stack_allocator()
*/
virtual void deallocate_stack(stack_context &st) noexcept = 0;
/** @brief Reserves at least @p n stacks.
*
* If the stack allocator used in the scheduler is pooled, this will
* reserve the given number of stacks (or more). It can, however, be
* a no-op if the allocator is not pooled.
*
* @see allocate_stack(), deallocate_stack(stack_context &),
* get_stack_allocator()
*/
virtual void reserve_stacks(size_t n) = 0;
/** @brief Gets a stack allocator using the scheduler's stack allocation.
*
* The stack allocator will use allocate_stack() and
* deallocate_stack(stack_context &) to perform the alloaction and
* deallocation.
*/
stack_allocator get_stack_allocator() noexcept {
return stack_allocator{*this};
}
/** @brief Spawns a task using any callable and a set of arguments.
*
* Just like spawn(std::function<void()>), but works for any callable
* and accepts arguments. If any arguments are provided, they're bound
* to the callable first. Then the result is converted to the right
* type for spawn(std::function<void()>) and passed.
*
* You can use this to spawn a task directly on a scheduler. However,
* typically you will not want to pass the scheduler around and instead
* use the generic ostd::spawn(), which works on any scheduler.
*
* @see spawn(std::function<void()>), ostd::spawn(F &&, A &&...)
*/
template<typename F, typename ...A>
void spawn(F &&func, A &&...args) {
if constexpr(sizeof...(A) == 0) {
@ -69,6 +206,17 @@ public:
}
}
/** @brief Creates a channel suitable for the scheduler.
*
* Returns a channel that uses a condition variable type returned by
* make_condition(). You can use this to create a channel directly
* with the scheduler. However, typically you will not want to pass
* it around, so ostd::make_channel<T>() is a more convenient choice.
*
* @tparam T The type of the channel value.
*
* @see ostd::make_channel<T>()
*/
template<typename T>
channel<T> make_channel() {
return channel<T>{[this]() {
@ -76,11 +224,29 @@ public:
}};
}
/** @brief Creates a coroutine using the scheduler's stack allocator.
*
* Using ostd::make_coroutine<T>(F &&) will do the same thing, but
* without the need to explicitly pass the scheduler around.
*
* @tparam T The type passed to the coroutine, `Result(Args...)`.
*
* @see make_generator<T>(F &&), ostd::make_coroutine<T>(F &&)
*/
template<typename T, typename F>
coroutine<T> make_coroutine(F &&func) {
return coroutine<T>{std::forward<F>(func), get_stack_allocator()};
}
/** @brief Creates a generator using the scheduler's stack allocator.
*
* Using ostd::make_generator<T>(F &&) will do the same thing, but
* without the need to explicitly pass the scheduler around.
*
* @tparam T The value type of the generator.
*
* @see make_coroutine<T>(F &&), ostd::make_generator<T>(F &&)
*/
template<typename T, typename F>
generator<T> make_generator(F &&func) {
return generator<T>{std::forward<F>(func), get_stack_allocator()};
@ -112,10 +278,32 @@ namespace detail {
};
}
/** @brief A scheduler that uses an `std::thread` per each task.
*
* This one doesn't actually do any scheduling, it leaves it to the OS.
* Effectively this implements a 1:1 model.
*
* @tparam SA The stack allocator to use when requesting stacks. It's not
* actually used anywhere else, as thread stacks are managed by the OS.
* Can be a stack pool and only has to be move constructible.
*/
template<typename SA>
struct basic_thread_scheduler: scheduler {
/* @brief Creates the scheduler.
*
* @param[in] sa The provided stack allocator.
*/
basic_thread_scheduler(SA &&sa = SA{}): p_stacks(std::move(sa)) {}
/** @brief Starts the scheduler given a set of arguments.
*
* Sets the internal current scheduler pointer to this scheduler and
* calls the given function. As it doesn't do any scheduling, it really
* just calls. Then it waits for all threads (tasks) it spawned to finish
* and returns the value returned from the given function, if any.
*
* @returns The result of @p func.
*/
template<typename F, typename ...A>
auto start(F func, A &&...args) -> std::result_of_t<F(A...)> {
detail::current_scheduler_owner iface{*this};
@ -202,6 +390,7 @@ private:
std::mutex p_lock;
};
/** An #ostd::basic_thread_scheduler using #ostd::stack_pool. */
using thread_scheduler = basic_thread_scheduler<stack_pool>;
namespace detail {
@ -260,6 +449,16 @@ namespace detail {
};
}
/** @brief A scheduler that uses a coroutine type for tasks on a single thread.
*
* Effectively implements the N:1 model. Runs on a single thread, so it doesn't
* make any use of multicore systems. The tasks bypass the
* coroutine_context::current() method, so they're completely hidden from the
* outside code. This also has several advantages for code using coroutines.
*
* @tparam SA The stack allocator to use when requesting stacks. Used for
* the tasks as well as for the stack request methods.
*/
template<typename SA>
struct basic_simple_coroutine_scheduler: scheduler {
private:
@ -298,10 +497,28 @@ private:
};
public:
/* @brief Creates the scheduler.
*
* @param[in] sa The provided stack allocator.
*/
basic_simple_coroutine_scheduler(SA &&sa = SA{}):
p_stacks(std::move(sa))
{}
/** @brief Starts the scheduler given a set of arguments.
*
* Sets the internal current scheduler pointer to this scheduler creates
* the main task using the separate provided stack allocator. This is
* useful because the task stacks tend to be rather small and we need
* a much bigger stack for the main task.
*
* After creating the task, starts the dispatcher on the thread. Returns
* the return value of the provided main task function once it finishes.
*
* @returns The result of @p func.
*
* @see start(F, A &&...)
*/
template<typename TSA, typename F, typename ...A>
auto start(std::allocator_arg_t, TSA &&sa, F func, A &&...args)
-> std::result_of_t<F(A...)>
@ -337,6 +554,17 @@ public:
}
}
/** @brief Starts the scheduler given a set of arguments.
*
* Like start(std::allocator_arg_t, TSA &&, F, A &&...) but uses a
* fixed size stack that has the same size as the main thread stack.
*
* The stack traits type is inherited from @p SA.
*
* @returns The result of @p func.
*
* @see start(std::allocator_arg_t, TSA &&, F, A &&...)
*/
template<typename F, typename ...A>
auto start(F func, A &&...args) -> std::result_of_t<F(A...)> {
basic_fixedsize_stack<typename SA::traits_type, false> sa{
@ -393,8 +621,20 @@ private:
typename std::list<detail::csched_task>::iterator p_idx = p_coros.end();
};
/** An #ostd::basic_simple_coroutine_scheduler using #ostd::stack_pool. */
using simple_coroutine_scheduler = basic_simple_coroutine_scheduler<stack_pool>;
/** @brief A scheduler that uses a coroutine type for tasks on several threads.
*
* Effectively implements the M:N model. Runs on several threads, typically as
* many as there are physical threads on your CPU(s), so it makes use of
* multicore systems. The tasks bypass the coroutine_context::current() method,
* so they're completely hidden from the outside code. This also has several
* advantages for code using coroutines.
*
* @tparam SA The stack allocator to use when requesting stacks. Used for
* the tasks as well as for the stack request methods.
*/
template<typename SA>
struct basic_coroutine_scheduler: scheduler {
private:
@ -474,6 +714,13 @@ private:
};
public:
/* @brief Creates the scheduler.
*
* The number of threads defaults to the number of physical threads.
*
* @param[in] thrs The number of threads to use.
* @param[in] sa The provided stack allocator.
*/
basic_coroutine_scheduler(
size_t thrs = std::thread::hardware_concurrency(), SA &&sa = SA{}
):
@ -482,6 +729,21 @@ public:
~basic_coroutine_scheduler() {}
/** @brief Starts the scheduler given a set of arguments.
*
* Sets the internal current scheduler pointer to this scheduler creates
* the main task using the separate provided stack allocator. This is
* useful because the task stacks tend to be rather small and we need
* a much bigger stack for the main task.
*
* After creating the task, creates the requested number of threads and
* starts the dispatcher on each. Then it waits for all threads to finish
* and returns the return value of the provided main task function.
*
* @returns The result of @p func.
*
* @see start(F, A &&...)
*/
template<typename TSA, typename F, typename ...A>
auto start(std::allocator_arg_t, TSA &&sa, F func, A &&...args)
-> std::result_of_t<F(A...)>
@ -514,6 +776,17 @@ public:
}
}
/** @brief Starts the scheduler given a set of arguments.
*
* Like start(std::allocator_arg_t, TSA &&, F, A &&...) but uses a
* fixed size stack that has the same size as the main thread stack.
*
* The stack traits type is inherited from @p SA.
*
* @returns The result of @p func.
*
* @see start(std::allocator_arg_t, TSA &&, F, A &&...)
*/
template<typename F, typename ...A>
auto start(F func, A &&...args) -> std::result_of_t<F(A...)> {
/* the default 64 KiB stack won't cut it for main, allocate a stack
@ -693,8 +966,14 @@ private:
tlist p_running;
};
/** An #ostd::basic_coroutine_scheduler using #ostd::stack_pool. */
using coroutine_scheduler = basic_coroutine_scheduler<stack_pool>;
/** @brief Spawns a task on the currently in use scheduler.
*
* The arguments are passed to the function. Effectively just calls
* scheduler::spawn(F &&, A &&...).
*/
template<typename F, typename ...A>
inline void spawn(F &&func, A &&...args) {
detail::current_scheduler->spawn(
@ -702,20 +981,45 @@ inline void spawn(F &&func, A &&...args) {
);
}
/** @brief Tells the current scheduler to re-schedule the current task.
*
* Effectively calls scheduler::yield().
*/
inline void yield() noexcept {
detail::current_scheduler->yield();
}
/** @brief Creates a channel with the currently in use scheduler.
*
* Effectively calls scheduler::make_channel<T>().
*
* @tparam T The type of the channel value.
*
*/
template<typename T>
inline channel<T> make_channel() {
return detail::current_scheduler->make_channel<T>();
}
/** @brief Creates a coroutine with the currently in use scheduler.
*
* Effectively calls scheduler::make_coroutine<T>(F &&).
*
* @tparam T The type passed to the coroutine, `Result(Args...)`.
*
*/
template<typename T, typename F>
inline coroutine<T> make_coroutine(F &&func) {
return detail::current_scheduler->make_coroutine<T>(std::forward<F>(func));
}
/** @brief Creates a generator with the currently in use scheduler.
*
* Effectively calls scheduler::make_generator<T>(F &&).
*
* @tparam T The value type of the generator.
*
*/
template<typename T, typename F>
inline generator<T> make_generator(F &&func) {
return detail::current_scheduler->make_generator<T>(std::forward<F>(func));
@ -725,6 +1029,10 @@ inline void reserve_stacks(size_t n) {
detail::current_scheduler->reserve_stacks(n);
}
/** @} */
} /* namespace ostd */
#endif
/** @} */