From b4b7224dd05acc1188f2ce764530e5d5324e65e1 Mon Sep 17 00:00:00 2001 From: q66 Date: Sat, 18 Mar 2017 01:05:10 +0100 Subject: [PATCH] initial skeleton for concurrency module --- build.sh | 4 +-- examples/concurrency.cc | 47 +++++++++++++++++++++++++ ostd/concurrency.hh | 78 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 127 insertions(+), 2 deletions(-) create mode 100644 examples/concurrency.cc create mode 100644 ostd/concurrency.hh diff --git a/build.sh b/build.sh index 1c6a9e2..8778e56 100755 --- a/build.sh +++ b/build.sh @@ -5,7 +5,7 @@ set -e # example sources EXAMPLES="format listdir range range_pipe signal" -EXAMPLES="${EXAMPLES} stream1 stream2 coroutine1 coroutine2" +EXAMPLES="${EXAMPLES} stream1 stream2 coroutine1 coroutine2 concurrency" # assembly sources ASM_SOURCE_DIR="src/asm" @@ -136,7 +136,7 @@ if [ ! -z "$CPPFLAGS" ]; then fi # linker flags -OSTD_LDFLAGS="" +OSTD_LDFLAGS="-pthread" # custom linker flags if [ ! -z "$LDFLAGS" ]; then diff --git a/examples/concurrency.cc b/examples/concurrency.cc new file mode 100644 index 0000000..f70bd33 --- /dev/null +++ b/examples/concurrency.cc @@ -0,0 +1,47 @@ +#include +#include + +using namespace ostd; + +thread_scheduler sched; + +void foo() { + auto arr = ostd::iter({ 150, 38, 76, 25, 67, 18, -15, -38, 25, -10 }); + + auto h1 = arr.slice(0, arr.size() / 2); + auto h2 = arr.slice(arr.size() / 2, arr.size()); + + auto c = sched.make_channel(); + auto f = [&c](auto half) { + int ret = 0; + for (int i: half) { + ret += i; + } + c.put(ret); + }; + sched.spawn(f, h1); + sched.spawn(f, h2); + + int a, b; + c.get(a); + c.get(b); + writefln("first half: %s", a); + writefln("second half: %s", b); + writefln("total: %s", a + b); +} + +int main() { + sched.start([]() { + writeln("starting..."); + foo(); + writeln("finishing..."); + }); +} + +/* +starting... +first half: 356 +second half: -20 +total: 336 +finishing... +*/ diff --git a/ostd/concurrency.hh b/ostd/concurrency.hh new file mode 100644 index 0000000..e706211 --- /dev/null +++ b/ostd/concurrency.hh @@ -0,0 +1,78 @@ +/* Concurrency module with custom scheduler support. + * + * This file is part of OctaSTD. See COPYING.md for futher information. + */ + +#ifndef OSTD_CONCURRENCY_HH +#define OSTD_CONCURRENCY_HH + +#include +#include +#include + +#include "ostd/channel.hh" + +namespace ostd { + +struct thread_scheduler { + ~thread_scheduler() { + join_all(); + } + + template + void start(F &&func, A &&...args) { + func(std::forward(args)...); + } + + template + void spawn(F &&func, A &&...args) { + std::unique_lock l{p_lock}; + p_threads.emplace_front(); + auto it = p_threads.begin(); + *it = std::thread{ + [this, it](auto func, auto ...args) { + func(std::move(args)...); + this->remove_thread(it); + }, + std::forward(func), std::forward(args)... + }; + } + + void yield() { + std::this_thread::yield(); + } + + template + channel make_channel() { + return channel{}; + } + +private: + void remove_thread(typename std::list::iterator it) { + std::unique_lock l{p_lock}; + std::thread t{std::exchange(p_dead, std::move(*it))}; + if (t.joinable()) { + t.join(); + } + p_threads.erase(it); + } + + void join_all() { + /* wait for all threads to finish */ + std::unique_lock l{p_lock}; + if (p_dead.joinable()) { + p_dead.join(); + } + for (auto &t: p_threads) { + t.join(); + } + } + + std::list p_threads; + std::thread p_dead; + std::mutex p_lock; +}; + +} /* namespace ostd */ + +#endif