From c120f49634811e443f82949e358e4d5eb52c5aab Mon Sep 17 00:00:00 2001
From: q66
Date: Tue, 21 Mar 2017 00:27:20 +0100
Subject: [PATCH] implement an M:N thread/coroutine scheduler

---
 examples/concurrency.cc |  47 +++++--
 ostd/concurrency.hh     | 266 +++++++++++++++++++++++++++++++++++++++-
 2 files changed, 304 insertions(+), 9 deletions(-)

diff --git a/examples/concurrency.cc b/examples/concurrency.cc
index ae6c723..5fb4eb3 100644
--- a/examples/concurrency.cc
+++ b/examples/concurrency.cc
@@ -4,6 +4,10 @@
 using namespace ostd;
 
 int main() {
+    /* have an array, split it in two halves and sum each half in a separate
+     * task, which may or may not run in parallel with the other one depending
+     * on the scheduler currently in use - several schedulers are shown
+     */
     auto foo = [](auto &sched) {
         auto arr = ostd::iter({ 150, 38, 76, 25, 67, 18, -15, 215, 25, -10 });
 
@@ -18,26 +22,53 @@ int main() {
         writefln("%s + %s = %s", a, b, a + b);
     };
 
+    /* using thread_scheduler results in an OS thread spawned per task,
+     * implementing a 1:1 (kernel-level) scheduling - very expensive on
+     * Windows, less expensive on Unix-likes (but more than coroutines)
+     */
     thread_scheduler tsched;
     tsched.start([&tsched, &foo]() {
-        writeln("thread scheduler: starting...");
+        writeln("(1) 1:1 scheduler: starting...");
         foo(tsched);
-        writeln("thread scheduler: finishing...");
+        writeln("(1) 1:1 scheduler: finishing...");
     });
+    writeln();
 
+    /* using simple_coroutine_scheduler results in a coroutine spawned
+     * per task, implementing N:1 (user-level) scheduling - very cheap
+     * and portable everywhere but obviously limited to only one thread
+     */
     simple_coroutine_scheduler scsched;
     scsched.start([&scsched, &foo]() {
-        writeln("simple coroutine scheduler: starting...");
+        writeln("(2) N:1 scheduler: starting...");
         foo(scsched);
-        writeln("simple coroutine scheduler: finishing...");
+        writeln("(2) N:1 scheduler: finishing...");
     });
+    writeln();
+
+    /* using coroutine_scheduler results in a coroutine spawned per
+     * task, but mapped onto a certain number of OS threads, implementing
+     * a hybrid M:N approach - this benefits from multicore systems and
+     * also is relatively cheap (you can create a big number of tasks)
+     */
+    coroutine_scheduler csched;
+    csched.start([&csched, &foo]() {
+        writeln("(3) M:N scheduler: starting...");
+        foo(csched);
+        writeln("(3) M:N scheduler: finishing...");
+    });
 }
 
 /*
-thread scheduler: starting...
+(1) 1:1 scheduler: starting...
 356 + 233 = 589
-thread scheduler: finishing...
-simple coroutine scheduler: starting...
+(1) 1:1 scheduler: finishing...
+
+(2) N:1 scheduler: starting...
 356 + 233 = 589
-simple coroutine scheduler: finishing...
+(2) N:1 scheduler: finishing...
+
+(3) M:N scheduler: starting...
+356 + 233 = 589
+(3) M:N scheduler: finishing...
 */

diff --git a/ostd/concurrency.hh b/ostd/concurrency.hh
index 8d14592..c4bb549 100644
--- a/ostd/concurrency.hh
+++ b/ostd/concurrency.hh
@@ -36,7 +36,7 @@ struct thread_scheduler {
             *it = std::thread{
                 [this, it](auto func, auto ...args) {
                     func(std::move(args)...);
-                    this->remove_thread(it);
+                    remove_thread(it);
                 },
                 std::forward<F>(func), std::forward<A>(args)...
             };
@@ -225,6 +225,270 @@ using simple_coroutine_scheduler =
 using protected_simple_coroutine_scheduler =
     basic_simple_coroutine_scheduler<stack_traits, true>;
 
+template<typename TR, bool Protected>
+struct basic_coroutine_scheduler {
+private:
+    struct task_cond;
+    struct task;
+
+    using tlist = std::list<task>;
+    using titer = typename tlist::iterator;
+
+    struct task {
+        struct coro: coroutine<void()> {
+            using coroutine<void()>::coroutine;
+            task *tptr = nullptr;
+        };
+
+        coro func;
+        task_cond *waiting_on = nullptr;
+        task *next_waiting = nullptr;
+        titer pos;
+
+        task() = delete;
+        template<typename F, typename SA>
+        task(F &&f, SA &&alloc):
+            func(std::forward<F>(f), std::forward<SA>(alloc))
+        {
+            func.tptr = this;
+        }
+
+        void operator()() {
+            func();
+        }
+
+        void yield(basic_coroutine_scheduler &sched) {
+            {
+                std::lock_guard l{sched.p_lock};
+                sched.p_available.splice(
+                    sched.p_available.cend(), sched.p_running, pos
+                );
+            }
+            yield_raw();
+        }
+
+        void yield_raw() {
+            typename coro::yield_type{func}();
+        }
+
+        bool dead() const {
+            return !func;
+        }
+
+        static task *current() {
+            auto ctx = coroutine_context::current();
+            coro *c = dynamic_cast<coro *>(ctx);
+            if (!c) {
+                std::terminate();
+            }
+            return c->tptr;
+        }
+    };
+
+    struct task_cond {
+        task_cond() = delete;
+        task_cond(task_cond const &) = delete;
+        task_cond(task_cond &&) = delete;
+        task_cond &operator=(task_cond const &) = delete;
+        task_cond &operator=(task_cond &&) = delete;
+
+        task_cond(basic_coroutine_scheduler &s): p_sched(s) {}
+
+        template<typename L>
+        void wait(L &l) noexcept {
+            l.unlock();
+            task *curr = task::current();
+            p_sched.wait(this, p_waiting, curr);
+            curr->yield_raw();
+            l.lock();
+        }
+
+        void notify_one() noexcept {
+            p_sched.notify_one(p_waiting);
+        }
+
+        void notify_all() noexcept {
+            p_sched.notify_all(p_waiting);
+        }
+    private:
+        basic_coroutine_scheduler &p_sched;
+        task *p_waiting = nullptr;
+    };
+
+public:
+    template<typename T>
+    using channel_type = channel<T>;
+
+    basic_coroutine_scheduler(
+        size_t ss = TR::default_size(),
+        size_t cs = basic_stack_pool<TR, Protected>::DEFAULT_CHUNK_SIZE
+    ):
+        p_stacks(ss, cs)
+    {}
+
+    ~basic_coroutine_scheduler() {}
+
+    template<typename F, typename ...A>
+    auto start(F &&func, A &&...args) -> std::result_of_t<F(A...)> {
+        /* start with one task in the queue, this way we can
+         * say we've finished when the task queue becomes empty
+         */
+        using R = std::result_of_t<F(A...)>;
+        if constexpr(std::is_same_v<R, void>) {
+            spawn(std::forward<F>(func), std::forward<A>(args)...);
+            /* actually start the thread pool */
+            init();
+            destroy();
+        } else {
+            R ret;
+            spawn([&ret, func = std::forward<F>(func)](auto &&...args) {
+                ret = func(std::forward<decltype(args)>(args)...);
+            }, std::forward<A>(args)...);
+            init();
+            destroy();
+            return ret;
+        }
+    }
+
+    template<typename F, typename ...A>
+    void spawn(F &&func, A &&...args) {
+        {
+            std::lock_guard l{p_lock};
+            task *t = nullptr;
+            if constexpr(sizeof...(A) == 0) {
+                t = &p_available.emplace_back(
+                    [lfunc = std::forward<F>(func)](auto) {
+                        lfunc();
+                    },
+                    p_stacks.get_allocator()
+                );
+            } else {
+                t = &p_available.emplace_back(
+                    [lfunc = std::bind(
+                        std::forward<F>(func), std::forward<A>(args)...
+                    )](auto) mutable {
+                        lfunc();
+                    },
+                    p_stacks.get_allocator()
+                );
+            }
+            t->pos = --p_available.end();
+        }
+        p_cond.notify_one();
+    }
+
+    void yield() {
+        task::current()->yield(*this);
+    }
+
+    template<typename T>
+    channel<T> make_channel() {
+        return channel<T>{[this]() {
+            return task_cond{*this};
+        }};
+    }
+private:
+    void init() {
+        auto tf = [this]() {
+            thread_run();
+        };
+        size_t size = std::thread::hardware_concurrency();
+        for (size_t i = 0; i < size; ++i) {
+            std::thread tid{tf};
+            if (!tid.joinable()) {
+                throw std::runtime_error{"coroutine_scheduler worker failed"};
+            }
+            p_thrs.push_back(std::move(tid));
+        }
+    }
+
+    void destroy() {
+        p_cond.notify_all();
+        for (auto &tid: p_thrs) {
+            tid.join();
+            p_cond.notify_all();
+        }
+    }
+
+    void wait(task_cond *cond, task *&wt, task *t) {
+        std::lock_guard l{p_lock};
+        p_waiting.splice(p_waiting.cbegin(), p_running, t->pos);
+        t->waiting_on = cond;
+        t->next_waiting = wt;
+        wt = t;
+    }
+
+    void notify_one(task *&wl) {
+        std::unique_lock l{p_lock};
+        if (wl == nullptr) {
+            return;
+        }
+        wl->waiting_on = nullptr;
+        p_available.splice(p_available.cbegin(), p_waiting, wl->pos);
+        wl = wl->next_waiting;
+        l.unlock();
+        p_cond.notify_one();
+        task::current()->yield(*this);
+    }
+
+    void notify_all(task *&wl) {
+        {
+            std::unique_lock l{p_lock};
+            while (wl != nullptr) {
+                wl->waiting_on = nullptr;
+                p_available.splice(p_available.cbegin(), p_waiting, wl->pos);
+                wl = wl->next_waiting;
+                l.unlock();
+                p_cond.notify_one();
+                l.lock();
+            }
+        }
+        task::current()->yield(*this);
+    }
+
+    void thread_run() {
+        for (;;) {
+            std::unique_lock l{p_lock};
+            /* wait for an item to become available */
+            while (p_available.empty()) {
+                /* if all lists have become empty, we're done */
+                if (p_waiting.empty() && p_running.empty()) {
+                    return;
+                }
+                p_cond.wait(l);
+            }
+            task_run(l);
+        }
+    }
+
+    void task_run(std::unique_lock<std::mutex> &l) {
+        auto it = p_available.begin();
+        p_running.splice(p_running.cend(), p_available, it);
+        task &c = *it;
+        l.unlock();
+        c();
+        l.lock();
+        if (c.dead()) {
+            p_running.erase(it);
+        }
+        l.unlock();
+    }
+
+    std::condition_variable p_cond;
+    std::mutex p_lock;
+    std::vector<std::thread> p_thrs;
+    basic_stack_pool<TR, Protected> p_stacks;
+    tlist p_available;
+    tlist p_waiting;
+    tlist p_running;
+};
+
+using coroutine_scheduler =
+    basic_coroutine_scheduler<stack_traits, false>;
+
+using protected_coroutine_scheduler =
+    basic_coroutine_scheduler<stack_traits, true>;
+
 template<typename S, typename F, typename ...A>
 inline void spawn(S &sched, F &&func, A &&...args) {
     sched.spawn(std::forward<F>(func), std::forward<A>(args)...);
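For illustration, a complete program driving the new M:N scheduler might look
like the following - a minimal sketch assuming the channel put()/get()
interface and the foldl/slice range helpers that already exist in ostd outside
this patch:

#include <ostd/range.hh>
#include <ostd/io.hh>
#include <ostd/concurrency.hh>

using namespace ostd;

int main() {
    /* M:N scheduler: tasks are coroutines multiplexed over a pool of
     * OS threads (hardware_concurrency() of them, per init() above)
     */
    coroutine_scheduler sched;
    sched.start([&sched]() {
        auto arr = ostd::iter({ 150, 38, 76, 25, 67, 18, -15, 215, 25, -10 });
        /* a channel made via the scheduler waits on task_cond, so a
         * blocked get() suspends the calling task, not its worker thread
         */
        auto c = sched.make_channel<int>();
        auto half = [c](auto part) mutable {
            c.put(foldl(part, 0)); /* sum one half, send the result */
        };
        spawn(sched, half, arr.slice(0, arr.size() / 2));
        spawn(sched, half, arr.slice(arr.size() / 2, arr.size()));
        int a = c.get();
        int b = c.get();
        writefln("%s + %s = %s", a, b, a + b); /* 356 + 233 = 589 */
    });
}

Since the scheduler's start() only returns once all task queues have drained,
the two spawned tasks are guaranteed to have finished (and their worker
threads joined via destroy()) by the time main() exits the start() call.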