Adding test case dealing with communication and hashmaps.

This commit is contained in:
Eric Holk 2011-07-12 14:19:38 -07:00
parent c7e967148c
commit 404d4e0528

View File

@ -0,0 +1,108 @@
/**
A somewhat reduced test case to expose some Valgrind issues.
This originally came from the word-count benchmark.
*/
use std;
import std::io;
import option = std::option::t;
import std::option::some;
import std::option::none;
import std::str;
import std::vec;
import std::map;
fn map(str filename, map_reduce::putter emit) {
emit(filename, "1");
}
mod map_reduce {
export putter;
export mapper;
export map_reduce;
type putter = fn(str, str) -> ();
type mapper = fn(str, putter);
tag ctrl_proto {
find_reducer(str, chan[int]);
mapper_done;
}
fn start_mappers(chan[ctrl_proto] ctrl,
vec[str] inputs) {
for(str i in inputs) {
spawn map_task(ctrl, i);
}
}
fn map_task(chan[ctrl_proto] ctrl,
str input) {
auto intermediates = map::new_str_hash();
fn emit(&map::hashmap[str, int] im,
chan[ctrl_proto] ctrl,
str key, str val) {
auto c;
alt(im.find(key)) {
case(some(?_c)) {
c = _c
}
case(none) {
auto p = port();
log_err "sending find_reducer";
ctrl <| find_reducer(key, chan(p));
log_err "receiving";
p |> c;
log_err c;
im.insert(key, c);
}
}
}
map(input, bind emit(intermediates, ctrl, _, _));
ctrl <| mapper_done;
}
fn map_reduce (vec[str] inputs) {
auto ctrl = port[ctrl_proto]();
// This task becomes the master control task. It spawns others
// to do the rest.
let map::hashmap[str, int] reducers;
reducers = map::new_str_hash();
start_mappers(chan(ctrl), inputs);
auto num_mappers = vec::len(inputs) as int;
while(num_mappers > 0) {
auto m;
ctrl |> m;
alt(m) {
case(mapper_done) { num_mappers -= 1; }
case(find_reducer(?k, ?cc)) {
auto c;
alt(reducers.find(k)) {
case(some(?_c)) { c = _c; }
case(none) {
c = 0;
}
}
cc <| c;
}
}
}
}
}
fn main(vec[str] argv) {
map_reduce::map_reduce(["../src/test/run-pass/hashmap-memory.rs"]);
}