fixes, bigger stack for main task, take spawn funcs by value

master
Daniel Kolesa 2017-03-22 17:31:57 +01:00
parent c1a1c4a1ac
commit 2537d955d1
4 changed files with 71 additions and 49 deletions

View File

@ -10,6 +10,7 @@
#include <utility>
#include <memory>
#include "ostd/platform.hh"
#include "ostd/coroutine.hh"
#include "ostd/channel.hh"
@ -29,7 +30,7 @@ struct thread_scheduler {
}
template<typename F, typename ...A>
void spawn(F &&func, A &&...args) {
void spawn(F func, A &&...args) {
std::lock_guard<std::mutex> l{p_lock};
p_threads.emplace_front();
auto it = p_threads.begin();
@ -38,7 +39,7 @@ struct thread_scheduler {
func(std::move(args)...);
remove_thread(it);
},
std::forward<F>(func), std::forward<A>(args)...
std::move(func), std::forward<A>(args)...
};
}
@ -125,7 +126,7 @@ public:
p_stacks(ss, cs),
p_dispatcher([this](auto yield_main) {
this->dispatch(yield_main);
}, p_stacks.get_allocator()),
}),
p_coros()
{}
@ -143,14 +144,14 @@ public:
}
template<typename F, typename ...A>
void spawn(F &&func, A &&...args) {
void spawn(F func, A &&...args) {
if constexpr(sizeof...(A) == 0) {
p_coros.emplace_back([lfunc = std::forward<F>(func)](auto) {
p_coros.emplace_back([lfunc = std::move(func)](auto) {
lfunc();
}, p_stacks.get_allocator());
} else {
p_coros.emplace_back([lfunc = std::bind(
std::forward<F>(func), std::forward<A>(args)...
std::move(func), std::forward<A>(args)...
)](auto) mutable {
lfunc();
}, p_stacks.get_allocator());
@ -323,50 +324,39 @@ public:
~basic_coroutine_scheduler() {}
template<typename F, typename ...A>
auto start(F &&func, A &&...args) -> std::result_of_t<F(A...)> {
auto start(F func, A &&...args) -> std::result_of_t<F(A...)> {
/* start with one task in the queue, this way we can
* say we've finished when the task queue becomes empty
*/
using R = std::result_of_t<F(A...)>;
/* the default 64 KiB stack won't cut it for main, allocate a stack
* which matches the size of the process stack outside of the pool
*/
basic_fixedsize_stack<TR, Protected> sa{detail::stack_main_size()};
if constexpr(std::is_same_v<R, void>) {
spawn(std::forward<F>(func), std::forward<A>(args)...);
spawn_add(sa, std::move(func), std::forward<A>(args)...);
/* actually start the thread pool */
init();
destroy();
} else {
R ret;
spawn([&ret, func = std::forward<F>(func)](auto &&...args) {
spawn_add(sa, [&ret, func = std::move(func)](auto &&...args) {
ret = func(std::forward<A>(args)...);
}, std::forward<A>(args)...);
init();
destroy();
return ret;
}
}
template<typename F, typename ...A>
void spawn(F &&func, A &&...args) {
void spawn(F func, A &&...args) {
{
std::lock_guard<std::mutex> l{p_lock};
task *t = nullptr;
if constexpr(sizeof...(A) == 0) {
t = &p_available.emplace_back(
[lfunc = std::forward<F>(func)](auto) {
lfunc();
},
p_stacks.get_allocator()
);
} else {
t = &p_available.emplace_back(
[lfunc = std::bind(
std::forward<F>(func), std::forward<A>(args)...
)](auto) mutable {
lfunc();
},
p_stacks.get_allocator()
);
}
t->pos = --p_available.end();
spawn_add(
p_stacks.get_allocator(), std::move(func),
std::forward<A>(args)...
);
}
p_cond.notify_one();
}
@ -382,23 +372,40 @@ public:
}};
}
private:
void init() {
auto tf = [this]() {
thread_run();
};
size_t size = std::thread::hardware_concurrency();
for (size_t i = 0; i < size; ++i) {
std::thread tid{tf};
if (!tid.joinable()) {
throw std::runtime_error{"coroutine_scheduler worker failed"};
}
p_thrs.push_back(std::move(tid));
template<typename SA, typename F, typename ...A>
void spawn_add(SA &&sa, F &&func, A &&...args) {
task *t = nullptr;
if constexpr(sizeof...(A) == 0) {
t = &p_available.emplace_back(
[lfunc = std::forward<F>(func)](auto) {
lfunc();
},
std::forward<SA>(sa)
);
} else {
t = &p_available.emplace_back(
[lfunc = std::bind(
std::forward<F>(func), std::forward<A>(args)...
)](auto) mutable {
lfunc();
},
std::forward<SA>(sa)
);
}
t->pos = --p_available.end();
}
void destroy() {
for (auto &tid: p_thrs) {
tid.join();
void init() {
size_t size = std::thread::hardware_concurrency();
std::vector<std::thread> thrs;
thrs.reserve(size);
for (size_t i = 0; i < size; ++i) {
thrs.emplace_back([this]() { thread_run(); });
}
for (size_t i = 0; i < size; ++i) {
if (thrs[i].joinable()) {
thrs[i].join();
}
}
}
@ -481,7 +488,6 @@ private:
std::condition_variable p_cond;
std::mutex p_lock;
std::vector<std::thread> p_thrs;
basic_stack_pool<TR, Protected> p_stacks;
tlist p_available;
tlist p_waiting;

View File

@ -43,6 +43,7 @@ namespace detail {
OSTD_EXPORT void *stack_alloc(size_t sz);
OSTD_EXPORT void stack_free(void *p, size_t sz) noexcept;
OSTD_EXPORT void stack_protect(void *p, size_t sz) noexcept;
OSTD_EXPORT size_t stack_main_size() noexcept;
}
template<typename TR, bool Protected>
@ -50,7 +51,11 @@ struct basic_fixedsize_stack {
using traits_type = TR;
basic_fixedsize_stack(size_t ss = TR::default_size()) noexcept:
p_size(std::clamp(ss, TR::minimum_size(), TR::maximum_size()))
p_size(
TR::is_unbounded()
? std::max(ss, TR::minimum_size())
: std::clamp(ss, TR::minimum_size(), TR::maximum_size())
)
{}
stack_context allocate() {

View File

@ -219,8 +219,8 @@ release:
}
stack_context p_stack;
detail::fcontext_t p_coro;
detail::fcontext_t p_orig;
detail::fcontext_t p_coro = nullptr;
detail::fcontext_t p_orig = nullptr;
std::exception_ptr p_except;
state p_state = state::HOLD;
};

View File

@ -74,6 +74,17 @@ namespace detail {
#endif
}
OSTD_EXPORT size_t stack_main_size() noexcept {
#if defined(OSTD_PLATFORM_WIN32)
/* 4 MiB for windows... */
return 4 * 1024 * 1024;
#else
struct rlimit l;
getrlimit(RLIMIT_STACK, &l);
return size_t(l.rlim_cur);
#endif
}
OSTD_EXPORT void stack_protect(void *p, size_t sz) noexcept {
#if defined(OSTD_PLATFORM_WIN32)
DWORD oo;