initial skeleton for concurrency module

master
Daniel Kolesa 2017-03-18 01:05:10 +01:00
parent 0da22e777d
commit b4b7224dd0
3 changed files with 127 additions and 2 deletions

View File

@ -5,7 +5,7 @@ set -e
# example sources
EXAMPLES="format listdir range range_pipe signal"
EXAMPLES="${EXAMPLES} stream1 stream2 coroutine1 coroutine2"
EXAMPLES="${EXAMPLES} stream1 stream2 coroutine1 coroutine2 concurrency"
# assembly sources
ASM_SOURCE_DIR="src/asm"
@ -136,7 +136,7 @@ if [ ! -z "$CPPFLAGS" ]; then
fi
# linker flags
OSTD_LDFLAGS=""
OSTD_LDFLAGS="-pthread"
# custom linker flags
if [ ! -z "$LDFLAGS" ]; then

View File

@ -0,0 +1,47 @@
#include <ostd/io.hh>
#include <ostd/concurrency.hh>
using namespace ostd;
thread_scheduler sched;
void foo() {
auto arr = ostd::iter({ 150, 38, 76, 25, 67, 18, -15, -38, 25, -10 });
auto h1 = arr.slice(0, arr.size() / 2);
auto h2 = arr.slice(arr.size() / 2, arr.size());
auto c = sched.make_channel<int>();
auto f = [&c](auto half) {
int ret = 0;
for (int i: half) {
ret += i;
}
c.put(ret);
};
sched.spawn(f, h1);
sched.spawn(f, h2);
int a, b;
c.get(a);
c.get(b);
writefln("first half: %s", a);
writefln("second half: %s", b);
writefln("total: %s", a + b);
}
int main() {
sched.start([]() {
writeln("starting...");
foo();
writeln("finishing...");
});
}
/*
starting...
first half: 356
second half: -20
total: 336
finishing...
*/

View File

@ -0,0 +1,78 @@
/* Concurrency module with custom scheduler support.
*
* This file is part of OctaSTD. See COPYING.md for futher information.
*/
#ifndef OSTD_CONCURRENCY_HH
#define OSTD_CONCURRENCY_HH
#include <stdio.h>
#include <thread>
#include <utility>
#include "ostd/channel.hh"
namespace ostd {
struct thread_scheduler {
~thread_scheduler() {
join_all();
}
template<typename F, typename ...A>
void start(F &&func, A &&...args) {
func(std::forward<A>(args)...);
}
template<typename F, typename ...A>
void spawn(F &&func, A &&...args) {
std::unique_lock<std::mutex> l{p_lock};
p_threads.emplace_front();
auto it = p_threads.begin();
*it = std::thread{
[this, it](auto func, auto ...args) {
func(std::move(args)...);
this->remove_thread(it);
},
std::forward<F>(func), std::forward<A>(args)...
};
}
void yield() {
std::this_thread::yield();
}
template<typename T>
channel<T> make_channel() {
return channel<T>{};
}
private:
void remove_thread(typename std::list<std::thread>::iterator it) {
std::unique_lock<std::mutex> l{p_lock};
std::thread t{std::exchange(p_dead, std::move(*it))};
if (t.joinable()) {
t.join();
}
p_threads.erase(it);
}
void join_all() {
/* wait for all threads to finish */
std::unique_lock<std::mutex> l{p_lock};
if (p_dead.joinable()) {
p_dead.join();
}
for (auto &t: p_threads) {
t.join();
}
}
std::list<std::thread> p_threads;
std::thread p_dead;
std::mutex p_lock;
};
} /* namespace ostd */
#endif