/** @addtogroup Concurrency * @{ */ /** @file channel.hh * * @brief Thread-safe queue for cross-task data transfer. * * This file implements channels, a kind of thread-safe queue that can be * used to send and receive values across tasks safely. * * @copyright See COPYING.md in the project tree for further information. */ #ifndef OSTD_CHANNEL_HH #define OSTD_CHANNEL_HH #include #include #include #include #include #include #include #include #include #include namespace ostd { /** @addtogroup Concurrency * @{ */ /** @brief Thrown when manipulating a channel that has been closed. */ struct OSTD_EXPORT channel_error: std::logic_error { using std::logic_error::logic_error; /* empty, for vtable placement */ virtual ~channel_error(); }; /** @brief A thread-safe message queue. * * A channel is a kind of message queue (FIFO) that is properly synchronized. * It can be used standalone or it can be used as a part of libostd's * concurrency system. * * It stores its internal state in a reference counted manner, so multiple * channel instances can reference the same internal state. The internal * state is freed as soon as the last channel instance referencing it is * destroyed. * * Channels are move constructible. Moving a channel transfers the state * into another channel and leaves the original state in an unusable * state, with no reference count change. Copying a channel increases * the reference count, so both instances point to the ssame state and * both are valid. * * @tparam T The type of the values in the queue. */ template struct channel { /** @brief Constructs a default channel. * * This uses std::condition_variable as its internal condition type, * so it will work with standard threads (raw or when used with C++'s * async APIs). You can also use channels with ostd's concurrency system * though - see ostd::make_channel() and channel(F). */ channel(): p_state(new impl) {} /** @brief Constructs a channel with a custom condition variable type. * * Channels use #ostd::generic_condvar to store their condvars internally, * so you can provide a custom type as well. This comes in handy for * example when doing custom scheduling. * * The condvar cannot be passed directly though, as it's not required to * be copy or move constructible, so it's passed in through a function * instead - the function is meant to return the condvar. * * However, typically you won't be using this directly, as you're meant * to use the higher level concurrency system, which lready provides the * ostd::make_channel() function. * * @param[in] func A function that returns the desired condvar. */ template channel(F func): p_state(new impl{func}) {} channel(channel const &) = default; channel(channel &&) = default; channel &operator=(channel const &) = default; channel &operator=(channel &&) = default; /** @brief Inserts a copy of a value into the queue. * * This will insert a copy of @p val into the queue and notify any * single task waiting on the queue (if present, see get()) that the * queue is ready. * * @param[in] val The value to insert. * * @throws ostd::channel_error when the channel is closed. * * @see put(T &&), get(), try_get(), close(), cloed() */ void put(T const &val) { p_state->put(val); } /** @brief Moves a value into the queue. * * Same as put(T const &), except it moves the value. * * @param[in] val The value to insert. * * @throws ostd::channel_error when the channel is closed. * * @see put(T const &), emplace() */ void put(T &&val) { p_state->put(std::move(val)); } /** @brief Like put(), but constructs the element in-place. * * The arguments are to be passed to the element constructor. * No copy or move operations are performed. */ template void emplace(A &&...args) { p_state->emplace(std::forward(args)...); } /** @brief Waits for a value and returns it. * * If the queue is empty at the time of the call, this will block the * calling task and wait until there is a value in the queue. Once there * is a value, it pops it out of the queue and returns it. The value is * moved out of the queue. * * If you don't want to wait and want to just check if there is something * in the queue already, use try_get(). * * @returns The first inserted value in the queue. * * @throws ostd::channel_error when the channel is closed. * * @see try_get(), put(T const &), close(), closed() */ T get() { T ret; /* guaranteed to return true if at all */ p_state->get(ret, true); return ret; } /** @brief Gets a value from the queue if there is one. * * If a value is present in the queue, returns the value. * Otherwise returns std::nullopt. See get() for a waiting * version. * * @returns The value or std::nullopt if there isn't one. * * @throws ostd::channel_error when the channel is closed. * * @see get(), put(T const &), close(), closed() */ std::optional try_get() { T ret; if (!p_state->get(ret, false)) { return std::nullopt; } return std::move(ret); } /** @brief Checks if the channel is empty. * * A channel is empty if there are no values in the queue. It's also * considered empty if it's closed() (even if there are items left in * the queue). * * @returns `true` if empty, `false` otherwise. */ bool empty() const noexcept { return p_state->empty(); } /** @brief Checks if the channel is closed. * * A channel is only called after explicitly calling close(). * * @returns `true` if closed, `false` otherwise. */ bool closed() const noexcept { return p_state->closed(); } /** @brief Closes the channel. No effect if already closed. */ void close() noexcept { p_state->close(); } private: struct impl { impl() { } template impl(F &func): p_lock(), p_cond(func()) {} template void put(U &&val) { { std::lock_guard l{p_lock}; if (p_closed) { throw channel_error{"put in a closed channel"}; } p_messages.push_back(std::forward(val)); } p_cond.notify_one(); } template void emplace(A &&...args) { { std::lock_guard l{p_lock}; if (p_closed) { throw channel_error{"emplace in a closed channel"}; } p_messages.emplace(std::forward(args)...); } p_cond.notify_one(); } bool get(T &val, bool w) { std::unique_lock l{p_lock}; if (w) { while (!p_closed && p_messages.empty()) { p_cond.wait(l); } } if (p_messages.empty()) { if (p_closed) { throw channel_error{"get from a closed channel"}; } return false; } val = std::move(p_messages.front()); p_messages.pop_front(); return true; } bool empty() const noexcept { std::lock_guard l{p_lock}; return p_closed || p_messages.empty(); } bool closed() const noexcept { std::lock_guard l{p_lock}; return p_closed; } void close() noexcept { { std::lock_guard l{p_lock}; p_closed = true; } p_cond.notify_all(); } std::list p_messages; mutable std::mutex p_lock; generic_condvar p_cond; bool p_closed = false; }; /* basic and inefficient, deal with it better later */ std::shared_ptr p_state; }; /** @} */ } /* namespace ostd */ #endif /** @} */