diff --git a/ostd/channel.hh b/ostd/channel.hh index e196b88..2b888a8 100644 --- a/ostd/channel.hh +++ b/ostd/channel.hh @@ -1,6 +1,15 @@ -/* Channels provide the messaging system used for OctaSTD concurrency. +/** @addtogroup Concurrency + * @{ + */ + +/** @file channel.hh * - * This file is part of OctaSTD. See COPYING.md for futher information. + * @brief Thread-safe queue for cross-task data transfer. + * + * This file implements channels, a kind of thread-safe queue that can be + * used to send and receive values across tasks safely. + * + * @copyright See COPYING.md in the project tree for further information. */ #ifndef OSTD_CHANNEL_HH @@ -18,36 +27,134 @@ namespace ostd { +/** @addtogroup Concurrency + * @{ + */ + +/** Thrown when manipulating a channel that has been closed. */ struct channel_error: std::logic_error { using std::logic_error::logic_error; }; +/** @brief A thread-safe message queue. + * + * A channel is a kind of message queue (FIFO) that is properly synchronized. + * It can be used standalone or it can be used as a part of OctaSTD's + * concurrency system. + * + * It stores its internal state in a reference counted manner, so multiple + * channel instances can reference the same internal state. The internal + * state is freed as soon as the last channel instance referencing it is + * destroyed. + * + * @tparam T The type of the values in the queue. + */ template struct channel { - /* default ctor works for default C */ + /** @brief Constructs a default channel. + * + * This uses `std::condition_variable` as its internal condition type, + * so it will work with standard threads (raw or when used with C++'s + * async APIs). You can also use channels with ostd's concurrency system + * though - see make_channel() and channel(F). + */ channel(): p_state(new impl) {} - /* constructing using a function object, keep in mind that condvars are - * not copy or move constructible, so the func has to work in a way that - * elides copying and moving (by directly returning the type ctor call) + /** @brief Constructs a channel with a custom condition variable type. + * + * Channels use #ostd::generic_condvar to store their condvars internally, + * so you can provide a custom type as well. This comes in handy for + * example when doing custom scheduling. + * + * The condvar cannot be passed directly though, as it's not required to + * be copy or move constructible, so it's passed in through a function + * instead - the function is meant to return the condvar. + * + * However, typically you won't be using this directly, as you're meant + * to use the higher level concurrency system, which lready provides the + * make_channel() function. + * + * @param[in] func A function that returns the desired condvar. */ template channel(F func): p_state(new impl{func}) {} + /** @brief Creates a new reference to the channel. + * + * This does not copy per se, as channels store a refcounted internal + * state. It increments the reference count on the state and creates + * a new channel instance that references this state. + * + * If you don't want to increment and instead you want to transfer the + * reference to a new instance, use channel(channel &&). + * + * @see channel(channel &&), operator=(channel const &) + */ channel(channel const &) = default; + + /** @brief Moves the internal state reference to a new channel instance. + * + * This is like channel(channel const &) except it does not increment + * the reference; it moves the reference to a new container instead, + * leaving the other one uninitialized. You cannot use the channel + * the reference was moved from anymore afterwards. + * + * @see channel(channel const &), operator=(channel &&) + */ channel(channel &&) = default; + /** @see channel(channel const &) */ channel &operator=(channel const &) = default; + + /** @see channel(channel &&) */ channel &operator=(channel &&) = default; + /** @brief Inserts a copy of a value into the queue. + * + * This will insert a copy of @p val into the queue and notify any + * single task waiting on the queue (if present, see get()) that the + * queue is ready. + * + * @param[in] val The value to insert. + * + * @throws #ostd::channel_error when the channel is closed. + * + * @see put(T &&), get(), try_get(), close(), is_closed() + */ void put(T const &val) { p_state->put(val); } + /** @brief Moves a value into the queue. + * + * Same as put(T const &), except it moves the value. + * + * @param[in] val The value to insert. + * + * @throws #ostd::channel_error when the channel is closed. + * + * @see put(T const &) + */ void put(T &&val) { p_state->put(std::move(val)); } + /** @brief Waits for a value and returns it. + * + * If the queue is empty at the time of the call, this will block the + * calling task and wait until there is a value in the queue. Once there + * is a value, it pops it out of the queue and returns it. The value is + * moved out of the queue. + * + * If you don't want to wait and want to just check if there is something + * in the queue already, use try_get(T &). + * + * @returns The first inserted value in the queue. + * + * @throws #ostd::channel_error when the channel is closed. + * + * @see try_get(T &), put(T const &), close(), is_closed() + */ T get() { T ret; /* guaranteed to return true if at all */ @@ -55,15 +162,33 @@ struct channel { return ret; } + /** @brief Gets a value from the queue if there is one. + * + * If a value is present in the queue, writes it into @p val and returns + * `true`. Otherwise returns `false`. See get() for a waiting version. + * + * @returns `true` if a value was retrieved and `false` otherwise. + * + * @throws #ostd::channel_error when the channel is closed. + * + * @see try_get(T &), put(T const &), close(), is_closed() + */ bool try_get(T &val) { return p_state->get(val, false); } - bool is_closed() const { + /** @brief Checks if the channel is closed. + * + * A channel is only called after explicitly calling close(). + * + * @returns `true` if closed, `false` otherwise. + */ + bool is_closed() const noexcept { return p_state->is_closed(); } - void close() { + /** Closes the channel. No effect if already closed. */ + void close() noexcept { p_state->close(); } @@ -105,12 +230,12 @@ private: return true; } - bool is_closed() const { + bool is_closed() const noexcept { std::lock_guard l{p_lock}; return p_closed; } - void close() { + void close() noexcept { { std::lock_guard l{p_lock}; p_closed = true; @@ -128,6 +253,10 @@ private: std::shared_ptr p_state; }; +/** @} */ + } /* namespace ostd */ #endif + +/** @} */ diff --git a/ostd/concurrency.hh b/ostd/concurrency.hh index 5dc14b4..8c5487a 100644 --- a/ostd/concurrency.hh +++ b/ostd/concurrency.hh @@ -1,6 +1,43 @@ -/* Concurrency module with custom scheduler support. +/** @defgroup Concurrency * - * This file is part of OctaSTD. See COPYING.md for futher information. + * @brief Concurrent task execution support. + * + * OctaSTD provides an elaborate concurrency system that covers multiple + * schedulers with different characteristics as well as different ways to + * pass data between tasks. + * + * It implements 1:1 (tasks are OS threads), N:1 (tasks are lightweight + * threads running on a single thread) and M:N (tasks are lightweight + * threads running on a fixed number of OS threads) scheduling approaches. + * + * The system is flexible and extensible; it can be used as a base for higher + * level systems, and its individual components are independently usable. + * + * Typical usage of the concurrency system is as follows: + * + * ~~~{.cc} + * #include + * + * int main() { + * ostd::coroutine_scheduler{}.start([]() { + * // the rest of main follows + * }); + * } + * ~~~ + * + * See the examples provided with the library for further information. + * + * @{ + */ + +/** @file concurrency.hh + * + * @brief Different concurrent schedulers and related APIs. + * + * This file implements several schedulers as well as APIs to spawn + * tasks, coroutines and channels that utilize the current scheduler's API. + * + * @copyright See COPYING.md in the project tree for further information. */ #ifndef OSTD_CONCURRENCY_HH @@ -20,6 +57,23 @@ namespace ostd { +/** @addtogroup Concurrency + * @{ + */ + +/** @brief A base interface for any scheduler. + * + * All schedulers derive from this. Its core interface is defined using + * pure virtual functions, which the schedulers are supposed to implement. + * + * Every scheduler its supposed to implement its own method `start`, which + * will take a function, arguments and will return any value returned from + * the given function. The given function will be used as the very first + * task (the main task) which typically replaces your `main` function. + * + * The `start` method will also set the internal current scheduler pointer + * so that APIs such as ostd::spawn(F &&, A &&...) can work. + */ struct scheduler { private: struct stack_allocator { @@ -38,26 +92,109 @@ private: scheduler *p_sched; }; -public: +protected: + /** Does nothing, this base class is empty. */ scheduler() {} +public: + /** A scheduler is not copy constructible. */ scheduler(scheduler const &) = delete; + + /** A scheduler is not move constructible. */ scheduler(scheduler &&) = delete; + + /** A scheduler is not copy assignable. */ scheduler &operator=(scheduler const &) = delete; + + /** A scheduler is not move assignable. */ scheduler &operator=(scheduler &&) = delete; + /** @brief Spawns a task. + * + * Spawns a task and schedules it for execution. This is a low level + * interface function. Typically you will want ostd::spawn(F &&, A &&...). + * The detailed behavior of the function is completely scheduler dependent. + * + * @see ostd::spawn(F &&, A &&...), spawn(F &&, A &&...) + */ virtual void spawn(std::function) = 0; + + /** @brief Tells the scheduler to re-schedule the current task. + * + * In #ostd::thread_scheduler, this is just a hint, as it uses OS threading + * facilities. In coroutine based schedulers, this will typically suspend + * the currently running task and re-schedule it for later. + * + * @see ostd::yield() + */ virtual void yield() noexcept = 0; + + /** @brief Creates a condition variable using #ostd::generic_condvar. + * + * A scheduler might be using a custom condition variable type depending + * on how its tasks are implemented. Other data structures might want to + * use these condition variables to synchronize (see make_channel() for + * an example). + * + * @see make_channel(), make_coroutine(F &&), make_generator(F &&) + */ virtual generic_condvar make_condition() = 0; + /** @brief Allocates a stack suitable for a coroutine. + * + * If the scheduler uses coroutine based tasks, this allows us to + * create coroutines and generators that use the same stacks as tasks. + * This has benefits particularly when a pooled stack allocator is in + * use for the tasks. + * + * Using get_stack_allocator() you can create an actual stack allocator + * usable with coroutines that uses this set of methods. + * + * @see deallocate_stack(stack_context &), reserve_stacks(size_t), + * get_stack_allocator() + */ virtual stack_context allocate_stack() = 0; + + /** @brief Deallocates a stack allocated with allocate_stack(). + * + * @see allocate_stack(), reserve_stacks(size_t), get_stack_allocator() + */ virtual void deallocate_stack(stack_context &st) noexcept = 0; + + /** @brief Reserves at least @p n stacks. + * + * If the stack allocator used in the scheduler is pooled, this will + * reserve the given number of stacks (or more). It can, however, be + * a no-op if the allocator is not pooled. + * + * @see allocate_stack(), deallocate_stack(stack_context &), + * get_stack_allocator() + */ virtual void reserve_stacks(size_t n) = 0; + /** @brief Gets a stack allocator using the scheduler's stack allocation. + * + * The stack allocator will use allocate_stack() and + * deallocate_stack(stack_context &) to perform the alloaction and + * deallocation. + */ stack_allocator get_stack_allocator() noexcept { return stack_allocator{*this}; } + /** @brief Spawns a task using any callable and a set of arguments. + * + * Just like spawn(std::function), but works for any callable + * and accepts arguments. If any arguments are provided, they're bound + * to the callable first. Then the result is converted to the right + * type for spawn(std::function) and passed. + * + * You can use this to spawn a task directly on a scheduler. However, + * typically you will not want to pass the scheduler around and instead + * use the generic ostd::spawn(), which works on any scheduler. + * + * @see spawn(std::function), ostd::spawn(F &&, A &&...) + */ template void spawn(F &&func, A &&...args) { if constexpr(sizeof...(A) == 0) { @@ -69,6 +206,17 @@ public: } } + /** @brief Creates a channel suitable for the scheduler. + * + * Returns a channel that uses a condition variable type returned by + * make_condition(). You can use this to create a channel directly + * with the scheduler. However, typically you will not want to pass + * it around, so ostd::make_channel() is a more convenient choice. + * + * @tparam T The type of the channel value. + * + * @see ostd::make_channel() + */ template channel make_channel() { return channel{[this]() { @@ -76,11 +224,29 @@ public: }}; } + /** @brief Creates a coroutine using the scheduler's stack allocator. + * + * Using ostd::make_coroutine(F &&) will do the same thing, but + * without the need to explicitly pass the scheduler around. + * + * @tparam T The type passed to the coroutine, `Result(Args...)`. + * + * @see make_generator(F &&), ostd::make_coroutine(F &&) + */ template coroutine make_coroutine(F &&func) { return coroutine{std::forward(func), get_stack_allocator()}; } + /** @brief Creates a generator using the scheduler's stack allocator. + * + * Using ostd::make_generator(F &&) will do the same thing, but + * without the need to explicitly pass the scheduler around. + * + * @tparam T The value type of the generator. + * + * @see make_coroutine(F &&), ostd::make_generator(F &&) + */ template generator make_generator(F &&func) { return generator{std::forward(func), get_stack_allocator()}; @@ -112,10 +278,32 @@ namespace detail { }; } +/** @brief A scheduler that uses an `std::thread` per each task. + * + * This one doesn't actually do any scheduling, it leaves it to the OS. + * Effectively this implements a 1:1 model. + * + * @tparam SA The stack allocator to use when requesting stacks. It's not + * actually used anywhere else, as thread stacks are managed by the OS. + * Can be a stack pool and only has to be move constructible. + */ template struct basic_thread_scheduler: scheduler { + /* @brief Creates the scheduler. + * + * @param[in] sa The provided stack allocator. + */ basic_thread_scheduler(SA &&sa = SA{}): p_stacks(std::move(sa)) {} + /** @brief Starts the scheduler given a set of arguments. + * + * Sets the internal current scheduler pointer to this scheduler and + * calls the given function. As it doesn't do any scheduling, it really + * just calls. Then it waits for all threads (tasks) it spawned to finish + * and returns the value returned from the given function, if any. + * + * @returns The result of @p func. + */ template auto start(F func, A &&...args) -> std::result_of_t { detail::current_scheduler_owner iface{*this}; @@ -202,6 +390,7 @@ private: std::mutex p_lock; }; +/** An #ostd::basic_thread_scheduler using #ostd::stack_pool. */ using thread_scheduler = basic_thread_scheduler; namespace detail { @@ -260,6 +449,16 @@ namespace detail { }; } +/** @brief A scheduler that uses a coroutine type for tasks on a single thread. + * + * Effectively implements the N:1 model. Runs on a single thread, so it doesn't + * make any use of multicore systems. The tasks bypass the + * coroutine_context::current() method, so they're completely hidden from the + * outside code. This also has several advantages for code using coroutines. + * + * @tparam SA The stack allocator to use when requesting stacks. Used for + * the tasks as well as for the stack request methods. + */ template struct basic_simple_coroutine_scheduler: scheduler { private: @@ -298,10 +497,28 @@ private: }; public: + /* @brief Creates the scheduler. + * + * @param[in] sa The provided stack allocator. + */ basic_simple_coroutine_scheduler(SA &&sa = SA{}): p_stacks(std::move(sa)) {} + /** @brief Starts the scheduler given a set of arguments. + * + * Sets the internal current scheduler pointer to this scheduler creates + * the main task using the separate provided stack allocator. This is + * useful because the task stacks tend to be rather small and we need + * a much bigger stack for the main task. + * + * After creating the task, starts the dispatcher on the thread. Returns + * the return value of the provided main task function once it finishes. + * + * @returns The result of @p func. + * + * @see start(F, A &&...) + */ template auto start(std::allocator_arg_t, TSA &&sa, F func, A &&...args) -> std::result_of_t @@ -337,6 +554,17 @@ public: } } + /** @brief Starts the scheduler given a set of arguments. + * + * Like start(std::allocator_arg_t, TSA &&, F, A &&...) but uses a + * fixed size stack that has the same size as the main thread stack. + * + * The stack traits type is inherited from @p SA. + * + * @returns The result of @p func. + * + * @see start(std::allocator_arg_t, TSA &&, F, A &&...) + */ template auto start(F func, A &&...args) -> std::result_of_t { basic_fixedsize_stack sa{ @@ -393,8 +621,20 @@ private: typename std::list::iterator p_idx = p_coros.end(); }; +/** An #ostd::basic_simple_coroutine_scheduler using #ostd::stack_pool. */ using simple_coroutine_scheduler = basic_simple_coroutine_scheduler; +/** @brief A scheduler that uses a coroutine type for tasks on several threads. + * + * Effectively implements the M:N model. Runs on several threads, typically as + * many as there are physical threads on your CPU(s), so it makes use of + * multicore systems. The tasks bypass the coroutine_context::current() method, + * so they're completely hidden from the outside code. This also has several + * advantages for code using coroutines. + * + * @tparam SA The stack allocator to use when requesting stacks. Used for + * the tasks as well as for the stack request methods. + */ template struct basic_coroutine_scheduler: scheduler { private: @@ -474,6 +714,13 @@ private: }; public: + /* @brief Creates the scheduler. + * + * The number of threads defaults to the number of physical threads. + * + * @param[in] thrs The number of threads to use. + * @param[in] sa The provided stack allocator. + */ basic_coroutine_scheduler( size_t thrs = std::thread::hardware_concurrency(), SA &&sa = SA{} ): @@ -482,6 +729,21 @@ public: ~basic_coroutine_scheduler() {} + /** @brief Starts the scheduler given a set of arguments. + * + * Sets the internal current scheduler pointer to this scheduler creates + * the main task using the separate provided stack allocator. This is + * useful because the task stacks tend to be rather small and we need + * a much bigger stack for the main task. + * + * After creating the task, creates the requested number of threads and + * starts the dispatcher on each. Then it waits for all threads to finish + * and returns the return value of the provided main task function. + * + * @returns The result of @p func. + * + * @see start(F, A &&...) + */ template auto start(std::allocator_arg_t, TSA &&sa, F func, A &&...args) -> std::result_of_t @@ -514,6 +776,17 @@ public: } } + /** @brief Starts the scheduler given a set of arguments. + * + * Like start(std::allocator_arg_t, TSA &&, F, A &&...) but uses a + * fixed size stack that has the same size as the main thread stack. + * + * The stack traits type is inherited from @p SA. + * + * @returns The result of @p func. + * + * @see start(std::allocator_arg_t, TSA &&, F, A &&...) + */ template auto start(F func, A &&...args) -> std::result_of_t { /* the default 64 KiB stack won't cut it for main, allocate a stack @@ -693,8 +966,14 @@ private: tlist p_running; }; +/** An #ostd::basic_coroutine_scheduler using #ostd::stack_pool. */ using coroutine_scheduler = basic_coroutine_scheduler; +/** @brief Spawns a task on the currently in use scheduler. + * + * The arguments are passed to the function. Effectively just calls + * scheduler::spawn(F &&, A &&...). + */ template inline void spawn(F &&func, A &&...args) { detail::current_scheduler->spawn( @@ -702,20 +981,45 @@ inline void spawn(F &&func, A &&...args) { ); } +/** @brief Tells the current scheduler to re-schedule the current task. + * + * Effectively calls scheduler::yield(). + */ inline void yield() noexcept { detail::current_scheduler->yield(); } +/** @brief Creates a channel with the currently in use scheduler. + * + * Effectively calls scheduler::make_channel(). + * + * @tparam T The type of the channel value. + * + */ template inline channel make_channel() { return detail::current_scheduler->make_channel(); } +/** @brief Creates a coroutine with the currently in use scheduler. + * + * Effectively calls scheduler::make_coroutine(F &&). + * + * @tparam T The type passed to the coroutine, `Result(Args...)`. + * + */ template inline coroutine make_coroutine(F &&func) { return detail::current_scheduler->make_coroutine(std::forward(func)); } +/** @brief Creates a generator with the currently in use scheduler. + * + * Effectively calls scheduler::make_generator(F &&). + * + * @tparam T The value type of the generator. + * + */ template inline generator make_generator(F &&func) { return detail::current_scheduler->make_generator(std::forward(func)); @@ -725,6 +1029,10 @@ inline void reserve_stacks(size_t n) { detail::current_scheduler->reserve_stacks(n); } +/** @} */ + } /* namespace ostd */ #endif + +/** @} */