separate thread pool into two structures

the internal one will also be used in parallel coroutine scheduler
master
Daniel Kolesa 2017-03-20 18:42:54 +01:00
parent 367d2b8b31
commit a080a17d00
1 changed files with 100 additions and 56 deletions

View File

@ -20,6 +20,86 @@
namespace ostd {
namespace detail {
/* can be used as a base for any custom thread pool, internal */
struct thread_pool_base {
virtual ~thread_pool_base() {
destroy();
}
void destroy() {
{
std::lock_guard<std::mutex> l{p_lock};
if (!p_running) {
return;
}
p_running = false;
}
p_cond.notify_all();
for (auto &tid: p_thrs) {
tid.join();
p_cond.notify_all();
}
}
protected:
thread_pool_base() {}
thread_pool_base(thread_pool_base const &) = delete;
thread_pool_base(thread_pool_base &&) = delete;
thread_pool_base &operator=(thread_pool_base const &) = delete;
thread_pool_base &operator=(thread_pool_base &&) = delete;
template<typename B>
void start(size_t size) {
p_running = true;
auto tf = [this]() {
thread_run<B>();
};
for (size_t i = 0; i < size; ++i) {
std::thread tid{tf};
if (!tid.joinable()) {
throw std::runtime_error{"thread_pool worker failed"};
}
p_thrs.push_back(std::move(tid));
}
}
template<typename F>
void push_task(F &&func) {
{
std::lock_guard<std::mutex> l{this->p_lock};
if (!p_running) {
throw std::runtime_error{"push on stopped thread_pool"};
}
func();
}
p_cond.notify_one();
}
private:
template<typename B>
void thread_run() {
B &self = *static_cast<B *>(this);
for (;;) {
std::unique_lock<std::mutex> l{p_lock};
while (p_running && self.empty()) {
p_cond.wait(l);
}
if (!p_running && self.empty()) {
return;
}
self.task_run(l);
}
}
std::condition_variable p_cond;
std::mutex p_lock;
std::vector<std::thread> p_thrs;
bool p_running = false;
};
/* regular thread pool task, as lightweight as possible */
struct tpool_func_base {
tpool_func_base() {}
virtual ~tpool_func_base() {}
@ -87,38 +167,16 @@ namespace detail {
};
}
struct thread_pool {
struct thread_pool: detail::thread_pool_base {
private:
friend struct detail::thread_pool_base;
using base_t = detail::thread_pool_base;
public:
thread_pool(): base_t() {}
void start(size_t size = std::thread::hardware_concurrency()) {
p_running = true;
auto tf = [this]() {
thread_run();
};
for (size_t i = 0; i < size; ++i) {
std::thread tid{tf};
if (!tid.joinable()) {
throw std::runtime_error{"thread_pool worker failed"};
}
p_thrs.push_back(std::move(tid));
}
}
~thread_pool() {
destroy();
}
void destroy() {
{
std::lock_guard<std::mutex> l{p_lock};
if (!p_running) {
return;
}
p_running = false;
}
p_cond.notify_all();
for (auto &tid: p_thrs) {
tid.join();
p_cond.notify_all();
}
base_t::template start<thread_pool>(size);
}
template<typename F, typename ...A>
@ -135,39 +193,25 @@ struct thread_pool {
};
}
auto ret = t.get_future();
{
std::lock_guard<std::mutex> l{p_lock};
if (!p_running) {
throw std::runtime_error{"push on stopped thread_pool"};
}
this->push_task([this, &t]() {
p_tasks.emplace(std::move(t));
}
p_cond.notify_one();
});
return ret;
}
private:
void thread_run() {
for (;;) {
std::unique_lock<std::mutex> l{p_lock};
while (p_running && p_tasks.empty()) {
p_cond.wait(l);
}
if (!p_running && p_tasks.empty()) {
return;
}
auto t{std::move(p_tasks.front())};
p_tasks.pop();
l.unlock();
t();
}
bool empty() const {
return p_tasks.empty();
}
void task_run(std::unique_lock<std::mutex> &l) {
auto t{std::move(p_tasks.front())};
p_tasks.pop();
l.unlock();
t();
}
std::condition_variable p_cond;
std::mutex p_lock;
std::vector<std::thread> p_thrs;
std::queue<detail::tpool_func> p_tasks;
bool p_running = false;
};
} /* namespace ostd */