Removed spawn and task from the parser. Updated all the tests except for the benchmarks.

This commit is contained in:
Eric Holk 2011-08-13 15:20:11 -07:00
parent c3535f5842
commit be7325073a
36 changed files with 368 additions and 266 deletions

View File

@ -475,8 +475,10 @@ fn parse_ty(p: &parser) -> @ast::ty {
t = ast::ty_istr;
} else if (eat_word(p, "char")) {
t = ast::ty_char;
/*
} else if (eat_word(p, "task")) {
t = ast::ty_task;
*/
} else if (eat_word(p, "i8")) {
t = ast::ty_machine(ast::ty_i8);
} else if (eat_word(p, "i16")) {
@ -804,8 +806,10 @@ fn parse_bottom_expr(p: &parser) -> @ast::expr {
ret parse_do_while_expr(p);
} else if (eat_word(p, "alt")) {
ret parse_alt_expr(p);
/*
} else if (eat_word(p, "spawn")) {
ret parse_spawn_expr(p);
*/
} else if (eat_word(p, "fn")) {
ret parse_fn_expr(p, ast::proto_fn);
} else if (eat_word(p, "block")) {

View File

@ -1,4 +1,5 @@
import str::sbuf;
import task;
native "rust" mod rustrt {
type socket;
@ -102,7 +103,7 @@ fn server_task(ip: str, portnum: int, events: chan[server_event],
log "client was actually null, returning";
ret;
} else {
spawn accept_task(client, events);
task::_spawn(bind accept_task(client, events));
}
}
}
@ -126,10 +127,10 @@ fn request_task(c: chan[ctx]) {
ret;
}
connect(remote(ip,portnum),client) {
spawn connect_task(ip, portnum, client);
task::_spawn(bind connect_task(ip, portnum, client));
}
serve(ip,portnum,events,server) {
spawn server_task(ip, portnum, events, server);
task::_spawn(bind server_task(ip, portnum, events, server));
}
write(socket,v,status) {
rustrt::aio_writedata(socket,
@ -155,19 +156,19 @@ fn iotask(c: chan[ctx]) {
log "io task init";
// Spawn our request task
let reqtask: task = spawn request_task(c);
let reqtask = task::_spawn(bind request_task(c));
log "uv run task init";
// Enter IO loop. This never returns until aio_stop is called.
rustrt::aio_run();
log "waiting for request task to finish";
task::join(reqtask);
task::join_id(reqtask);
}
fn new() -> ctx {
let p: port[ctx] = port();
let t: task = spawn iotask(chan(p));
let t = task::_spawn(bind iotask(chan(p)));
let cx: ctx;
p |> cx;
ret cx;

View File

@ -43,9 +43,12 @@ fn yield() { ret rustrt::task_yield(); }
tag task_result { tr_success; tr_failure; }
// FIXME: Re-enable this once the task type is removed from the compiler.
/*
fn join(t: task) -> task_result {
join_id(cast(t))
}
*/
fn join_id(t : task_id) -> task_result {
alt rustrt::task_join(t) { 0 { tr_success } _ { tr_failure } }

View File

@ -4,6 +4,7 @@
// while providing a base that other test frameworks may build off of.
import generic_os::getenv;
import task::task_id;
export test_name;
export test_fn;
@ -94,7 +95,7 @@ tag test_result { tr_ok; tr_failed; tr_ignored; }
// In cases where test functions and closures it is not ok to just dump them
// into a task and run them, so this transformation gives the caller a chance
// to create the test task.
type test_to_task = fn(&fn()) -> task ;
type test_to_task = fn(&fn()) -> task_id ;
// A simple console test runner
fn run_tests_console(opts: &test_opts, tests: &[test_desc]) -> bool {
@ -318,12 +319,12 @@ fn run_test(test: &test_desc, to_task: &test_to_task) -> test_future {
ret {test: test,
fnref: fnref,
wait:
bind fn (test_task: &task) -> test_result {
alt task::join(test_task) {
task::tr_success. { tr_ok }
task::tr_failure. { tr_failed }
}
}(test_task)};
bind fn (test_task: task_id) -> test_result {
alt task::join_id(test_task) {
task::tr_success. { tr_ok }
task::tr_failure. { tr_failed }
}
}(test_task)};
} else {
ret {test: test,
fnref: fnref,
@ -335,14 +336,14 @@ fn run_test(test: &test_desc, to_task: &test_to_task) -> test_future {
// But, at least currently, functions can't be used as spawn arguments so
// we've got to treat our test functions as unsafe pointers. This function
// only works with functions that don't contain closures.
fn default_test_to_task(f: &fn()) -> task {
fn default_test_to_task(f: &fn()) -> task_id {
fn run_task(fptr: *mutable fn() ) {
configure_test_task();
// Run the test
(*fptr)()
}
let fptr = ptr::addr_of(f);
ret spawn run_task(fptr);
ret task::_spawn(bind run_task(fptr));
}
// Call from within a test task to make sure it's set up correctly

View File

@ -4,7 +4,7 @@
// NB: please do not commit code with this uncommented. It's
// hugely expensive and should only be used as a last resort.
//
// #define TRACK_ALLOCATIONS
#define TRACK_ALLOCATIONS
#define MAGIC 0xbadc0ffe

View File

@ -107,12 +107,19 @@ struct spawn_args {
uintptr_t, uintptr_t);
};
struct rust_closure {
intptr_t ref_count;
type_desc *td;
};
extern "C" CDECL
void task_exit(intptr_t *env, int rval, rust_task *task) {
void task_exit(rust_closure *env, int rval, rust_task *task) {
LOG(task, task, "task exited with value %d", rval);
if(env) {
// free the environment.
I(task->sched, 1 == *env); // the ref count better be 1
I(task->sched, 1 == env->ref_count); // the ref count better be 1
//env->td->drop_glue(NULL, task, NULL, env->td->first_param, env);
//env->td->free_glue(NULL, task, NULL, env->td->first_param, env);
task->free(env);
}
task->die();

View File

@ -1,16 +0,0 @@
// error-pattern:spawning functions with type params not allowed
fn main() {
fn echo[T](c: chan[T], oc: chan[chan[T]]) {
let p: port[T] = port();
oc <| chan(p);
let x;
p |> x;
c <| x;
}
let p = port[int]();
let p2 = port[chan[int]]();
spawn echo(chan(p), chan(p2));
}

View File

@ -1,5 +1,8 @@
// error-pattern: mismatched types
use std;
import std::task;
fn f(x: int) -> int { ret x; }
fn main() { spawn f(10); }
fn main() { task::_spawn(bind f(10)); }

View File

@ -5,6 +5,13 @@ import std::fs;
import std::str;
import std::ivec;
import std::task;
import std::task::task_id;
import std::comm;
import std::comm::_port;
import std::comm::_chan;
import std::comm::send;
import std::comm::mk_port;
import common::cx;
import common::config;
@ -116,11 +123,11 @@ fn test_opts(config: &config) -> test::test_opts {
}
type tests_and_conv_fn =
{tests: [test::test_desc], to_task: fn(&fn() ) -> task };
{tests: [test::test_desc], to_task: fn(&fn() ) -> task_id };
fn make_tests(cx: &cx) -> tests_and_conv_fn {
log #fmt("making tests from %s", cx.config.src_base);
let configport = port[str]();
let configport = mk_port[[u8]]();
let tests = ~[];
for file: str in fs::list_dir(cx.config.src_base) {
log #fmt("inspecting file %s", file);
@ -153,11 +160,11 @@ fn is_test(config: &config, testfile: &str) -> bool {
ret valid;
}
fn make_test(cx: &cx, testfile: &str, configport: &port[str]) ->
fn make_test(cx: &cx, testfile: &str, configport: &_port[[u8]]) ->
test::test_desc {
{name: make_test_name(cx.config, testfile),
fn: make_test_closure(testfile, chan(configport)),
ignore: header::is_test_ignored(cx.config, testfile)}
fn: make_test_closure(testfile, configport.mk_chan()),
ignore: header::is_test_ignored(cx.config, testfile)}
}
fn make_test_name(config: &config, testfile: &str) -> str {
@ -183,12 +190,13 @@ up. Then we'll spawn that data into another task and return the task.
Really convoluted. Need to think up of a better definition for tests.
*/
fn make_test_closure(testfile: &str, configchan: chan[str]) -> test::test_fn {
fn make_test_closure(testfile: &str, configchan: _chan[[u8]]) -> test::test_fn
{
bind send_config(testfile, configchan)
}
fn send_config(testfile: str, configchan: chan[str]) {
task::send(configchan, testfile);
fn send_config(testfile: str, configchan: _chan[[u8]]) {
send(configchan, str::bytes(testfile));
}
/*
@ -201,24 +209,25 @@ break up the config record and pass everything individually to the spawned
function.
*/
fn closure_to_task(cx: cx, configport: port[str], testfn: &fn() ) -> task {
fn closure_to_task(cx: cx, configport: _port[[u8]], testfn: &fn() ) -> task_id
{
testfn();
let testfile = task::recv(configport);
ret spawn run_test_task(cx.config.compile_lib_path,
let testfile = configport.recv();
ret task::_spawn(bind run_test_task(cx.config.compile_lib_path,
cx.config.run_lib_path, cx.config.rustc_path,
cx.config.src_base, cx.config.build_base,
cx.config.stage_id, mode_str(cx.config.mode),
cx.config.run_ignored, opt_str(cx.config.filter),
opt_str(cx.config.runtool),
opt_str(cx.config.rustcflags), cx.config.verbose,
task::clone_chan(cx.procsrv.chan), testfile);
cx.procsrv.chan, testfile));
}
fn run_test_task(compile_lib_path: str, run_lib_path: str, rustc_path: str,
src_base: str, build_base: str, stage_id: str, mode: str,
run_ignored: bool, opt_filter: str, opt_runtool: str,
opt_rustcflags: str, verbose: bool,
procsrv_chan: procsrv::reqchan, testfile: str) {
procsrv_chan: procsrv::reqchan, testfile: -[u8]) {
let config =
{compile_lib_path: compile_lib_path,

View File

@ -6,14 +6,18 @@
import std::option;
import std::task;
import std::task::task_id;
import std::generic_os::setenv;
import std::generic_os::getenv;
import std::ivec;
import std::os;
import std::run;
import std::unsafe;
import std::io;
import std::str;
import std::comm::_chan;
import std::comm::mk_port;
import std::comm::_port;
import std::comm::send;
export handle;
export mk;
@ -22,47 +26,47 @@ export run;
export close;
export reqchan;
type reqchan = chan[request];
type reqchan = _chan[request];
type handle = {task: option::t[task], chan: reqchan};
type handle = {task: option::t[task_id], chan: reqchan};
tag request {
exec(str, str, [str], chan[response]);
exec([u8], [u8], [[u8]], _chan[response]);
stop;
}
type response = {pid: int, infd: int, outfd: int, errfd: int};
fn mk() -> handle {
let setupport = port();
let task = spawn fn(setupchan: chan[chan[request]]) {
let reqport = port();
let reqchan = chan(reqport);
task::send(setupchan, task::clone_chan(reqchan));
let setupport = mk_port();
let task = task::_spawn(bind fn(setupchan: _chan[_chan[request]]) {
let reqport = mk_port();
let reqchan = reqport.mk_chan();
send(setupchan, reqchan);
worker(reqport);
} (chan(setupport));
} (setupport.mk_chan()));
ret {task: option::some(task),
chan: task::recv(setupport)
chan: setupport.recv()
};
}
fn from_chan(ch: &reqchan) -> handle { {task: option::none, chan: ch} }
fn close(handle: &handle) {
task::send(handle.chan, stop);
task::join(option::get(handle.task));
send(handle.chan, stop);
task::join_id(option::get(handle.task));
}
fn run(handle: &handle, lib_path: &str,
prog: &str, args: &[str], input: &option::t[str]) ->
{status: int, out: str, err: str} {
let p = port[response]();
let ch = chan(p);
task::send(handle.chan, exec(lib_path,
prog,
clone_ivecstr(args),
task::clone_chan(ch)));
let resp = task::recv(p);
let p = mk_port[response]();
let ch = p.mk_chan();
send(handle.chan, exec(str::bytes(lib_path),
str::bytes(prog),
clone_ivecstr(args),
ch));
let resp = p.recv();
writeclose(resp.infd, input);
let output = readclose(resp.outfd);
@ -95,7 +99,7 @@ fn readclose(fd: int) -> str {
ret buf;
}
fn worker(p: port[request]) {
fn worker(p: _port[request]) {
// FIXME (787): If we declare this inside of the while loop and then
// break out of it before it's ever initialized (i.e. we don't run
@ -105,8 +109,7 @@ fn worker(p: port[request]) {
lib_path: "",
prog: "",
args: ~[],
// This works because a NULL box is ignored during cleanup
respchan: unsafe::reinterpret_cast(0)
respchan: p.mk_chan()
};
while true {
@ -121,12 +124,12 @@ fn worker(p: port[request]) {
// put the entire alt in another block to make sure the exec
// message goes out of scope. Seems like the scoping rules for
// the alt discriminant are wrong.
alt task::recv(p) {
alt p.recv() {
exec(lib_path, prog, args, respchan) {
{
lib_path: clone_str(lib_path),
prog: clone_str(prog),
args: clone_ivecstr(args),
lib_path: str::unsafe_from_bytes(lib_path),
prog: str::unsafe_from_bytes(prog),
args: clone_ivecu8str(args),
respchan: respchan
}
}
@ -156,11 +159,11 @@ fn worker(p: port[request]) {
fail;
}
task::send(execparms.respchan,
{pid: pid,
infd: pipe_in.out,
outfd: pipe_out.in,
errfd: pipe_err.in});
send(execparms.respchan,
{pid: pid,
infd: pipe_in.out,
outfd: pipe_out.in,
errfd: pipe_err.in});
}
}
@ -181,19 +184,18 @@ fn append_lib_path(path: &str) { export_lib_path(util::make_new_path(path)); }
fn export_lib_path(path: &str) { setenv(util::lib_path_env_var(), path); }
fn clone_str(s: &str) -> str {
let new = s + "";
// new should be a different pointer
let sptr: int = unsafe::reinterpret_cast(s);
let newptr: int = unsafe::reinterpret_cast(new);
assert sptr != newptr;
new
}
fn clone_ivecstr(v: &[str]) -> [str] {
let r = ~[];
fn clone_ivecstr(v: &[str]) -> [[u8]] {
let r = [];
for t: str in ivec::slice(v, 0u, ivec::len(v)) {
r += ~[clone_str(t)];
r += [str::bytes(t)];
}
ret r;
}
fn clone_ivecu8str(v: &[[u8]]) -> [str] {
let r = [];
for t in ivec::slice(v, 0u, ivec::len(v)) {
r += [str::unsafe_from_bytes(t)];
}
ret r;
}

View File

@ -18,7 +18,8 @@ import util::logv;
export run;
fn run(cx: &cx, testfile: &str) {
fn run(cx: &cx, _testfile: -[u8]) {
let testfile = str::unsafe_from_bytes(_testfile);
test::configure_test_task();
if (cx.config.verbose) {
// We're going to be dumping a lot of info. Start on a new line.

View File

@ -3,6 +3,13 @@
// error-pattern:1 == 2
// no-valgrind
use std;
import std::task;
fn child() { assert (1 == 2); }
fn main() { let p: port[int] = port(); spawn child(); let x: int; p |> x; }
fn main() {
let p: port[int] = port();
task::_spawn(bind child());
let x: int; p |> x;
}

View File

@ -3,14 +3,18 @@
// xfail-stage3
// -*- rust -*-
fn f(c: chan[int]) {
use std;
import std::comm;
import std::task;
fn f(c: comm::_chan[int]) {
type t = {_0: int, _1: int, _2: int};
// Allocate a box.
let x: @t = @{_0: 1, _1: 2, _2: 3};
// Signal parent that we've allocated a box.
c <| 1;
comm::send(c, 1);
while true {
@ -21,18 +25,17 @@ fn f(c: chan[int]) {
// sending to the channel are never received
// by the parent, therefore this test cases drops
// messages on the floor
c <| 1;
comm::send(c, 1);
}
}
fn main() {
let p: port[int] = port();
spawn f(chan(p));
let p = comm::mk_port();
task::_spawn(bind f(p.mk_chan()));
let i: int;
// synchronize on event from child.
p |> i;
i = p.recv();
log "parent exiting, killing child";
}

View File

@ -1,23 +1,29 @@
// -*- rust -*-
fn a(c: chan[int]) { c <| 10; }
use std;
import std::comm::_chan;
import std::comm::mk_port;
import std::comm::send;
import std::task;
fn a(c: _chan[int]) { send(c, 10); }
fn main() {
let p: port[int] = port();
spawn a(chan(p));
spawn b(chan(p));
let p = mk_port();
task::_spawn(bind a(p.mk_chan()));
task::_spawn(bind b(p.mk_chan()));
let n: int = 0;
p |> n;
p |> n;
n = p.recv();
n = p.recv();
// log "Finished.";
}
fn b(c: chan[int]) {
fn b(c: _chan[int]) {
// log "task b0";
// log "task b1";
// log "task b2";
// log "task b3";
// log "task b4";
// log "task b5";
c <| 10;
send(c, 10);
}

View File

@ -1,22 +1,28 @@
// -*- rust -*-
fn a(c: chan[int]) { log "task a0"; log "task a1"; c <| 10; }
use std;
import std::comm;
import std::comm::send;
import std::comm::_chan;
import std::task;
fn a(c: _chan[int]) { log "task a0"; log "task a1"; send(c, 10); }
fn main() {
let p: port[int] = port();
spawn a(chan(p));
spawn b(chan(p));
let p = comm::mk_port();
task::_spawn(bind a(p.mk_chan()));
task::_spawn(bind b(p.mk_chan()));
let n: int = 0;
p |> n;
p |> n;
n = p.recv();
n = p.recv();
log "Finished.";
}
fn b(c: chan[int]) {
fn b(c: _chan[int]) {
log "task b0";
log "task b1";
log "task b2";
log "task b2";
log "task b3";
c <| 10;
send(c, 10);
}

View File

@ -1,6 +1,12 @@
// -*- rust -*-
fn a(c: chan[int]) {
use std;
import std::comm;
import std::comm::send;
import std::comm::_chan;
import std::task;
fn a(c: _chan[int]) {
if true {
log "task a";
log "task a";
@ -8,7 +14,7 @@ fn a(c: chan[int]) {
log "task a";
log "task a";
}
c <| 10;
send(c, 10);
}
fn k(x: int) -> int { ret 15; }
@ -18,19 +24,19 @@ fn g(x: int, y: str) -> int { log x; log y; let z: int = k(1); ret z; }
fn main() {
let n: int = 2 + 3 * 7;
let s: str = "hello there";
let p: port[int] = port();
spawn a(chan(p));
spawn b(chan(p));
let p = comm::mk_port();
task::_spawn(bind a(p.mk_chan()));
task::_spawn(bind b(p.mk_chan()));
let x: int = 10;
x = g(n, s);
log x;
p |> n;
p |> n;
n = p.recv();
n = p.recv();
// FIXME: use signal-channel for this.
log "children finished, root finishing";
}
fn b(c: chan[int]) {
fn b(c: _chan[int]) {
if true {
log "task b";
log "task b";
@ -39,5 +45,5 @@ fn b(c: chan[int]) {
log "task b";
log "task b";
}
c <| 10;
send(c, 10);
}

View File

@ -0,0 +1,14 @@
/*
Can we bind native things?
*/
//xfail-stage0
//xfail-stage1
//xfail-stage2
//xfail-stage3
native "rust" mod rustrt {
fn task_yield();
}
fn main() { bind rustrt::task_yield(); }

View File

@ -2,6 +2,7 @@
use std;
import std::unsafe::reinterpret_cast;
import std::task;
fn test_nil() {
assert (() == ());
@ -82,8 +83,8 @@ fn test_ptr() {
fn test_task() {
fn f() { }
let t1 = spawn f();
let t2 = spawn f();
let t1 = task::_spawn(bind f());
let t2 = task::_spawn(bind f());
assert t1 == t1;
assert t1 != t2;

View File

@ -2,35 +2,39 @@
use std;
import std::task;
import std::comm::_chan;
import std::comm::send;
import std::comm;
import std::comm::mk_port;
tag request {
quit;
close(chan[bool]);
close(_chan[bool]);
}
type ctx = chan[request];
type ctx = _chan[request];
fn request_task(c: chan[ctx]) {
let p: port[request] = port();
c <| chan(p);
fn request_task(c: _chan[ctx]) {
let p = mk_port();
send(c, p.mk_chan());
let req: request;
p |> req;
req = p.recv();
// Need to drop req before receiving it again
p |> req;
req = p.recv();
}
fn new() -> ctx {
let p: port[ctx] = port();
let t = spawn request_task(chan(p));
let p = mk_port();
let t = task::_spawn(bind request_task(p.mk_chan()));
let cx: ctx;
p |> cx;
cx = p.recv();
ret cx;
}
fn main() {
let cx = new();
let p: port[bool] = port();
cx <| close(chan(p));
cx <| quit;
let p = mk_port[bool]();
send(cx, close(p.mk_chan()));
send(cx, quit);
}

View File

@ -1,4 +1,8 @@
// Reported as issue #126, child leaks the string.
use std;
import std::task;
fn child2(s: str) { }
fn main() { let x = spawn child2("hi"); }
fn main() { let x = task::_spawn(bind child2("hi")); }

View File

@ -1,13 +1,18 @@
// xfail-stage1
// xfail-stage2
// xfail-stage3
fn f(@rec(int a, int b) x) {
assert (x.a == 10);
assert (x.b == 12);
//xfail-stage0
//xfail-stage1
//xfail-stage2
//xfail-stage3
use std;
import std::task;
fn f(x : @{a:int, b:int}) {
assert (x.a == 10);
assert (x.b == 12);
}
fn main() {
let @rec(int a, int b) z = rec(a=10, b=12);
let task p = spawn thread f(z);
join p;
let z : @{a:int, b:int} = @{ a : 10, b : 12};
let p = task::_spawn(bind f(z));
task::join_id(p);
}

View File

@ -1,13 +1,22 @@
// -*- rust -*-
use std;
import std::comm;
import std::comm::_chan;
import std::comm::send;
import std::task;
fn main() {
let p: port[int] = port();
let t: task = spawn child(chan(p));
let y: int;
p |> y;
let p = comm::mk_port();
let t = task::_spawn(bind child(p.mk_chan()));
let y = p.recv();
log_err "received";
log_err y;
assert (y == 10);
}
fn child(c: chan[int]) { log_err "sending"; c <| 10; log_err "value sent" }
fn child(c: _chan[int]) {
log_err "sending";
send(c, 10);
log_err "value sent"
}

View File

@ -12,6 +12,12 @@ import std::option::none;
import std::str;
import std::ivec;
import std::map;
import std::task;
import std::comm::_chan;
import std::comm::_port;
import std::comm::send;
import std::comm::mk_port;
import std::comm;
fn map(filename: str, emit: map_reduce::putter) { emit(filename, "1"); }
@ -24,27 +30,27 @@ mod map_reduce {
type mapper = fn(str, putter) ;
tag ctrl_proto { find_reducer(str, chan[int]); mapper_done; }
tag ctrl_proto { find_reducer([u8], _chan[int]); mapper_done; }
fn start_mappers(ctrl: chan[ctrl_proto], inputs: &[str]) {
for i: str in inputs { spawn map_task(ctrl, i); }
fn start_mappers(ctrl: _chan[ctrl_proto], inputs: &[str]) {
for i: str in inputs { task::_spawn(bind map_task(ctrl, i)); }
}
fn map_task(ctrl: chan[ctrl_proto], input: str) {
fn map_task(ctrl: _chan[ctrl_proto], input: str) {
let intermediates = map::new_str_hash();
fn emit(im: &map::hashmap[str, int], ctrl: chan[ctrl_proto], key: str,
val: str) {
fn emit(im: &map::hashmap[str, int], ctrl: _chan[ctrl_proto],
key: str, val: str) {
let c;
alt im.find(key) {
some(_c) { c = _c }
none. {
let p = port();
let p = mk_port();
log_err "sending find_reducer";
ctrl <| find_reducer(key, chan(p));
send(ctrl, find_reducer(str::bytes(key), p.mk_chan()));
log_err "receiving";
p |> c;
c = p.recv();
log_err c;
im.insert(key, c);
}
@ -52,11 +58,11 @@ mod map_reduce {
}
map(input, bind emit(intermediates, ctrl, _, _));
ctrl <| mapper_done;
send(ctrl, mapper_done);
}
fn map_reduce(inputs: &[str]) {
let ctrl = port[ctrl_proto]();
let ctrl = mk_port[ctrl_proto]();
// This task becomes the master control task. It spawns others
// to do the rest.
@ -65,22 +71,20 @@ mod map_reduce {
reducers = map::new_str_hash();
start_mappers(chan(ctrl), inputs);
start_mappers(ctrl.mk_chan(), inputs);
let num_mappers = ivec::len(inputs) as int;
while num_mappers > 0 {
let m;
ctrl |> m;
alt m {
alt ctrl.recv() {
mapper_done. { num_mappers -= 1; }
find_reducer(k, cc) {
let c;
alt reducers.find(k) { some(_c) { c = _c; } none. { c = 0; } }
cc <| c;
alt reducers.find(str::unsafe_from_bytes(k)) {
some(_c) { c = _c; }
none. { c = 0; }
}
send(cc, c);
}
}
}

View File

@ -2,9 +2,15 @@
A reduced test case for Issue #506, provided by Rob Arnold.
*/
use std;
import std::task;
native "rust" mod rustrt {
fn task_yield();
}
fn main() { spawn rustrt::task_yield(); }
fn yield_wrap() {
rustrt::task_yield();
}
fn main() { task::_spawn(bind yield_wrap()); }

View File

@ -7,26 +7,29 @@
use std;
import std::task::join;
import std::task;
import std::task::join_id;
import std::comm;
import std::comm::_chan;
import std::comm::send;
fn grandchild(c: chan[int]) { c <| 42; }
fn grandchild(c: _chan[int]) { send(c, 42); }
fn child(c: chan[int]) {
let _grandchild = spawn grandchild(c);
join(_grandchild);
fn child(c: _chan[int]) {
let _grandchild = task::_spawn(bind grandchild(c));
join_id(_grandchild);
}
fn main() {
let p: port[int] = port();
let p = comm::mk_port();
let _child = spawn child(chan(p));
let _child = task::_spawn(bind child(p.mk_chan()));
let x: int;
p |> x;
let x: int = p.recv();
log x;
assert (x == 42);
join(_child);
join_id(_child);
}

View File

@ -1,49 +1,52 @@
use std;
import std::ivec;
import std::task;
import std::comm;
import std::comm::_chan;
import std::comm::_port;
import std::comm::mk_port;
import std::comm::send;
tag msg { closed; received([u8]); }
fn producer(c: chan[[u8]]) {
c <| ~[1u8, 2u8, 3u8, 4u8];
let empty: [u8] = ~[];
c <| empty;
fn producer(c: _chan[[u8]]) {
send(c, [1u8, 2u8, 3u8, 4u8]);
let empty: [u8] = [];
send(c, empty);
}
fn packager(cb: chan[chan[[u8]]], msg: chan[msg]) {
let p: port[[u8]] = port();
cb <| chan(p);
fn packager(cb: _chan[_chan[[u8]]], msg: _chan[msg]) {
let p: _port[[u8]] = mk_port();
send(cb, p.mk_chan());
while true {
log "waiting for bytes";
let data: [u8];
p |> data;
let data = p.recv();
log "got bytes";
if ivec::len[u8](data) == 0u {
if ivec::len(data) == 0u {
log "got empty bytes, quitting";
break;
}
log "sending non-empty buffer of length";
log ivec::len[u8](data);
msg <| received(data);
log ivec::len(data);
send(msg, received(data));
log "sent non-empty buffer";
}
log "sending closed message";
msg <| closed;
send(msg, closed);
log "sent closed message";
}
fn main() {
let p: port[msg] = port();
let recv_reader: port[chan[[u8]]] = port();
let pack = spawn packager(chan(recv_reader), chan(p));
let source_chan: chan[[u8]];
recv_reader |> source_chan;
let prod: task = spawn producer(source_chan);
let p: _port[msg] = mk_port();
let recv_reader: _port[_chan[[u8]]] = mk_port();
let pack = task::_spawn(bind packager(recv_reader.mk_chan(),
p.mk_chan()));
let source_chan: _chan[[u8]] = recv_reader.recv();
let prod = task::_spawn(bind producer(source_chan));
while true {
let msg: msg;
p |> msg;
let msg = p.recv();
alt msg {
closed. { log "Got close message"; break; }
received(data) {

View File

@ -1,14 +1,20 @@
use std;
fn producer(c: chan[[u8]]) {
c <| ~[1u8, 2u8, 3u8, 4u8, 5u8, 6u8, 7u8,
8u8, 9u8, 10u8, 11u8, 12u8, 13u8 ];
import std::task;
import std::comm;
import std::comm::_chan;
import std::comm::_port;
import std::comm::mk_port;
import std::comm::send;
fn producer(c: _chan[[u8]]) {
send(c, [1u8, 2u8, 3u8, 4u8, 5u8, 6u8, 7u8,
8u8, 9u8, 10u8, 11u8, 12u8, 13u8 ]);
}
fn main() {
let p: port[[u8]] = port();
let prod: task = spawn producer(chan(p));
let p: _port[[u8]] = mk_port();
let prod = task::_spawn(bind producer(p.mk_chan()));
let data: [u8];
p |> data;
let data: [u8] = p.recv();
}

View File

@ -5,10 +5,10 @@ use std;
import std::task::*;
fn main() {
let other = spawn child();
let other = _spawn(bind child());
log_err "1";
yield();
join(other);
join_id(other);
log_err "3";
}

View File

@ -1,20 +1,24 @@
// -*- rust -*-
use std;
import std::task;
import std::comm::*;
fn main() {
let p: port[int] = port();
let p = mk_port();
let y: int;
spawn child(chan(p));
p |> y;
task::_spawn(bind child(p.mk_chan()));
y = p.recv();
log "received 1";
log y;
assert (y == 10);
spawn child(chan(p));
p |> y;
task::_spawn(bind child(p.mk_chan()));
y = p.recv();
log "received 2";
log y;
assert (y == 10);
}
fn child(c: chan[int]) { c <| 10; }
fn child(c: _chan[int]) { send(c, 10); }

View File

@ -1,20 +1,24 @@
// -*- rust -*-
fn sub(parent: chan[int], id: int) {
use std;
import std::task;
import std::comm;
fn sub(parent: comm::_chan[int], id: int) {
if (id == 0) {
parent <| 0;
comm::send(parent, 0);
} else {
let p: port[int] = port();
let child = spawn sub(chan(p), id-1);
let y: int; p |> y;
parent <| y + 1;
let p = comm::mk_port();
let child = task::_spawn(bind sub(p.mk_chan(), id-1));
let y = p.recv();
comm::send(parent, y + 1);
}
}
fn main() {
let p: port[int] = port();
let child = spawn sub(chan(p), 200);
let y: int; p |> y;
let p = comm::mk_port();
let child = task::_spawn(bind sub(p.mk_chan(), 200));
let y = p.recv();
log "transmission complete";
log y;
assert (y == 200);

View File

@ -2,13 +2,14 @@
use std;
import std::task::yield;
import std::task;
fn x(s: str, n: int) { log s; log n; }
fn main() {
spawn x("hello from first spawned fn", 65);
spawn x("hello from second spawned fn", 66);
spawn x("hello from third spawned fn", 67);
task::_spawn(bind x("hello from first spawned fn", 65));
task::_spawn(bind x("hello from second spawned fn", 66));
task::_spawn(bind x("hello from third spawned fn", 67));
let i: int = 30;
while i > 0 { i = i - 1; log "parent sleeping"; yield(); }
}

View File

@ -1,9 +1,10 @@
use std;
import std::task::join;
import std::task::join_id;
import std::task::_spawn;
fn main() {
let x = spawn m::child(10);
join(x);
let x = _spawn(bind m::child(10));
join_id(x);
}
mod m {
fn child(i: int) {

View File

@ -9,8 +9,7 @@ use std;
import std::task;
import std::comm;
fn start(pcc: *u8) {
let c = comm::chan_from_unsafe_ptr(pcc);
fn start(c: comm::_chan[str]) {
let p = comm::mk_port[str]();
c.send(p.mk_chan().unsafe_ptr());
@ -23,11 +22,10 @@ fn start(pcc: *u8) {
}
fn main() {
let p = comm::mk_port[*u8]();
let child = spawn start(p.mk_chan().unsafe_ptr());
let p = comm::mk_port();
let child = task::_spawn(bind start(p.mk_chan()));
let pc = p.recv();
let c = comm::chan_from_unsafe_ptr(pc);
let c = p.recv();
c.send("A");
c.send("B");
task::yield();

View File

@ -7,13 +7,13 @@ fn start(task_number: int) { log "Started / Finished task."; }
fn test00() {
let i: int = 0;
let t: task = spawn start(i);
let t = task::_spawn(bind start(i));
// Sleep long enough for the task to finish.
task::sleep(10000u);
// Try joining tasks that have already finished.
task::join(t);
task::join_id(t);
log "Joined task.";
}

View File

@ -1,27 +1,19 @@
// xfail-stage1
// xfail-stage2
// xfail-stage3
// This test fails when run with multiple threads
use std;
import std::comm;
import std::task;
fn start(pc: *u8, n: int) {
let c = comm::chan_from_unsafe_ptr();
fn start(c : comm::_chan[int], n: int) {
let i: int = n;
while i > 0 { c.send(0); i = i - 1; }
while i > 0 { comm::send(c, 0); i = i - 1; }
}
fn main() {
let p = comm::mk_port();
let p = comm::mk_port[comm::_chan[int]]();
// Spawn a task that sends us back messages. The parent task
// is likely to terminate before the child completes, so from
// the child's point of view the receiver may die. We should
// drop messages on the floor in this case, and not crash!
let child = spawn start(p.mk_chan().unsafe_ptr(), 10);
let c;
let pc = p.recv();
c = chan::chan_from_unsafe_ptr();
let child = task::_spawn(bind start(p.mk_chan(), 10));
let c = p.recv();
}

View File

@ -7,22 +7,22 @@ fn test_sleep() { task::sleep(1000000u); }
#[test]
fn test_unsupervise() {
fn f() { task::unsupervise(); fail; }
spawn f();
task::_spawn(bind f());
}
#[test]
fn test_join() {
fn winner() { }
let wintask = spawn winner();
let wintask = task::_spawn(bind winner());
assert (task::join(wintask) == task::tr_success);
assert (task::join_id(wintask) == task::tr_success);
fn failer() { task::unsupervise(); fail; }
let failtask = spawn failer();
let failtask = task::_spawn(bind failer());
assert (task::join(failtask) == task::tr_failure);
assert (task::join_id(failtask) == task::tr_failure);
}
#[test]