merge tpool.hh into main.cc

master
Daniel Kolesa 2015-11-27 22:11:46 +00:00
parent 2b4c8dd359
commit 4fe2b2f7db
4 changed files with 137 additions and 144 deletions

View File

@ -21,7 +21,7 @@ cubescript.o: $(CUBESCRIPT_PATH)/cubescript.cc
clean:
rm -f $(FILES) obuild
main.o: globs.hh tpool.hh
main.o: globs.hh
main.o: $(CUBESCRIPT_PATH)/cubescript.hh
globs.o: globs.hh

136
main.cc
View File

@ -1,17 +1,19 @@
#include <unistd.h>
#include <pthread.h>
#include <ostd/types.hh>
#include <ostd/functional.hh>
#include <ostd/string.hh>
#include <ostd/vector.hh>
#include <ostd/map.hh>
#include <ostd/atomic.hh>
#include <ostd/filesystem.hh>
#include <ostd/platform.hh>
#include <ostd/utility.hh>
#include <cubescript.hh>
#include "globs.hh"
#include "tpool.hh"
using ostd::ConstCharRange;
using ostd::Vector;
@ -24,6 +26,138 @@ using cscript::CsState;
using cscript::TvalRange;
using cscript::StackedValue;
/* thread pool */
struct Mutex {
Mutex() {
pthread_mutex_init(&mtx, nullptr);
locked = false;
}
~Mutex() {
while (locked) unlock();
pthread_mutex_destroy(&mtx);
}
void lock() {
pthread_mutex_lock(&mtx);
locked = true;
}
void unlock() {
locked = false;
pthread_mutex_unlock(&mtx);
}
pthread_mutex_t mtx;
volatile bool locked;
};
struct Cond {
Cond() {
pthread_cond_init(&cnd, nullptr);
}
~Cond() {
pthread_cond_destroy(&cnd);
}
void signal() {
pthread_cond_signal(&cnd);
}
void broadcast() {
pthread_cond_broadcast(&cnd);
}
void wait(Mutex &m) {
pthread_cond_wait(&cnd, &m.mtx);
}
pthread_cond_t cnd;
};
struct Task {
ostd::Function<void()> cb;
Task *next = nullptr;
Task(ostd::Function<void()> &&cb): cb(ostd::move(cb)) {}
};
struct ThreadPool {
ThreadPool() {}
~ThreadPool() {
if (running) destroy();
}
static void *thr_func(void *ptr) {
((ThreadPool *)ptr)->run();
return nullptr;
}
bool init(ostd::Size size) {
running = true;
for (ostd::Size i = 0; i < size; ++i) {
pthread_t tid;
if (pthread_create(&tid, nullptr, thr_func, this))
return false;
thrs.push(tid);
}
return true;
}
void destroy() {
mtx.lock();
running = false;
mtx.unlock();
cond.broadcast();
for (pthread_t &tid: thrs.iter()) {
void *ret;
pthread_join(tid, &ret);
cond.broadcast();
}
}
void run() {
for (;;) {
mtx.lock();
while (running && (tasks == nullptr))
cond.wait(mtx);
if (!running) {
mtx.unlock();
pthread_exit(nullptr);
}
Task *t = tasks;
tasks = t->next;
if (last_task == t)
last_task = nullptr;
mtx.unlock();
t->cb();
delete t;
}
}
void push(ostd::Function<void()> func) {
mtx.lock();
Task *t = new Task(ostd::move(func));
if (last_task)
last_task->next = t;
last_task = t;
if (!tasks)
tasks = t;
cond.signal();
mtx.unlock();
}
private:
Cond cond;
Mutex mtx;
Vector<pthread_t> thrs;
Task *tasks;
Task *last_task;
volatile bool running;
};
/* check funcs */
static bool ob_check_ts(ConstCharRange tname, const Vector<String> &deps) {

View File

@ -26,7 +26,7 @@ action clean [
shell rm -f $FILES obuild_ob
]
depend main_ob.o globs.hh tpool.hh [@CS_PATH/cubescript.hh]
depend main_ob.o globs.hh [@CS_PATH/cubescript.hh]
depend globs_ob.o globs.hh [@CS_PATH/cubescript.hh]
depend cubescript_ob.o [@CS_PATH/cubescript.hh]

141
tpool.hh
View File

@ -1,141 +0,0 @@
#ifndef OCTABUILD_TPOOL_HH
#define OCTABUILD_TPOOL_HH
#include <pthread.h>
#include <ostd/types.hh>
#include <ostd/functional.hh>
#include <ostd/vector.hh>
#include <ostd/utility.hh>
struct Mutex {
Mutex() {
pthread_mutex_init(&mtx, nullptr);
locked = false;
}
~Mutex() {
while (locked) unlock();
pthread_mutex_destroy(&mtx);
}
void lock() {
pthread_mutex_lock(&mtx);
locked = true;
}
void unlock() {
locked = false;
pthread_mutex_unlock(&mtx);
}
pthread_mutex_t mtx;
volatile bool locked;
};
struct Cond {
Cond() {
pthread_cond_init(&cnd, nullptr);
}
~Cond() {
pthread_cond_destroy(&cnd);
}
void signal() {
pthread_cond_signal(&cnd);
}
void broadcast() {
pthread_cond_broadcast(&cnd);
}
void wait(Mutex &m) {
pthread_cond_wait(&cnd, &m.mtx);
}
pthread_cond_t cnd;
};
struct Task {
ostd::Function<void()> cb;
Task *next = nullptr;
Task(ostd::Function<void()> &&cb): cb(ostd::move(cb)) {}
};
struct ThreadPool {
ThreadPool() {}
~ThreadPool() {
if (running) destroy();
}
static void *thr_func(void *ptr) {
((ThreadPool *)ptr)->run();
return nullptr;
}
bool init(ostd::Size size) {
running = true;
for (ostd::Size i = 0; i < size; ++i) {
pthread_t tid;
if (pthread_create(&tid, nullptr, thr_func, this))
return false;
thrs.push(tid);
}
return true;
}
void destroy() {
mtx.lock();
running = false;
mtx.unlock();
cond.broadcast();
for (pthread_t &tid: thrs.iter()) {
void *ret;
pthread_join(tid, &ret);
cond.broadcast();
}
}
void run() {
for (;;) {
mtx.lock();
while (running && (tasks == nullptr))
cond.wait(mtx);
if (!running) {
mtx.unlock();
pthread_exit(nullptr);
}
Task *t = tasks;
tasks = t->next;
if (last_task == t)
last_task = nullptr;
mtx.unlock();
t->cb();
delete t;
}
}
void push(ostd::Function<void()> func) {
mtx.lock();
Task *t = new Task(ostd::move(func));
if (last_task)
last_task->next = t;
last_task = t;
if (!tasks)
tasks = t;
cond.signal();
mtx.unlock();
}
private:
Cond cond;
Mutex mtx;
ostd::Vector<pthread_t> thrs;
Task *tasks;
Task *last_task;
volatile bool running;
};
#endif /* OCTABUILD_TPOOL_HH */