Removed trans_comm.rs from the compiler. Updating aio/sio to work with the new chan and port system, started on a networking module for the standard library.

This commit is contained in:
Eric Holk 2011-08-15 16:54:02 -07:00
parent e33af7e0b5
commit cf2def46c1
34 changed files with 326 additions and 600 deletions

View File

@ -70,12 +70,6 @@ import syntax::print::pprust::path_to_str;
import trans_common::*;
import trans_comm::trans_port;
import trans_comm::trans_chan;
import trans_comm::trans_spawn;
import trans_comm::trans_send;
import trans_comm::trans_recv;
import trans_objects::trans_anon_obj;
import trans_objects::trans_obj;
@ -5172,13 +5166,6 @@ fn trans_expr_out(cx: &@block_ctxt, e: &@ast::expr, output: out_method) ->
ast::expr_ret(ex) { ret trans_ret(cx, ex); }
ast::expr_put(ex) { ret trans_put(cx, ex); }
ast::expr_be(ex) { ret trans_be(cx, ex); }
ast::expr_port(_) { ret trans_port(cx, e.id); }
ast::expr_chan(ex) { ret trans_chan(cx, ex, e.id); }
ast::expr_send(lhs, rhs) { ret trans_send(cx, lhs, rhs, e.id); }
ast::expr_recv(lhs, rhs) { ret trans_recv(cx, lhs, rhs, e.id); }
ast::expr_spawn(dom, name, func, args) {
ret trans_spawn(cx, dom, name, func, args, e.id);
}
ast::expr_anon_obj(anon_obj) {
ret trans_anon_obj(cx, e.span, anon_obj, e.id);
}

View File

