diff --git a/ostd/concurrency.hh b/ostd/concurrency.hh index 505bef9..5bafaca 100644 --- a/ostd/concurrency.hh +++ b/ostd/concurrency.hh @@ -10,6 +10,7 @@ #include #include +#include "ostd/platform.hh" #include "ostd/coroutine.hh" #include "ostd/channel.hh" @@ -29,7 +30,7 @@ struct thread_scheduler { } template - void spawn(F &&func, A &&...args) { + void spawn(F func, A &&...args) { std::lock_guard l{p_lock}; p_threads.emplace_front(); auto it = p_threads.begin(); @@ -38,7 +39,7 @@ struct thread_scheduler { func(std::move(args)...); remove_thread(it); }, - std::forward(func), std::forward(args)... + std::move(func), std::forward(args)... }; } @@ -125,7 +126,7 @@ public: p_stacks(ss, cs), p_dispatcher([this](auto yield_main) { this->dispatch(yield_main); - }, p_stacks.get_allocator()), + }), p_coros() {} @@ -143,14 +144,14 @@ public: } template - void spawn(F &&func, A &&...args) { + void spawn(F func, A &&...args) { if constexpr(sizeof...(A) == 0) { - p_coros.emplace_back([lfunc = std::forward(func)](auto) { + p_coros.emplace_back([lfunc = std::move(func)](auto) { lfunc(); }, p_stacks.get_allocator()); } else { p_coros.emplace_back([lfunc = std::bind( - std::forward(func), std::forward(args)... + std::move(func), std::forward(args)... )](auto) mutable { lfunc(); }, p_stacks.get_allocator()); @@ -323,50 +324,39 @@ public: ~basic_coroutine_scheduler() {} template - auto start(F &&func, A &&...args) -> std::result_of_t { + auto start(F func, A &&...args) -> std::result_of_t { /* start with one task in the queue, this way we can * say we've finished when the task queue becomes empty */ using R = std::result_of_t; + + /* the default 64 KiB stack won't cut it for main, allocate a stack + * which matches the size of the process stack outside of the pool + */ + basic_fixedsize_stack sa{detail::stack_main_size()}; + if constexpr(std::is_same_v) { - spawn(std::forward(func), std::forward(args)...); + spawn_add(sa, std::move(func), std::forward(args)...); /* actually start the thread pool */ init(); - destroy(); } else { R ret; - spawn([&ret, func = std::forward(func)](auto &&...args) { + spawn_add(sa, [&ret, func = std::move(func)](auto &&...args) { ret = func(std::forward(args)...); }, std::forward(args)...); init(); - destroy(); return ret; } } template - void spawn(F &&func, A &&...args) { + void spawn(F func, A &&...args) { { std::lock_guard l{p_lock}; - task *t = nullptr; - if constexpr(sizeof...(A) == 0) { - t = &p_available.emplace_back( - [lfunc = std::forward(func)](auto) { - lfunc(); - }, - p_stacks.get_allocator() - ); - } else { - t = &p_available.emplace_back( - [lfunc = std::bind( - std::forward(func), std::forward(args)... - )](auto) mutable { - lfunc(); - }, - p_stacks.get_allocator() - ); - } - t->pos = --p_available.end(); + spawn_add( + p_stacks.get_allocator(), std::move(func), + std::forward(args)... + ); } p_cond.notify_one(); } @@ -382,23 +372,40 @@ public: }}; } private: - void init() { - auto tf = [this]() { - thread_run(); - }; - size_t size = std::thread::hardware_concurrency(); - for (size_t i = 0; i < size; ++i) { - std::thread tid{tf}; - if (!tid.joinable()) { - throw std::runtime_error{"coroutine_scheduler worker failed"}; - } - p_thrs.push_back(std::move(tid)); + template + void spawn_add(SA &&sa, F &&func, A &&...args) { + task *t = nullptr; + if constexpr(sizeof...(A) == 0) { + t = &p_available.emplace_back( + [lfunc = std::forward(func)](auto) { + lfunc(); + }, + std::forward(sa) + ); + } else { + t = &p_available.emplace_back( + [lfunc = std::bind( + std::forward(func), std::forward(args)... + )](auto) mutable { + lfunc(); + }, + std::forward(sa) + ); } + t->pos = --p_available.end(); } - void destroy() { - for (auto &tid: p_thrs) { - tid.join(); + void init() { + size_t size = std::thread::hardware_concurrency(); + std::vector thrs; + thrs.reserve(size); + for (size_t i = 0; i < size; ++i) { + thrs.emplace_back([this]() { thread_run(); }); + } + for (size_t i = 0; i < size; ++i) { + if (thrs[i].joinable()) { + thrs[i].join(); + } } } @@ -481,7 +488,6 @@ private: std::condition_variable p_cond; std::mutex p_lock; - std::vector p_thrs; basic_stack_pool p_stacks; tlist p_available; tlist p_waiting; diff --git a/ostd/context_stack.hh b/ostd/context_stack.hh index 7fae8cc..aeaa973 100644 --- a/ostd/context_stack.hh +++ b/ostd/context_stack.hh @@ -43,6 +43,7 @@ namespace detail { OSTD_EXPORT void *stack_alloc(size_t sz); OSTD_EXPORT void stack_free(void *p, size_t sz) noexcept; OSTD_EXPORT void stack_protect(void *p, size_t sz) noexcept; + OSTD_EXPORT size_t stack_main_size() noexcept; } template @@ -50,7 +51,11 @@ struct basic_fixedsize_stack { using traits_type = TR; basic_fixedsize_stack(size_t ss = TR::default_size()) noexcept: - p_size(std::clamp(ss, TR::minimum_size(), TR::maximum_size())) + p_size( + TR::is_unbounded() + ? std::max(ss, TR::minimum_size()) + : std::clamp(ss, TR::minimum_size(), TR::maximum_size()) + ) {} stack_context allocate() { diff --git a/ostd/internal/context.hh b/ostd/internal/context.hh index 398030b..eb21be3 100644 --- a/ostd/internal/context.hh +++ b/ostd/internal/context.hh @@ -219,8 +219,8 @@ release: } stack_context p_stack; - detail::fcontext_t p_coro; - detail::fcontext_t p_orig; + detail::fcontext_t p_coro = nullptr; + detail::fcontext_t p_orig = nullptr; std::exception_ptr p_except; state p_state = state::HOLD; }; diff --git a/src/context_stack.cc b/src/context_stack.cc index 460c4a3..75f7b8d 100644 --- a/src/context_stack.cc +++ b/src/context_stack.cc @@ -74,6 +74,17 @@ namespace detail { #endif } + OSTD_EXPORT size_t stack_main_size() noexcept { +#if defined(OSTD_PLATFORM_WIN32) + /* 4 MiB for windows... */ + return 4 * 1024 * 1024; +#else + struct rlimit l; + getrlimit(RLIMIT_STACK, &l); + return size_t(l.rlim_cur); +#endif + } + OSTD_EXPORT void stack_protect(void *p, size_t sz) noexcept { #if defined(OSTD_PLATFORM_WIN32) DWORD oo;