From 35cda8872e693b513fb09b2d26b561a9a039f2ae Mon Sep 17 00:00:00 2001 From: q66 Date: Sat, 18 Mar 2017 18:34:13 +0100 Subject: [PATCH] add a thread pool impl --- ostd/concurrency.hh | 1 - ostd/thread_pool.hh | 111 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+), 1 deletion(-) create mode 100644 ostd/thread_pool.hh diff --git a/ostd/concurrency.hh b/ostd/concurrency.hh index e706211..d27db6c 100644 --- a/ostd/concurrency.hh +++ b/ostd/concurrency.hh @@ -6,7 +6,6 @@ #ifndef OSTD_CONCURRENCY_HH #define OSTD_CONCURRENCY_HH -#include #include #include diff --git a/ostd/thread_pool.hh b/ostd/thread_pool.hh new file mode 100644 index 0000000..86fb43b --- /dev/null +++ b/ostd/thread_pool.hh @@ -0,0 +1,111 @@ +/* A thread pool that can be used standalone or within a more elaborate module. + * + * This file is part of OctaSTD. See COPYING.md for futher information. + */ + +#ifndef OSTD_THREAD_POOL_HH +#define OSTD_THREAD_POOL_HH + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ostd { + +struct thread_pool { + void start(size_t size = std::thread::hardware_concurrency()) { + p_running = true; + auto tf = [this]() { + thread_run(); + }; + for (size_t i = 0; i < size; ++i) { + std::thread tid{tf}; + if (!tid.joinable()) { + throw std::runtime_error{"thread_pool worker failed"}; + } + p_thrs.push_back(std::move(tid)); + } + } + + ~thread_pool() { + destroy(); + } + + void destroy() { + std::unique_lock l{p_lock}; + if (!p_running) { + return; + } + p_running = false; + l.unlock(); + p_cond.notify_all(); + for (auto &tid: p_thrs) { + tid.join(); + p_cond.notify_all(); + } + } + + template + auto push(F &&func, A &&...args) -> std::result_of_t { + using R = std::result_of_t; + if constexpr(std::is_same_v) { + /* void-returning funcs return void */ + std::unique_lock l{p_lock}; + if (!p_running) { + throw std::runtime_error{"push on stopped thread_pool"}; + } + p_tasks.push( + std::bind(std::forward(func), std::forward(args)...) + ); + p_cond.notify_one(); + } else { + /* non-void-returning funcs return a future */ + std::packaged_task t{ + std::bind(std::forward(func), std::forward(args)...) + }; + auto ret = t.get_future(); + std::unique_lock l{p_lock}; + if (!p_running) { + throw std::runtime_error{"push on stopped thread_pool"}; + } + p_tasks.emplace([t = std::move(t)]() { + t(); + }); + p_cond.notify_one(); + return ret; + } + } + +private: + void thread_run() { + for (;;) { + std::unique_lock l{p_lock}; + while (p_running && p_tasks.empty()) { + p_cond.wait(l); + } + if (!p_running && p_tasks.empty()) { + return; + } + auto t = std::move(p_tasks.front()); + p_tasks.pop(); + l.unlock(); + t(); + } + } + + std::condition_variable p_cond; + std::mutex p_lock; + std::vector p_thrs; + std::queue> p_tasks; + bool p_running = false; +}; + +} /* namespace ostd */ + +#endif