@ -1,302 +0,0 @@
/**
Translation for various task and comm-related things.
Most of this will probably go away as we move more of this into
libraries.
*/
import std::str;
import std::option;
import option::none;
import option::some;
import lib::llvm::llvm;
import lib::llvm::llvm::ValueRef;
import util::ppaux::ty_to_str;
import syntax::print::pprust::expr_to_str;
import syntax::ast;
import back::link::mangle_internal_name_by_path_and_seq;
import trans_common::*;
import trans::*;
export trans_port;
export trans_chan;
export trans_spawn;
export trans_send;
export trans_recv;
fn trans_port(cx: &@block_ctxt, id: ast::node_id) -> result {
let t = node_id_type(cx.fcx.lcx.ccx, id);
let unit_ty;
alt ty::struct(cx.fcx.lcx.ccx.tcx, t) {
ty::ty_port(t) { unit_ty = t; }
_ { cx.fcx.lcx.ccx.sess.bug("non-port type in trans_port"); }
}
let bcx = cx;
let unit_sz = size_of(bcx, unit_ty);
bcx = unit_sz.bcx;
let port_raw_val =
bcx.build.Call(bcx.fcx.lcx.ccx.upcalls.new_port,
~[bcx.fcx.lltaskptr, unit_sz.val]);
let llty = type_of(cx.fcx.lcx.ccx, cx.sp, t);
let port_val = bcx.build.PointerCast(port_raw_val, llty);
add_clean_temp(bcx, port_val, t);
ret rslt(bcx, port_val);
}
fn trans_chan(cx: &@block_ctxt, e: &@ast::expr, id: ast::node_id) -> result {
let bcx = cx;
let prt = trans_expr(bcx, e);
bcx = prt.bcx;
let prt_val = bcx.build.PointerCast(prt.val, T_opaque_port_ptr());
let chan_raw_val =
bcx.build.Call(bcx.fcx.lcx.ccx.upcalls.new_chan,
~[bcx.fcx.lltaskptr, prt_val]);
let chan_ty = node_id_type(bcx.fcx.lcx.ccx, id);
let chan_llty = type_of(bcx.fcx.lcx.ccx, e.span, chan_ty);
let chan_val = bcx.build.PointerCast(chan_raw_val, chan_llty);
add_clean_temp(bcx, chan_val, chan_ty);
ret rslt(bcx, chan_val);
}
fn trans_spawn(cx: &@block_ctxt, dom: &ast::spawn_dom, name: &option::t[str],
func: &@ast::expr, args: &[@ast::expr], id: ast::node_id) ->
result {
let bcx = cx;
// Make the task name
let tname =
alt name {
none. {
let argss = std::ivec::map(expr_to_str, args);
#fmt("%s(%s)", expr_to_str(func), str::connect(argss, ", "))
}
some(n) { n }
};
// Generate code
//
// This is a several step process. The following things need to happen
// (not necessarily in order):
//
// 1. Evaluate all the arguments to the spawnee.
//
// 2. Alloca a tuple that holds these arguments (they must be in reverse
// order, so that they match the expected stack layout for the spawnee)
//
// 3. Fill the tuple with the arguments we evaluated.
//
// 3.5. Generate a wrapper function that takes the tuple and unpacks it to
// call the real task.
//
// 4. Pass a pointer to the wrapper function and the argument tuple to
// upcall_start_task. In order to do this, we need to allocate another
// tuple that matches the arguments expected by rust_task::start.
//
// 5. Oh yeah, we have to create the task before we start it...
// But first, we'll create a task.
let lltname: ValueRef = C_str(bcx.fcx.lcx.ccx, tname);
let new_task =
bcx.build.Call(bcx.fcx.lcx.ccx.upcalls.new_task,
~[bcx.fcx.lltaskptr, lltname]);
// Translate the arguments, remembering their types and where the values
// ended up.
let arg_tys: [ty::t] = ~[];
let arg_vals: [ValueRef] = ~[];
for e: @ast::expr in args {
let e_ty = ty::expr_ty(cx.fcx.lcx.ccx.tcx, e);
let arg = trans_expr(bcx, e);
arg = deep_copy(arg.bcx, arg.val, e_ty, new_task);
bcx = arg.bcx;
arg_vals += ~[arg.val];
arg_tys += ~[e_ty];
}
// Make the tuple.
let args_ty = ty::mk_tup(cx.fcx.lcx.ccx.tcx, arg_tys);
// Allocate and fill the tuple.
let llargs = alloc_ty(bcx, args_ty);
let i = 0u;
for v: ValueRef in arg_vals {
let target = bcx.build.GEP(llargs.val, ~[C_int(0), C_int(i as int)]);
bcx.build.Store(v, target);
i += 1u;
}
// Generate the wrapper function
let wrapper = mk_spawn_wrapper(bcx, func, args_ty);
bcx = wrapper.bcx;
let llfnptr_i = bcx.build.PointerCast(wrapper.val, T_int());
// And start the task
let llargs_i = bcx.build.PointerCast(llargs.val, T_int());
let args_size = size_of(bcx, args_ty).val;
bcx.build.Call(bcx.fcx.lcx.ccx.upcalls.start_task,
~[bcx.fcx.lltaskptr, new_task, llfnptr_i, llargs_i,
args_size]);
let task_ty = node_id_type(bcx.fcx.lcx.ccx, id);
add_clean_temp(bcx, new_task, task_ty);
ret rslt(bcx, new_task);
}
fn mk_spawn_wrapper(cx: &@block_ctxt, func: &@ast::expr, args_ty: &ty::t) ->
result {
let llmod = cx.fcx.lcx.ccx.llmod;
let wrapper_fn_type =
type_of_fn(cx.fcx.lcx.ccx, cx.sp, ast::proto_fn,
~[{mode: ty::mo_alias(false), ty: args_ty}], ty::idx_nil,
0u);
// TODO: construct a name based on tname
let wrap_name: str =
mangle_internal_name_by_path_and_seq(cx.fcx.lcx.ccx, cx.fcx.lcx.path,
"spawn_wrapper");
let llfndecl = decl_cdecl_fn(llmod, wrap_name, wrapper_fn_type);
let fcx = new_fn_ctxt(cx.fcx.lcx, cx.sp, llfndecl);
let fbcx = new_top_block_ctxt(fcx);
// 3u to skip the three implicit args
let arg: ValueRef = llvm::LLVMGetParam(fcx.llfn, 3u);
let child_args: [ValueRef] =
~[llvm::LLVMGetParam(fcx.llfn, 0u), llvm::LLVMGetParam(fcx.llfn, 1u),
llvm::LLVMGetParam(fcx.llfn, 2u)];
// unpack the arguments
alt ty::struct(fcx.lcx.ccx.tcx, args_ty) {
ty::ty_tup(elts) {
let i = 0;
for elt in elts {
let src = fbcx.build.GEP(arg, ~[C_int(0), C_int(i)]);
i += 1;
let child_arg = fbcx.build.Load(src);
child_args += ~[child_arg];
}
}
}
// Find the function
let fnptr = trans_lval(fbcx, func).res;
fbcx = fnptr.bcx;
let llfnptr = fbcx.build.GEP(fnptr.val, ~[C_int(0), C_int(0)]);
let llfn = fbcx.build.Load(llfnptr);
fbcx.build.FastCall(llfn, child_args);
fbcx.build.RetVoid();
finish_fn(fcx, fbcx.llbb);
// TODO: make sure we clean up everything we need to.
ret rslt(cx, llfndecl);
}
fn trans_send(cx: &@block_ctxt, lhs: &@ast::expr, rhs: &@ast::expr,
id: ast::node_id) -> result {
let bcx = cx;
let chn = trans_expr(bcx, lhs);
bcx = chn.bcx;
let data = trans_lval(bcx, rhs);
bcx = data.res.bcx;
let chan_ty = node_id_type(cx.fcx.lcx.ccx, id);
let unit_ty;
alt ty::struct(cx.fcx.lcx.ccx.tcx, chan_ty) {
ty::ty_chan(t) { unit_ty = t; }
_ { bcx.fcx.lcx.ccx.sess.bug("non-chan type in trans_send"); }
}
let data_alloc = alloc_ty(bcx, unit_ty);
bcx = data_alloc.bcx;
let data_tmp = move_val_if_temp(bcx, INIT, data_alloc.val, data, unit_ty);
bcx = data_tmp.bcx;
let llchanval = bcx.build.PointerCast(chn.val, T_opaque_chan_ptr());
let lldataptr = bcx.build.PointerCast(data_alloc.val, T_ptr(T_i8()));
bcx.build.Call(bcx.fcx.lcx.ccx.upcalls.send,
~[bcx.fcx.lltaskptr, llchanval, lldataptr]);
// Deinit the stuff we sent.
bcx = zero_alloca(bcx, data_alloc.val, unit_ty).bcx;
ret rslt(bcx, chn.val);
}
fn trans_recv(cx: &@block_ctxt, lhs: &@ast::expr, rhs: &@ast::expr,
id: ast::node_id) -> result {
let bcx = cx;
// FIXME: calculate copy init-ness in typestate.
let unit_ty = node_id_type(cx.fcx.lcx.ccx, id);
let tmp_alloc = alloc_ty(bcx, unit_ty);
bcx = tmp_alloc.bcx;
let prt = trans_expr(bcx, lhs);
bcx = prt.bcx;
let lldataptr = bcx.build.PointerCast(tmp_alloc.val,
T_ptr(T_ptr(T_i8())));
let llportptr = bcx.build.PointerCast(prt.val, T_opaque_port_ptr());
bcx.build.Call(bcx.fcx.lcx.ccx.upcalls.recv,
~[bcx.fcx.lltaskptr, lldataptr, llportptr]);
let tmp = load_if_immediate(bcx, tmp_alloc.val, unit_ty);
let data = trans_lval(bcx, rhs);
assert (data.is_mem);
bcx = data.res.bcx;
let tmp_lval = lval_val(bcx, tmp);
let recv_res =
move_val(bcx, DROP_EXISTING, data.res.val, tmp_lval, unit_ty);
ret rslt(recv_res.bcx, recv_res.val);
}
// Does a deep copy of a value. This is needed for passing arguments to child
// tasks, and for sending things through channels. There are probably some
// uniqueness optimizations and things we can do here for tasks in the same
// domain.
fn deep_copy(bcx: &@block_ctxt, v: ValueRef, t: ty::t, target_task: ValueRef)
-> result {
// TODO: make sure all paths add any reference counting that they need to.
// TODO: Teach deep copy to understand everything else it needs to.
let tcx = bcx.fcx.lcx.ccx.tcx;
if ty::type_is_scalar(tcx, t) {
ret rslt(bcx, v);
} else if (ty::type_is_str(tcx, t)) {
ret rslt(bcx,
bcx.build.Call(bcx.fcx.lcx.ccx.upcalls.dup_str,
~[bcx.fcx.lltaskptr, target_task, v]));
} else if (ty::type_is_chan(tcx, t)) {
// If this is a channel, we need to clone it.
let chan_ptr = bcx.build.PointerCast(v, T_opaque_chan_ptr());
let chan_raw_val =
bcx.build.Call(bcx.fcx.lcx.ccx.upcalls.clone_chan,
~[bcx.fcx.lltaskptr, target_task, chan_ptr]);
// Cast back to the type the context was expecting.
let chan_val = bcx.build.PointerCast(chan_raw_val, val_ty(v));
ret rslt(bcx, chan_val);
} else if (ty::type_is_structural(tcx, t)) {
fn inner_deep_copy(bcx: &@block_ctxt, v: ValueRef, t: ty::t) ->
result {
log_err "Unimplemented type for deep_copy.";
fail;
}
ret iter_structural_ty(bcx, v, t, inner_deep_copy);
} else {
bcx.fcx.lcx.ccx.sess.bug("unexpected type in " + "trans::deep_copy: "
+ ty_to_str(tcx, t));
}
}

