diff --git a/main.cc b/main.cc index cf9514f..0abe5da 100644 --- a/main.cc +++ b/main.cc @@ -8,6 +8,8 @@ #include +#include "tpool.hh" + /* represents a rule definition, possibly with a function */ struct Rule { ostd::String target; diff --git a/tpool.hh b/tpool.hh new file mode 100644 index 0000000..8283a27 --- /dev/null +++ b/tpool.hh @@ -0,0 +1,137 @@ +#include + +#include +#include +#include +#include + +struct Mutex { + Mutex() { + pthread_mutex_init(&mtx, nullptr); + locked = false; + } + + ~Mutex() { + while (locked) unlock(); + pthread_mutex_destroy(&mtx); + } + + void lock() { + pthread_mutex_lock(&mtx); + locked = true; + } + + void unlock() { + locked = false; + pthread_mutex_unlock(&mtx); + } + + pthread_mutex_t mtx; + volatile bool locked; +}; + +struct Cond { + Cond() { + pthread_cond_init(&cnd, nullptr); + } + + ~Cond() { + pthread_cond_destroy(&cnd); + } + + void signal() { + pthread_cond_signal(&cnd); + } + + void broadcast() { + pthread_cond_broadcast(&cnd); + } + + void wait(Mutex &m) { + pthread_cond_wait(&cnd, &m.mtx); + } + + pthread_cond_t cnd; +}; + +struct Task { + ostd::Function cb; + Task *next = nullptr; + Task(ostd::Function &&cb): cb(ostd::move(cb)) {} +}; + +struct ThreadPool { + ThreadPool(ostd::Size size): size(size) {} + + ~ThreadPool() { + if (running) destroy(); + } + + static void *thr_func(void *ptr) { + ((ThreadPool *)ptr)->run(); + return nullptr; + } + + bool init() { + running = true; + for (ostd::Size i = 0; i < size; ++i) { + pthread_t tid; + if (pthread_create(&tid, nullptr, thr_func, this)) + return false; + thrs.push(tid); + } + return true; + } + + void destroy() { + mtx.lock(); + running = false; + mtx.unlock(); + cond.broadcast(); + for (ostd::Size i = 0; i < size; ++i) { + void *ret; + pthread_join(thrs[i], &ret); + cond.broadcast(); + } + } + + void run() { + for (;;) { + mtx.lock(); + while (running && (tasks == nullptr)) + cond.wait(mtx); + if (!running) { + mtx.unlock(); + pthread_exit(nullptr); + } + Task *t = tasks; + tasks = t->next; + if (last_task == t) + last_task = nullptr; + mtx.unlock(); + t->cb(); + delete t; + } + } + + void push(ostd::Function func) { + mtx.lock(); + Task *t = new Task(ostd::move(func)); + if (last_task) + last_task->next = t; + last_task = t; + if (!tasks) + tasks = t; + cond.signal(); + mtx.unlock(); + } + +private: + ostd::Size size; + Cond cond; + Mutex mtx; + ostd::Vector thrs; + Task *tasks; + Task *last_task; + volatile bool running; +}; \ No newline at end of file