From a080a17d00c8671443d76e2de77a6d81e38b76b0 Mon Sep 17 00:00:00 2001
From: q66
Date: Mon, 20 Mar 2017 18:42:54 +0100
Subject: [PATCH] separate thread pool into two structures

the internal one will also be used in parallel coroutine scheduler
---
 ostd/thread_pool.hh | 156 ++++++++++++++++++++++++++++----------------
 1 file changed, 100 insertions(+), 56 deletions(-)

diff --git a/ostd/thread_pool.hh b/ostd/thread_pool.hh
index 0a54b50..c39e388 100644
--- a/ostd/thread_pool.hh
+++ b/ostd/thread_pool.hh
@@ -20,6 +20,86 @@ namespace ostd {
 
 namespace detail {
+    /* can be used as a base for any custom thread pool, internal */
+    struct thread_pool_base {
+        virtual ~thread_pool_base() {
+            destroy();
+        }
+
+        void destroy() {
+            {
+                std::lock_guard<std::mutex> l{p_lock};
+                if (!p_running) {
+                    return;
+                }
+                p_running = false;
+            }
+            p_cond.notify_all();
+            for (auto &tid: p_thrs) {
+                tid.join();
+                p_cond.notify_all();
+            }
+        }
+
+    protected:
+        thread_pool_base() {}
+
+        thread_pool_base(thread_pool_base const &) = delete;
+        thread_pool_base(thread_pool_base &&) = delete;
+        thread_pool_base &operator=(thread_pool_base const &) = delete;
+        thread_pool_base &operator=(thread_pool_base &&) = delete;
+
+        template<typename B>
+        void start(size_t size) {
+            p_running = true;
+            auto tf = [this]() {
+                thread_run<B>();
+            };
+            for (size_t i = 0; i < size; ++i) {
+                std::thread tid{tf};
+                if (!tid.joinable()) {
+                    throw std::runtime_error{"thread_pool worker failed"};
+                }
+                p_thrs.push_back(std::move(tid));
+            }
+        }
+
+        template<typename F>
+        void push_task(F &&func) {
+            {
+                std::lock_guard<std::mutex> l{this->p_lock};
+                if (!p_running) {
+                    throw std::runtime_error{"push on stopped thread_pool"};
+                }
+                func();
+            }
+            p_cond.notify_one();
+        }
+
+    private:
+        template<typename B>
+        void thread_run() {
+            B &self = *static_cast<B *>(this);
+            for (;;) {
+                std::unique_lock<std::mutex> l{p_lock};
+                while (p_running && self.empty()) {
+                    p_cond.wait(l);
+                }
+                if (!p_running && self.empty()) {
+                    return;
+                }
+                self.task_run(l);
+            }
+        }
+
+        std::condition_variable p_cond;
+        std::mutex p_lock;
+        std::vector<std::thread> p_thrs;
+        bool p_running = false;
+    };
+
+    /* regular thread pool task, as lightweight as possible */
     struct tpool_func_base {
         tpool_func_base() {}
         virtual ~tpool_func_base() {}
@@ -87,38 +167,16 @@ namespace detail {
     };
 }
 
-struct thread_pool {
+struct thread_pool: detail::thread_pool_base {
+private:
+    friend struct detail::thread_pool_base;
+    using base_t = detail::thread_pool_base;
+
+public:
+    thread_pool(): base_t() {}
+
     void start(size_t size = std::thread::hardware_concurrency()) {
-        p_running = true;
-        auto tf = [this]() {
-            thread_run();
-        };
-        for (size_t i = 0; i < size; ++i) {
-            std::thread tid{tf};
-            if (!tid.joinable()) {
-                throw std::runtime_error{"thread_pool worker failed"};
-            }
-            p_thrs.push_back(std::move(tid));
-        }
-    }
-
-    ~thread_pool() {
-        destroy();
-    }
-
-    void destroy() {
-        {
-            std::lock_guard<std::mutex> l{p_lock};
-            if (!p_running) {
-                return;
-            }
-            p_running = false;
-        }
-        p_cond.notify_all();
-        for (auto &tid: p_thrs) {
-            tid.join();
-            p_cond.notify_all();
-        }
+        base_t::template start<thread_pool>(size);
     }
 
     template<typename F, typename ...A>
@@ -135,39 +193,25 @@ struct thread_pool {
             };
         }
         auto ret = t.get_future();
-        {
-            std::lock_guard<std::mutex> l{p_lock};
-            if (!p_running) {
-                throw std::runtime_error{"push on stopped thread_pool"};
-            }
+        this->push_task([this, &t]() {
             p_tasks.emplace(std::move(t));
-        }
-        p_cond.notify_one();
+        });
         return ret;
     }
 
 private:
-    void thread_run() {
-        for (;;) {
-            std::unique_lock<std::mutex> l{p_lock};
-            while (p_running && p_tasks.empty()) {
-                p_cond.wait(l);
-            }
-            if (!p_running && p_tasks.empty()) {
-                return;
-            }
-            auto t{std::move(p_tasks.front())};
-            p_tasks.pop();
-            l.unlock();
-            t();
-        }
+    bool empty() const {
+        return p_tasks.empty();
+    }
+
+    void task_run(std::unique_lock<std::mutex> &l) {
+        auto t{std::move(p_tasks.front())};
+        p_tasks.pop();
+        l.unlock();
+        t();
     }
 
-    std::condition_variable p_cond;
-    std::mutex p_lock;
-    std::vector<std::thread> p_thrs;
     std::queue p_tasks;
-    bool p_running = false;
 };
 
 } /* namespace ostd */
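
Below is a small usage sketch, not part of the patch, of how the refactored pool is driven after this change: thread_pool keeps its public start()/push()/destroy() surface, while worker startup, the condition variable and the shutdown handshake now live in detail::thread_pool_base, and the derived pool only supplies empty() and task_run(). The include path and the assumption that push() returns a std::future of the callable's result are inferred from the diff; main(), the worker count and the squaring loop are illustrative only.

    #include <cstdio>
    #include <future>
    #include <vector>

    #include "ostd/thread_pool.hh"

    int main() {
        ostd::thread_pool pool;
        /* spawns std::thread::hardware_concurrency() workers by default;
         * the spawn loop now lives in detail::thread_pool_base::start<thread_pool>() */
        pool.start();

        /* push() packages the callable and hands it to push_task(), which queues it
         * under the pool lock; a worker later pops it in task_run() and runs it unlocked */
        std::vector<std::future<int>> results;
        for (int i = 0; i < 8; ++i) {
            results.push_back(pool.push([i]() { return i * i; }));
        }
        for (auto &f: results) {
            std::printf("%d\n", f.get());
        }

        /* destroy() also runs from the destructor: it clears p_running,
         * wakes all workers and joins them once the queue has drained */
        pool.destroy();
        return 0;
    }

Any other pool, such as the parallel coroutine scheduler mentioned in the commit message, can reuse the same base by befriending detail::thread_pool_base and implementing the same two hooks.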