diff --git a/ostd/concurrency.hh b/ostd/concurrency.hh index 160c1db..bbe6942 100644 --- a/ostd/concurrency.hh +++ b/ostd/concurrency.hh @@ -80,6 +80,62 @@ private: std::mutex p_lock; }; +namespace detail { + struct csched_task; + + OSTD_EXPORT extern thread_local csched_task *current_csched_task; + + struct csched_task: coroutine_context { + friend struct coroutine_context; + + csched_task() = delete; + csched_task(csched_task const &) = delete; + csched_task(csched_task &&) = delete; + csched_task &operator=(csched_task const &) = delete; + csched_task &operator=(csched_task &&) = delete; + + template + csched_task(F &&f, SA &&sa): p_func(std::forward(f)) { + if (!p_func) { + this->set_dead(); + return; + } + this->make_context(sa); + } + + void operator()() { + this->set_exec(); + csched_task *curr = std::exchange(current_csched_task, this); + this->coro_jump(); + current_csched_task = curr; + this->rethrow(); + } + + void yield() { + /* we'll yield back to the thread we were scheduled to, which + * will appropriately notify one or all other waiting threads + * so we either get re-scheduled or the whole scheduler dies + */ + this->yield_jump(); + } + + bool dead() const { + return this->is_dead(); + } + + static csched_task *current() { + return current_csched_task; + } + + private: + void resume_call() { + p_func(); + } + + std::function p_func; + }; +} + template struct basic_simple_coroutine_scheduler { private: @@ -126,9 +182,9 @@ public: size_t cs = basic_stack_pool::DEFAULT_CHUNK_SIZE ): p_stacks(ss, cs), - p_dispatcher([this](auto yield_main) { - this->dispatch(yield_main); - }), + p_dispatcher([this]() { + dispatch(); + }, basic_fixedsize_stack{ss}), p_coros() {} @@ -148,32 +204,27 @@ public: template void spawn(F func, A &&...args) { if constexpr(sizeof...(A) == 0) { - p_coros.emplace_back([lfunc = std::move(func)](auto) { + p_coros.emplace_back([lfunc = std::move(func)]() { lfunc(); }, p_stacks.get_allocator()); } else { p_coros.emplace_back([lfunc = std::bind( std::move(func), std::forward(args)... - )](auto) mutable { + )]() mutable { lfunc(); }, p_stacks.get_allocator()); } } void yield() { - auto ctx = coroutine_context::current(); + auto ctx = detail::csched_task::current(); if (!ctx) { /* yield from main means go to dispatcher and call first task */ p_idx = p_coros.begin(); p_dispatcher(); return; } - coro *c = dynamic_cast(ctx); - if (c) { - typename coro::yield_type{*c}(); - return; - } - throw std::runtime_error{"attempt to yield outside coroutine"}; + ctx->yield(); } template @@ -183,22 +234,18 @@ public: }}; } private: - struct coro: coroutine { - using coroutine::coroutine; - }; - - void dispatch(typename coro::yield_type &yield_main) { + void dispatch() { while (!p_coros.empty()) { if (p_idx == p_coros.end()) { /* we're at the end; it's time to return to main and * continue there (potential yield from main results * in continuing from this point with the first task) */ - yield_main(); + detail::csched_task::current()->yield(); continue; } (*p_idx)(); - if (!*p_idx) { + if (p_idx->dead()) { p_idx = p_coros.erase(p_idx); } else { ++p_idx; @@ -217,9 +264,9 @@ private: } basic_stack_pool p_stacks; - coro p_dispatcher; - std::list p_coros; - typename std::list::iterator p_idx = p_coros.end(); + detail::csched_task p_dispatcher; + std::list p_coros; + typename std::list::iterator p_idx = p_coros.end(); }; using simple_coroutine_scheduler = @@ -228,71 +275,46 @@ using simple_coroutine_scheduler = using protected_simple_coroutine_scheduler = basic_simple_coroutine_scheduler; -namespace detail { - struct csched_task; - - OSTD_EXPORT extern thread_local csched_task *current_csched_task; - - struct csched_task: coroutine_context { - std::function func; - void *waiting_on = nullptr; - csched_task *next_waiting = nullptr; - typename std::list::iterator pos; - - csched_task() = delete; - csched_task(csched_task const &) = delete; - csched_task(csched_task &&) = delete; - csched_task &operator=(csched_task const &) = delete; - csched_task &operator=(csched_task &&) = delete; - - template - csched_task(F &&f, SA &&sa): func(std::forward(f)) { - if (!func) { - this->set_dead(); - return; - } - this->make_context(sa); - } - - void operator()() { - this->set_exec(); - csched_task *curr = std::exchange(current_csched_task, this); - this->coro_jump(); - current_csched_task = curr; - this->rethrow(); - } - - void yield() { - /* we'll yield back to the thread we were scheduled to, which - * will appropriately notify one or all other waiting threads - * so we either get re-scheduled or the whole scheduler dies - */ - this->yield_jump(); - } - - bool dead() const { - return this->is_dead(); - } - - static csched_task *current() { - return current_csched_task; - } - - void resume_call() { - func(); - } - }; -} - template struct basic_coroutine_scheduler { private: struct task_cond; - using task = detail::csched_task; + struct task; using tlist = std::list; using titer = typename tlist::iterator; + struct task { + private: + detail::csched_task p_func; + + public: + void *waiting_on = nullptr; + task *next_waiting = nullptr; + titer pos; + + template + task(F &&f, SA &&sa): + p_func(std::forward(f), std::forward(sa)) + {} + + void operator()() { + p_func(); + } + + void yield() { + p_func.yield(); + } + + bool dead() const { + return p_func.dead(); + } + + static task *current() { + return reinterpret_cast(detail::csched_task::current()); + } + }; + struct task_cond { task_cond() = delete; task_cond(task_cond const &) = delete;