split threadpool into a separate header

master
Daniel Kolesa 2016-08-02 21:49:10 +01:00
parent 113d18c026
commit e8901e2fae
4 changed files with 113 additions and 103 deletions

View File

@ -19,5 +19,5 @@ obuild: $(FILES)
clean:
rm -f $(FILES) obuild
main.o: $(CUBESCRIPT_PATH)/cubescript.hh
main.o: tpool.hh $(CUBESCRIPT_PATH)/cubescript.hh
globs.o: $(CUBESCRIPT_PATH)/cubescript.hh

103
main.cc
View File

@ -9,12 +9,13 @@
#include <ostd/platform.hh>
#include <ostd/utility.hh>
#include <ostd/environ.hh>
#include <ostd/thread.hh>
#include <ostd/mutex.hh>
#include <ostd/condition.hh>
#include <cubescript.hh>
#include "tpool.hh"
void cs_register_globs(cscript::CsState &cs);
using ostd::ConstCharRange;
@ -23,7 +24,6 @@ using ostd::Map;
using ostd::String;
using ostd::Uint32;
using ostd::slice_until;
using ostd::Thread;
using ostd::UniqueLock;
using ostd::Mutex;
using ostd::Condition;
@ -33,105 +33,6 @@ using cscript::TvalRange;
using cscript::StackedValue;
using cscript::Bytecode;
/* thread pool */
struct ThreadPool {
ThreadPool():
cond(), mtx(), thrs(), tasks(nullptr), last_task(nullptr),
running(false)
{}
~ThreadPool() {
destroy();
}
static void *thr_func(void *ptr) {
static_cast<ThreadPool *>(ptr)->run();
return nullptr;
}
bool init(ostd::Size size) {
running = true;
for (ostd::Size i = 0; i < size; ++i) {
Thread tid([this]() { run(); });
if (!tid) {
return false;
}
thrs.push(ostd::move(tid));
}
return true;
}
void destroy() {
mtx.lock();
if (!running) {
mtx.unlock();
return;
}
running = false;
mtx.unlock();
cond.broadcast();
for (Thread &tid: thrs.iter()) {
tid.join();
cond.broadcast();
}
}
void run() {
for (;;) {
UniqueLock<Mutex> l(mtx);
while (running && (tasks == nullptr)) {
cond.wait(l);
}
if (!running) {
l.unlock();
ostd::this_thread::exit();
}
Task *t = tasks;
tasks = t->next;
if (last_task == t) {
last_task = nullptr;
}
l.unlock();
t->cb();
delete t;
}
}
void push(ostd::Function<void()> func) {
mtx.lock();
Task *t = new Task(ostd::move(func));
if (last_task) {
last_task->next = t;
}
last_task = t;
if (!tasks) {
tasks = t;
}
cond.signal();
mtx.unlock();
}
private:
struct Task {
ostd::Function<void()> cb;
Task *next = nullptr;
Task() = delete;
Task(Task const &) = delete;
Task(Task &&) = delete;
Task(ostd::Function<void()> &&cbf): cb(ostd::move(cbf)) {}
Task &operator=(Task const &) = delete;
Task &operator=(Task &&) = delete;
};
Condition cond;
Mutex mtx;
Vector<Thread> thrs;
Task *tasks;
Task *last_task;
bool volatile running;
};
/* check funcs */
static bool ob_check_ts(ConstCharRange tname, Vector<String> const &deps) {

View File

@ -24,7 +24,7 @@ action clean [
shell rm -f $FILES obuild_ob
]
depend main_ob.o [@CS_PATH/cubescript.hh]
depend main_ob.o [tpool.hh @CS_PATH/cubescript.hh]
depend globs_ob.o [@CS_PATH/cubescript.hh]
rule default obuild

109
tpool.hh 100644
View File

@ -0,0 +1,109 @@
#ifndef OBUILD_TPOOL_HH
#define OBUILD_TPOOL_HH
#include <ostd/thread.hh>
using ostd::Thread;
using ostd::UniqueLock;
using ostd::Mutex;
using ostd::Condition;
using ostd::Vector;
struct ThreadPool {
ThreadPool():
cond(), mtx(), thrs(), tasks(nullptr), last_task(nullptr),
running(false)
{}
~ThreadPool() {
destroy();
}
static void *thr_func(void *ptr) {
static_cast<ThreadPool *>(ptr)->run();
return nullptr;
}
bool init(ostd::Size size) {
running = true;
for (ostd::Size i = 0; i < size; ++i) {
Thread tid([this]() { run(); });
if (!tid) {
return false;
}
thrs.push(ostd::move(tid));
}
return true;
}
void destroy() {
mtx.lock();
if (!running) {
mtx.unlock();
return;
}
running = false;
mtx.unlock();
cond.broadcast();
for (Thread &tid: thrs.iter()) {
tid.join();
cond.broadcast();
}
}
void run() {
for (;;) {
UniqueLock<Mutex> l(mtx);
while (running && (tasks == nullptr)) {
cond.wait(l);
}
if (!running) {
l.unlock();
ostd::this_thread::exit();
}
Task *t = tasks;
tasks = t->next;
if (last_task == t) {
last_task = nullptr;
}
l.unlock();
t->cb();
delete t;
}
}
void push(ostd::Function<void()> func) {
mtx.lock();
Task *t = new Task(ostd::move(func));
if (last_task) {
last_task->next = t;
}
last_task = t;
if (!tasks) {
tasks = t;
}
cond.signal();
mtx.unlock();
}
private:
struct Task {
ostd::Function<void()> cb;
Task *next = nullptr;
Task() = delete;
Task(Task const &) = delete;
Task(Task &&) = delete;
Task(ostd::Function<void()> &&cbf): cb(ostd::move(cbf)) {}
Task &operator=(Task const &) = delete;
Task &operator=(Task &&) = delete;
};
Condition cond;
Mutex mtx;
Vector<Thread> thrs;
Task *tasks;
Task *last_task;
bool volatile running;
};
#endif