View File

@ -1,4 +1,3 @@
// -*- rust -*-
#[link(name = "rustc",
@ -18,7 +17,6 @@ mod middle {
mod trans_common;
mod trans;
mod trans_alt;
mod trans_comm;
mod trans_dps;
mod trans_objects;
mod trans_vec;

View File

@ -577,16 +577,6 @@ fn parse_ty(p: &parser, colons_before_params: bool) -> @ast::ty {
alt t { ast::ty_fn(_, _, out, _, _) { hi = out.span.hi; } }
} else if (eat_word(p, "obj")) {
t = parse_ty_obj(p, hi);
} else if (eat_word(p, "port")) {
expect(p, token::LBRACKET);
t = ast::ty_port(parse_ty(p, false));
hi = p.get_hi_pos();
expect(p, token::RBRACKET);
} else if (eat_word(p, "chan")) {
expect(p, token::LBRACKET);
t = ast::ty_chan(parse_ty(p, false));
hi = p.get_hi_pos();
expect(p, token::RBRACKET);
} else if (eat_word(p, "mutable")) {
p.warn("ignoring deprecated 'mutable' type constructor");
let typ = parse_ty(p, false);
@ -990,30 +980,11 @@ fn parse_bottom_expr(p: &parser) -> @ast::expr {
}
} else if (eat_word(p, "be")) {
let e = parse_expr(p);
// FIXME: Is this the right place for this check?
if /*check*/ast::is_call_expr(e) {
hi = e.span.hi;
ex = ast::expr_be(e);
} else { p.fatal("Non-call expression in tail call"); }
} else if (eat_word(p, "port")) {
let ty = @spanned(lo, hi, ast::ty_infer);
if token::LBRACKET == p.peek() {
expect(p, token::LBRACKET);
ty = parse_ty(p, false);
expect(p, token::RBRACKET);
}
expect(p, token::LPAREN);
expect(p, token::RPAREN);
hi = p.get_hi_pos();
ex = ast::expr_port(ty);
} else if (eat_word(p, "chan")) {
expect(p, token::LPAREN);
let e = parse_expr(p);
hi = e.span.hi;
expect(p, token::RPAREN);
ex = ast::expr_chan(e);
} else if (eat_word(p, "copy")) {
let e = parse_expr(p);
ex = ast::expr_copy(e);
@ -1271,16 +1242,6 @@ fn parse_assign_expr(p: &parser) -> @ast::expr {
let rhs = parse_expr(p);
ret mk_expr(p, lo, rhs.span.hi, ast::expr_move(lhs, rhs));
}
token::SEND. {
p.bump();
let rhs = parse_expr(p);
ret mk_expr(p, lo, rhs.span.hi, ast::expr_send(lhs, rhs));
}
token::RECV. {
p.bump();
let rhs = parse_expr(p);
ret mk_expr(p, lo, rhs.span.hi, ast::expr_recv(lhs, rhs));
}
token::DARROW. {
p.bump();
let rhs = parse_expr(p);

View File

@ -1,5 +1,13 @@
import str::sbuf;
import task;
import ivec;
import comm;
import comm::_chan;
import comm::_port;
import comm::mk_port;
import comm::send;
import net;
native "rust" mod rustrt {
type socket;
@ -7,19 +15,21 @@ native "rust" mod rustrt {
fn aio_init();
fn aio_run();
fn aio_stop();
fn aio_connect(host: sbuf, port: int, connected: chan[socket]);
fn aio_serve(host: sbuf, port: int, acceptChan: chan[socket]) -> server;
fn aio_writedata(s: socket, buf: *u8, size: uint, status: chan[bool]);
fn aio_read(s: socket, reader: chan[[u8]]);
fn aio_close_server(s: server, status: chan[bool]);
fn aio_connect(host: *u8, port: int, connected: &_chan[socket]);
fn aio_serve(host: *u8, port: int, acceptChan: &_chan[socket]) -> server;
fn aio_writedata(s: socket, buf: *u8, size: uint, status: &_chan[bool]);
fn aio_read(s: socket, reader: &_chan[[u8]]);
fn aio_close_server(s: server, status: &_chan[bool]);
fn aio_close_socket(s: socket);
fn aio_is_null_client(s: socket) -> bool;
}
// FIXME: these should be unsafe pointers or something, but those aren't
// currently in the sendable kind, so we'll unsafely cast between ints.
type server = rustrt::server;
type client = rustrt::socket;
tag pending_connection {
remote(str,int);
remote(net::ip_addr,int);
incoming(server);
}
@ -30,41 +40,43 @@ tag socket_event {
}
tag server_event {
pending(chan[chan[socket_event]]);
pending(_chan[_chan[socket_event]]);
}
tag request {
quit;
connect(pending_connection,chan[socket_event]);
serve(str,int,chan[server_event],chan[server]);
write(client,[u8],chan[bool]);
close_server(server, chan[bool]);
connect(pending_connection,_chan[socket_event]);
serve(net::ip_addr,int,_chan[server_event],_chan[server]);
write(client,[u8],_chan[bool]);
close_server(server, _chan[bool]);
close_client(client);
}
type ctx = chan[request];
type ctx = _chan[request];
fn connect_task(ip: str, portnum: int, evt: chan[socket_event]) {
let connecter: port[client] = port();
rustrt::aio_connect(str::buf(ip), portnum, chan(connecter));
let client: client;
connecter |> client;
fn ip_to_sbuf(ip: net::ip_addr) -> *u8 {
ivec::to_ptr(str::bytes(net::format_addr(ip)))
}
fn connect_task(ip: net::ip_addr, portnum: int, evt: _chan[socket_event]) {
let connecter: _port[client] = mk_port();
rustrt::aio_connect(ip_to_sbuf(ip), portnum, connecter.mk_chan());
let client = connecter.recv();
new_client(client, evt);
}
fn new_client(client: client, evt: chan[socket_event]) {
fn new_client(client: client, evt: _chan[socket_event]) {
// Start the read before notifying about the connect. This avoids a race
// condition where the receiver can close the socket before we start
// reading.
let reader: port[[u8]] = port();
rustrt::aio_read(client, chan(reader));
let reader: _port[[u8]] = mk_port();
rustrt::aio_read(client, reader.mk_chan());
evt <| connected(client);
send(evt, connected(client));
while (true) {
log "waiting for bytes";
let data: [u8];
reader |> data;
let data: [u8] = reader.recv();
log "got some bytes";
log ivec::len[u8](data);
if (ivec::len[u8](data) == 0u) {
@ -72,33 +84,33 @@ fn new_client(client: client, evt: chan[socket_event]) {
break;
}
log "got non-empty buffer, sending";
evt <| received(data);
send(evt, received(data));
log "sent non-empty buffer";
}
log "done reading";
evt <| closed;
send(evt, closed);
log "close message sent";
}
fn accept_task(client: client, events: chan[server_event]) {
fn accept_task(client: client, events: _chan[server_event]) {
log "accept task was spawned";
let p: port[chan[socket_event]] = port();
events <| pending(chan(p));
let evt: chan[socket_event];
p |> evt;
let p: _port[_chan[socket_event]] = mk_port();
send(events, pending(p.mk_chan()));
let evt = p.recv();
new_client(client, evt);
log "done accepting";
}
fn server_task(ip: str, portnum: int, events: chan[server_event],
server: chan[server]) {
let accepter: port[client] = port();
server <| rustrt::aio_serve(str::buf(ip), portnum, chan(accepter));
fn server_task(ip: net::ip_addr, portnum: int, events: _chan[server_event],
server: _chan[server]) {
let accepter: _port[client] = mk_port();
send(server, rustrt::aio_serve(ip_to_sbuf(ip), portnum,
accepter.mk_chan()));
let client: client;
while (true) {
log "preparing to accept a client";
accepter |> client;
client = accepter.recv();
if (rustrt::aio_is_null_client(client)) {
log "client was actually null, returning";
ret;
@ -108,48 +120,48 @@ fn server_task(ip: str, portnum: int, events: chan[server_event],
}
}
fn request_task(c: chan[ctx]) {
fn request_task(c: _chan[ctx]) {
// Create a port to accept IO requests on
let p: port[request] = port();
let p: _port[request] = mk_port();
// Hand of its channel to our spawner
c <| chan(p);
send(c, p.mk_chan());
log "uv run task spawned";
// Spin for requests
let req: request;
while (true) {
p |> req;
req = p.recv();
log_err req;
alt req {
quit. {
log "got quit message";
log "stopping libuv";
rustrt::aio_stop();
ret;
}
connect(remote(ip,portnum),client) {
task::_spawn(bind connect_task(ip, portnum, client));
}
serve(ip,portnum,events,server) {
task::_spawn(bind server_task(ip, portnum, events, server));
}
write(socket,v,status) {
rustrt::aio_writedata(socket,
ivec::to_ptr[u8](v), ivec::len[u8](v),
status);
}
close_server(server,status) {
log "closing server";
rustrt::aio_close_server(server,status);
}
close_client(client) {
log "closing client";
rustrt::aio_close_socket(client);
}
quit. {
log "got quit message";
log "stopping libuv";
rustrt::aio_stop();
ret;
}
connect(remote(ip,portnum),client) {
task::_spawn(bind connect_task(ip, portnum, client));
}
serve(ip,portnum,events,server) {
task::_spawn(bind server_task(ip, portnum, events, server));
}
write(socket,v,status) {
rustrt::aio_writedata(socket,
ivec::to_ptr[u8](v), ivec::len[u8](v),
status);
}
close_server(server,status) {
log "closing server";
rustrt::aio_close_server(server,status);
}
close_client(client) {
log "closing client";
rustrt::aio_close_socket(client);
}
}
}
}
fn iotask(c: chan[ctx]) {
fn iotask(c: _chan[ctx]) {
log "io task spawned";
// Initialize before accepting requests
rustrt::aio_init();
@ -167,11 +179,9 @@ fn iotask(c: chan[ctx]) {
}
fn new() -> ctx {
let p: port[ctx] = port();
let t = task::_spawn(bind iotask(chan(p)));
let cx: ctx;
p |> cx;
ret cx;
let p: _port[ctx] = mk_port();
task::_spawn(bind iotask(p.mk_chan()));
ret p.recv();
}
// Local Variables:

30
src/lib/net.rs Normal file
View File

@ -0,0 +1,30 @@
import str;
import ivec;
import uint;
tag ip_addr {
ipv4(u8, u8, u8, u8);
}
fn format_addr(ip : ip_addr) -> str {
alt(ip) {
ipv4(a, b, c, d) {
#fmt("%u.%u.%u.%u",
a as uint,
b as uint,
c as uint,
d as uint)
}
_ { fail "Unsupported address type"; }
}
}
fn parse_addr(ip : str) -> ip_addr {
let parts = ivec::map(uint::from_str, str::split(ip, ".".(0)));
if ivec::len(parts) != 4u { fail "Too many dots in IP address"; }
for i in parts { if i > 255u { fail "Invalid IP Address part."; } }
ipv4(parts.(0) as u8,
parts.(1) as u8,
parts.(2) as u8,
parts.(3) as u8)
}

View File

@ -1,18 +1,27 @@
import comm::_port;
import comm::_chan;
import comm::mk_port;
import comm::send;
import str;
import net;
type ctx = aio::ctx;
type client = { ctx: ctx, client: aio::client, evt: port[aio::socket_event] };
type server = { ctx: ctx, server: aio::server, evt: port[aio::server_event] };
type client = { ctx: ctx, client: aio::client,
evt: _port[aio::socket_event] };
type server = { ctx: ctx, server: aio::server,
evt: _port[aio::server_event] };
fn new() -> ctx {
ret aio::new();
}
fn destroy(ctx: ctx) {
ctx <| aio::quit;
send(ctx, aio::quit);
}
fn make_socket(ctx: ctx, p: port[aio::socket_event]) -> client {
let evt: aio::socket_event;
p |> evt;
fn make_socket(ctx: ctx, p: _port[aio::socket_event]) -> client {
let evt: aio::socket_event = p.recv();
alt evt {
aio::connected(client) {
ret { ctx: ctx, client: client, evt: p };
@ -21,16 +30,14 @@ fn make_socket(ctx: ctx, p: port[aio::socket_event]) -> client {
}
}
fn connect_to(ctx: ctx, ip: str, portnum: int) -> client {
let p: port[aio::socket_event] = port();
ctx <| aio::connect(aio::remote(ip, portnum), chan(p));
fn connect_to(ctx: ctx, ip: net::ip_addr, portnum: int) -> client {
let p: _port[aio::socket_event] = mk_port();
send(ctx, aio::connect(aio::remote(ip, portnum), p.mk_chan()));
ret make_socket(ctx, p);
}
fn read(c: client) -> [u8] {
let evt: aio::socket_event;
c.evt |> evt;
alt evt {
alt c.evt.recv() {
aio::closed. {
ret ~[];
}
@ -40,55 +47,51 @@ fn read(c: client) -> [u8] {
}
}
fn create_server(ctx: ctx, ip: str, portnum: int) -> server {
let evt: port[aio::server_event] = port();
let p: port[aio::server] = port();
ctx <| aio::serve(ip, portnum, chan(evt), chan(p));
let srv: aio::server;
p |> srv;
fn create_server(ctx: ctx, ip: net::ip_addr, portnum: int) -> server {
let evt: _port[aio::server_event] = mk_port();
let p: _port[aio::server] = mk_port();
send(ctx, aio::serve(ip, portnum,
evt.mk_chan(), p.mk_chan()));
let srv: aio::server = p.recv();
ret { ctx: ctx, server: srv, evt: evt };
}
fn accept_from(server: server) -> client {
let evt: aio::server_event;
server.evt |> evt;
let evt: aio::server_event = server.evt.recv();
alt evt {
aio::pending(callback) {
let p: port[aio::socket_event] = port();
callback <| chan(p);
ret make_socket(server.ctx, p);
}
aio::pending(callback) {
let p: _port[aio::socket_event] = mk_port();
send(callback, p.mk_chan());
ret make_socket(server.ctx, p);
}
}
}
fn write_data(c: client, data: [u8]) -> bool {
let p: port[bool] = port();
c.ctx <| aio::write(c.client, data, chan(p));
let success: bool;
p |> success;
ret success;
let p: _port[bool] = mk_port();
send(c.ctx, aio::write(c.client, data, p.mk_chan()));
ret p.recv();
}
fn close_server(server: server) {
// TODO: make this unit once we learn to send those from native code
let p: port[bool] = port();
server.ctx <| aio::close_server(server.server, chan(p));
let success: bool;
let p: _port[bool] = mk_port();
send(server.ctx, aio::close_server(server.server, p.mk_chan()));
log "Waiting for close";
p |> success;
p.recv();
log "Got close";
}
fn close_client(client: client) {
client.ctx <| aio::close_client(client.client);
send(client.ctx, aio::close_client(client.client));
let evt: aio::socket_event;
do {
client.evt |> evt;
evt = client.evt.recv();
alt evt {
aio::closed. {
ret;
}
_ {}
aio::closed. {
ret;
}
_ {}
}
} while (true);
}

View File

@ -70,6 +70,7 @@ mod run = "run_program.rs";
mod fs;
mod aio;
mod sio;
mod net;
// FIXME: parametric
mod map;

View File

@ -8,7 +8,6 @@ native "rust" mod rustrt {
fn pin_task();
fn unpin_task();
fn get_task_id() -> task_id;
fn clone_chan(c: *rust_chan) -> *rust_chan;
type rust_chan;
type rust_task;
@ -61,16 +60,6 @@ fn pin() { rustrt::pin_task(); }
fn unpin() { rustrt::unpin_task(); }
// FIXME: remove this
fn clone_chan[T](c: chan[T]) -> chan[T] {
let cloned = rustrt::clone_chan(unsafe::reinterpret_cast(c));
ret unsafe::reinterpret_cast(cloned);
}
fn send[T](c: chan[T], v: &T) { c <| v; }
fn recv[T](p: port[T]) -> T { let v; p |> v; v }
fn set_min_stack(stack_size : uint) {
rustrt::set_min_stack(stack_size);
}

View File

@ -56,6 +56,10 @@ fn parse_buf(buf: &[u8], radix: uint) -> uint {
fail;
}
fn from_str(s : &str) -> uint {
parse_buf(str::bytes(s), 10u)
}
fn to_str(num: uint, radix: uint) -> str {
let n = num;
assert (0u < radix && radix <= 16u);

View File

@ -4,7 +4,7 @@
// NB: please do not commit code with this uncommented. It's
// hugely expensive and should only be used as a last resort.
//
// #define TRACK_ALLOCATIONS
#define TRACK_ALLOCATIONS
#define MAGIC 0xbadc0ffe

View File

@ -775,11 +775,6 @@ migrate_alloc(rust_task *task, void *alloc, rust_task_id tid) {
}
}
extern "C" CDECL rust_chan *
clone_chan(rust_task *task, rust_chan *chan) {
return chan->clone(task);
}
// defined in rust_task.cpp
extern size_t g_custom_min_stack_size;
extern "C" CDECL void

View File

@ -25,6 +25,12 @@ public:
rust_chan *clone(rust_task *target);
};
// Corresponds to the rust chan (currently _chan) type.
struct chan_handle {
rust_task_id task;
rust_port_id port;
};
//
// Local Variables:
// mode: C++

View File

@ -56,6 +56,7 @@ struct rust_task;
class rust_log;
class rust_port;
class rust_chan;
struct chan_handle;
struct rust_token;
class rust_kernel;
class rust_crate_cache;

View File

@ -527,6 +527,17 @@ rust_port *rust_task::get_port_by_id(rust_port_id id) {
return port;
}
rust_chan *rust_task::get_chan_by_handle(chan_handle *handle) {
rust_task *target_task = kernel->get_task_by_id(handle->task);
if(target_task) {
rust_port *port = target_task->get_port_by_id(handle->port);
target_task->deref();
port->remote_chan->ref();
return port->remote_chan;
}
return NULL;
}
//
// Local Variables:
// mode: C++

View File

@ -173,6 +173,8 @@ rust_task : public kernel_owned<rust_task>, rust_cond
// Use this function sparingly. Depending on the ref count is generally
// not at all safe.
intptr_t get_ref_count() const { return ref_count; }
rust_chan *get_chan_by_handle(chan_handle *handle);
};
//

View File

@ -170,8 +170,10 @@ static void new_connection(uv_handle_t *socket, int status) {
}
extern "C" CDECL socket_data *aio_serve(rust_task *task, const char *ip,
int port, rust_chan *chan) {
int port, chan_handle *_chan) {
LOG_UPCALL_ENTRY(task);
rust_chan *chan = task->get_chan_by_handle(_chan);
if(!chan) return NULL;
struct sockaddr_in addr = uv_ip4_addr(const_cast<char*>(ip), port);
socket_data *server = make_socket(iotask, chan);
if (!server)
@ -179,10 +181,13 @@ extern "C" CDECL socket_data *aio_serve(rust_task *task, const char *ip,
if (uv_tcp_bind(&server->socket, addr) ||
uv_tcp_listen(&server->socket, 128, new_connection)) {
aio_close_socket(task, server);
chan->deref();
return NULL;
}
chan->deref();
return server;
oom:
chan->deref();
task->fail();
return NULL;
}
@ -218,8 +223,10 @@ extern "C" CDECL void aio_close_socket(rust_task *task, socket_data *client) {
}
extern "C" CDECL void aio_close_server(rust_task *task, socket_data *server,
rust_chan *chan) {
chan_handle *_chan) {
LOG_UPCALL_ENTRY(task);
rust_chan *chan = task->get_chan_by_handle(_chan);
if(!chan) return;
// XXX: hax until rust_task::kill
// send null and the receiver knows to call back into native code to check
void* null_client = NULL;
@ -227,6 +234,7 @@ extern "C" CDECL void aio_close_server(rust_task *task, socket_data *server,
server->chan->deref();
server->chan = chan->clone(iotask);
aio_close_socket(task, server);
chan->deref();
}
extern "C" CDECL bool aio_is_null_client(rust_task *task,
@ -243,8 +251,10 @@ static void connection_complete(request *req, int status) {
}
extern "C" CDECL void aio_connect(rust_task *task, const char *host,
int port, rust_chan *chan) {
int port, chan_handle *_chan) {
LOG_UPCALL_ENTRY(task);
rust_chan *chan = task->get_chan_by_handle(_chan);
if(!chan) return;
struct sockaddr_in addr = uv_ip4_addr(const_cast<char*>(host), port);
request *req;
socket_data *client = make_socket(iotask, NULL);
@ -257,11 +267,13 @@ extern "C" CDECL void aio_connect(rust_task *task, const char *host,
goto oom_req;
}
if (0 == uv_tcp_connect(req, addr)) {
return;
chan->deref();
return;
}
oom_req:
aio_close_socket(task, client);
oom_client:
chan->deref();
task->fail();
return;
}
@ -274,8 +286,11 @@ static void write_complete(request *req, int status) {
}
extern "C" CDECL void aio_writedata(rust_task *task, socket_data *data,
char *buf, size_t size, rust_chan *chan) {
char *buf, size_t size,
chan_handle *_chan) {
LOG_UPCALL_ENTRY(task);
rust_chan *chan = task->get_chan_by_handle(_chan);
if(!chan) return;
// uv_buf_t is defined backwards on win32...
// maybe an indication we shouldn't be building directly?
@ -294,15 +309,20 @@ extern "C" CDECL void aio_writedata(rust_task *task, socket_data *data,
delete req;
goto fail;
}
chan->deref();
return;
fail:
chan->deref();
task->fail();
}
extern "C" CDECL void aio_read(rust_task *task, socket_data *data,
rust_chan *reader) {
chan_handle *_chan) {
LOG_UPCALL_ENTRY(task);
rust_chan *reader = task->get_chan_by_handle(_chan);
if(!reader) return;
I(task->sched, data->reader == NULL);
data->reader = reader->clone(iotask);
uv_read_start((uv_stream_t*)&data->socket, alloc_buffer, read_progress);
reader->deref();
}

View File

@ -12,7 +12,6 @@ align_of
chan_id_send
chan_send
check_claims
clone_chan
debug_box
debug_fn
debug_obj

View File

@ -1,3 +0,0 @@
// error-pattern: mismatched types
fn main() { 10 |> 10; }

View File

@ -1,3 +0,0 @@
// error-pattern: mismatched types
fn main() { 10 <| 10; }

View File

@ -1,7 +0,0 @@
// error-pattern:>> cannot be applied to type `port[int]`
fn main() {
let p1: port[int] = port();
let p2: port[int] = port();
let x = p1 >> p2;
}

View File

@ -5,11 +5,12 @@
use std;
import std::task;
import std::comm::mk_port;
fn child() { assert (1 == 2); }
fn main() {
let p: port[int] = port();
let p = mk_port[int]();
task::_spawn(bind child());
let x: int; p |> x;
let x = p.recv();
}

View File

@ -1,13 +1,17 @@
// error-pattern:meep
fn echo[T](c: chan[T], oc: chan[chan[T]]) {
use std;
import std::comm::_chan;
import std::comm::mk_port;
import std::comm::send;
fn echo[~T](c: _chan[T], oc: _chan[_chan[T]]) {
// Tests that the type argument in port gets
// visited
let p = port[T]();
oc <| chan(p);
let p = mk_port[T]();
send(oc, p.mk_chan());
let x;
p |> x;
c <| x;
let x = p.recv();
send(c, x);
}
fn main() { fail "meep"; }
fn main() { fail "meep"; }

View File

@ -3,6 +3,7 @@
use std;
import std::unsafe::reinterpret_cast;
import std::task;
import std::comm;
fn test_nil() {
assert (() == ());
@ -60,20 +61,21 @@ fn test_box() {
}
fn test_port() {
let p1: port[int] = port();
let p2: port[int] = port();
let p1 = comm::mk_port[int]();
let p2 = comm::mk_port[int]();
assert p1 == p1;
assert p1 != p2;
}
fn test_chan() {
let p: port[int] = port();
let ch1 = chan(p);
let ch2 = chan(p);
let p: comm::_port[int] = comm::mk_port();
let ch1 = p.mk_chan();
let ch2 = p.mk_chan();
assert ch1 == ch1;
assert ch1 != ch2;
// Chans are equal because they are just task:port addresses.
assert ch1 == ch2;
}
fn test_ptr() {

View File

@ -1,16 +1,16 @@
// -*- rust -*-
use std;
import std::comm::mk_port;
import std::comm::send;
fn main() {
let po: port[int] = port();
let ch: chan[int] = chan(po);
ch <| 10;
let i: int;
po |> i;
let po = mk_port();
let ch = po.mk_chan();
send(ch, 10);
let i = po.recv();
assert (i == 10);
ch <| 11;
let j;
po |> j;
send(ch, 11);
let j = po.recv();
assert (j == 11);
}

View File

@ -1,5 +1,9 @@
// Sanity-check the code examples that appear in the object system
// documentation.
use std;
import std::comm::_chan;
import std::comm::send;
import std::comm::mk_port;
fn main() {
@ -43,9 +47,9 @@ fn main() {
}
}
obj sender(c: chan[int]) {
obj sender(c: _chan[int]) {
fn take(z: int) {
c <| z;
send(c, z);
}
}
@ -55,10 +59,10 @@ fn main() {
t.take(3);
}
let p: port[int] = port();
let p = mk_port[int]();
let t1: taker = adder(@mutable 0);
let t2: taker = sender(chan(p));
let t2: taker = sender(p.mk_chan());
give_ints(t1);
give_ints(t2);

View File

@ -1,5 +1,3 @@
// -*- rust -*-
// Regression tests for circular_buffer when using a unit
@ -7,7 +5,9 @@
use std;
import std::option;
import std::uint;
import std::comm;
import std::comm::mk_port;
import std::comm::send;
// A 12-byte unit to send over the channel
type record = {val1: u32, val2: u32, val3: u32};
@ -18,50 +18,52 @@ type record = {val1: u32, val2: u32, val3: u32};
// power of two so needs to be rounded up. Don't trigger any
// assertions.
fn test_init() {
let myport: port[record] = port();
let mychan = chan(myport);
let myport = mk_port[record]();
let mychan = myport.mk_chan();
let val: record = {val1: 0u32, val2: 0u32, val3: 0u32};
mychan <| val;
send(mychan, val);
}
// Dump lots of items into the channel so it has to grow.
// Don't trigger any assertions.
fn test_grow() {
let myport: port[record] = port();
let mychan = chan(myport);
let val: record = {val1: 0u32, val2: 0u32, val3: 0u32};
for each i: uint in uint::range(0u, 100u) { mychan <| val; }
let myport: comm::_port[record] = comm::mk_port();
let mychan = myport.mk_chan();
for each i: uint in uint::range(0u, 100u) {
let val: record = {val1: 0u32, val2: 0u32, val3: 0u32};
comm::send(mychan, val);
}
}
// Don't allow the buffer to shrink below it's original size
fn test_shrink1() {
let myport: port[i8] = port();
let mychan = chan(myport);
mychan <| 0i8;
let x;
myport |> x;
let myport = comm::mk_port[i8]();
let mychan = myport.mk_chan();
send(mychan, 0i8);
let x = myport.recv();
}
fn test_shrink2() {
let myport: port[record] = port();
let mychan = chan(myport);
let val: record = {val1: 0u32, val2: 0u32, val3: 0u32};
for each i: uint in uint::range(0u, 100u) { mychan <| val; }
for each i: uint in uint::range(0u, 100u) { let x; myport |> x; }
let myport = mk_port[record]();
let mychan = myport.mk_chan();
for each i: uint in uint::range(0u, 100u) {
let val: record = {val1: 0u32, val2: 0u32, val3: 0u32};
send(mychan, val);
}
for each i: uint in uint::range(0u, 100u) { let x = myport.recv(); }
}
// Test rotating the buffer when the unit size is not a power of two
fn test_rotate() {
let myport: port[record] = port();
let mychan = chan(myport);
for each i: uint in uint::range(0u, 100u) {
let myport = mk_port[record]();
let mychan = myport.mk_chan();
for each i: uint in uint::range(0u, 100u) {
let val = {val1: i as u32, val2: i as u32, val3: i as u32};
mychan <| val;
let x;
myport |> x;
send(mychan, val);
let x = myport.recv();
assert (x.val1 == i as u32);
assert (x.val2 == i as u32);
assert (x.val3 == i as u32);
@ -72,17 +74,16 @@ fn test_rotate() {
// Test rotating and growing the buffer when
// the unit size is not a power of two
fn test_rotate_grow() {
let myport: port[record] = port();
let mychan = chan(myport);
for each j: uint in uint::range(0u, 10u) {
for each i: uint in uint::range(0u, 10u) {
let myport = mk_port[record]();
let mychan = myport.mk_chan();
for each j: uint in uint::range(0u, 10u) {
for each i: uint in uint::range(0u, 10u) {
let val: record =
{val1: i as u32, val2: i as u32, val3: i as u32};
mychan <| val;
send(mychan, val);
}
for each i: uint in uint::range(0u, 10u) {
let x;
myport |> x;
for each i: uint in uint::range(0u, 10u) {
let x = myport.recv();
assert (x.val1 == i as u32);
assert (x.val2 == i as u32);
assert (x.val3 == i as u32);

View File

@ -1,8 +1,13 @@
use std;
import std::comm::_chan;
import std::comm::send;
import std::comm::mk_port;
// tests that ctrl's type gets inferred properly
type command[K, V] = {key: K, val: V};
fn cache_server[K, V](c: chan[chan[command[K, V]]]) {
let ctrl = port();
c <| chan(ctrl);
fn cache_server[K, V](c: _chan[_chan[command[K, V]]]) {
let ctrl = mk_port[_chan[command[K, V]]]();
send(c, ctrl.mk_chan());
}
fn main() { }

View File

@ -1,12 +1,12 @@
// xfail-stage1
// xfail-stage2
// xfail-stage3
use std;
import std::sio;
import std::task;
import std::net;
fn connectTask(cx: sio::ctx, ip: str, portnum: int) {
fn connectTask(cx: sio::ctx, ip: net::ip_addr, portnum: int) {
let client: sio::client;
client = sio::connect_to(cx, ip, portnum);
sio::close_client(client);
@ -14,10 +14,13 @@ fn connectTask(cx: sio::ctx, ip: str, portnum: int) {
fn main() {
let cx: sio::ctx = sio::new();
let srv: sio::server = sio::create_server(cx, "0.0.0.0", 9090);
let child: task = spawn connectTask(cx, "127.0.0.1", 9090);
let srv: sio::server = sio::create_server(cx,
net::parse_addr("0.0.0.0"), 9090);
let child = task::_spawn(bind connectTask(cx,
net::parse_addr("127.0.0.1"),
9090));
let client: sio::client = sio::accept_from(srv);
task::join(child);
task::join_id(child);
sio::close_client(client);
sio::close_server(srv);
sio::destroy(cx);

View File

@ -1,16 +1,15 @@
// -*- rust -*-
use std;
import std::comm;
// rustboot can't transmit nils across channels because they don't have
// any size, but rustc currently can because they do have size. Whether
// or not this is desirable I don't know, but here's a regression test.
fn main() {
let po: port[()] = port();
let ch: chan[()] = chan(po);
ch <| ();
let n: ();
po |> n;
let po: comm::_port[()] = comm::mk_port();
let ch: comm::_chan[()] = po.mk_chan();
comm::send(ch, ());
let n: () = po.recv();
assert (n == ());
}

View File

@ -1,15 +1,15 @@
use std;
import std::comm::mk_port;
import std::comm::send;
/*
This is about the simplest program that can successfully send a
message.
*/
fn main() {
let po: port[int] = port();
let ch: chan[int] = chan(po);
ch <| 42;
let r;
po |> r;
let po = mk_port[int]();
let ch = po.mk_chan();
send(ch, 42);
let r = po.recv();
log_err r;
}

12
src/test/stdtest/net.rs Normal file
View File

@ -0,0 +1,12 @@
use std;
import std::net;
#[test]
fn test_format_ip() {
assert(net::format_addr(net::ipv4(127u8,0u8,0u8,1u8)) == "127.0.0.1")
}
#[test]
fn test_parse_ip() {
assert(net::parse_addr("127.0.0.1") == net::ipv4(127u8,0u8,0u8,1u8));
}

View File

@ -12,6 +12,7 @@ mod io;
mod ivec;
mod list;
mod map;
mod net;
mod option;
mod os;
mod path;

View File

@ -25,14 +25,6 @@ fn test_join() {
assert (task::join_id(failtask) == task::tr_failure);
}
#[test]
fn test_send_recv() {
let p = port[int]();
let c = chan(p);
task::send(c, 10);
assert (task::recv(p) == 10);
}
#[test]
fn test_lib_spawn() {
fn foo() { log_err "Hello, World!"; }