mirror of
https://github.com/rust-lang/rust.git
synced 2025-02-20 19:04:21 +00:00
moving new uv stuff into uv.rs and rust_uv.cpp
- removing the remains of uvtmp.rs and rust_uvtmp.rs - removing the displaced, low-level libuv bindings in uv.rs and rust_uv.cpp
This commit is contained in:
parent
cf08ed6458
commit
bb5960aa57
@ -7,7 +7,7 @@
|
||||
#[license = "MIT"];
|
||||
#[crate_type = "lib"];
|
||||
|
||||
export fs, io, net, run, uv, uvtmp;
|
||||
export fs, io, net, run, uv;
|
||||
export c_vec, four, tri, util;
|
||||
export bitv, deque, fun_treemap, list, map, smallintmap, sort, treemap, ufind;
|
||||
export rope;
|
||||
@ -25,7 +25,6 @@ mod net;
|
||||
#[path = "run_program.rs"]
|
||||
mod run;
|
||||
mod uv;
|
||||
mod uvtmp;
|
||||
|
||||
|
||||
// Utility modules
|
||||
|
682
src/libstd/uv.rs
682
src/libstd/uv.rs
@ -1,188 +1,558 @@
|
||||
/*
|
||||
This is intended to be a low-level binding to libuv that very closely mimics
|
||||
the C libuv API. Does very little right now pending scheduler improvements.
|
||||
*/
|
||||
export loop_new, run, close, run_in_bg, async_init, async_send,
|
||||
timer_init, timer_start, timer_stop;
|
||||
|
||||
export sanity_check;
|
||||
export loop_t, idle_t;
|
||||
export loop_new, loop_delete, default_loop, run, unref;
|
||||
export idle_init, idle_start;
|
||||
export idle_new;
|
||||
|
||||
import core::ctypes;
|
||||
|
||||
#[link_name = "rustrt"]
|
||||
native mod uv {
|
||||
fn rust_uv_loop_new() -> *loop_t;
|
||||
fn rust_uv_loop_delete(loop: *loop_t);
|
||||
fn rust_uv_default_loop() -> *loop_t;
|
||||
fn rust_uv_run(loop: *loop_t) -> ctypes::c_int;
|
||||
fn rust_uv_unref(loop: *loop_t);
|
||||
fn rust_uv_idle_init(loop: *loop_t, idle: *idle_t) -> ctypes::c_int;
|
||||
fn rust_uv_idle_start(idle: *idle_t, cb: idle_cb) -> ctypes::c_int;
|
||||
// these are processed solely in the
|
||||
// process_operation() crust fn below
|
||||
enum uv_operation {
|
||||
op_async_init([u8]),
|
||||
op_close(uv_handle, *ctypes::void),
|
||||
op_timer_init([u8]),
|
||||
op_timer_start([u8], *ctypes::void, u32, u32),
|
||||
op_timer_stop([u8], *ctypes::void, fn~(uv_handle))
|
||||
}
|
||||
|
||||
#[link_name = "rustrt"]
|
||||
native mod helpers {
|
||||
fn rust_uv_size_of_idle_t() -> ctypes::size_t;
|
||||
enum uv_handle {
|
||||
uv_async([u8], uv_loop),
|
||||
uv_timer([u8], uv_loop)
|
||||
}
|
||||
|
||||
type opaque_cb = *ctypes::void;
|
||||
enum uv_msg {
|
||||
// requests from library users
|
||||
msg_run(comm::chan<bool>),
|
||||
msg_run_in_bg(),
|
||||
msg_async_init(fn~(uv_handle), fn~(uv_handle)),
|
||||
msg_async_send([u8]),
|
||||
msg_close(uv_handle, fn~()),
|
||||
msg_timer_init(fn~(uv_handle)),
|
||||
msg_timer_start([u8], u32, u32, fn~(uv_handle)),
|
||||
msg_timer_stop([u8], fn~(uv_handle)),
|
||||
|
||||
type handle_type = ctypes::enum;
|
||||
// dispatches from libuv
|
||||
uv_async_init([u8], *ctypes::void),
|
||||
uv_async_send([u8]),
|
||||
uv_close([u8]),
|
||||
uv_timer_init([u8], *ctypes::void),
|
||||
uv_timer_call([u8]),
|
||||
uv_timer_stop([u8], fn~(uv_handle)),
|
||||
uv_end()
|
||||
}
|
||||
|
||||
type close_cb = opaque_cb;
|
||||
type idle_cb = opaque_cb;
|
||||
|
||||
type handle_private_fields = {
|
||||
a00: ctypes::c_int,
|
||||
a01: ctypes::c_int,
|
||||
a02: ctypes::c_int,
|
||||
a03: ctypes::c_int,
|
||||
a04: ctypes::c_int,
|
||||
a05: ctypes::c_int,
|
||||
a06: int,
|
||||
a07: int,
|
||||
a08: int,
|
||||
a09: int,
|
||||
a10: int,
|
||||
a11: int,
|
||||
a12: int
|
||||
type uv_loop_data = {
|
||||
operation_port: comm::port<uv_operation>,
|
||||
rust_loop_chan: comm::chan<uv_msg>
|
||||
};
|
||||
|
||||
type handle_fields = {
|
||||
loop: *loop_t,
|
||||
type_: handle_type,
|
||||
close_cb: close_cb,
|
||||
data: *ctypes::void,
|
||||
private: handle_private_fields
|
||||
};
|
||||
type uv_loop = comm::chan<uv_msg>;
|
||||
|
||||
type handle_t = {
|
||||
fields: handle_fields
|
||||
};
|
||||
|
||||
type loop_t = int;
|
||||
|
||||
|
||||
|
||||
|
||||
type idle_t = {
|
||||
fields: handle_fields
|
||||
/* private: idle_private_fields */
|
||||
};
|
||||
|
||||
fn idle_init(loop: *loop_t, idle: *idle_t) -> ctypes::c_int {
|
||||
uv::rust_uv_idle_init(loop, idle)
|
||||
#[nolink]
|
||||
native mod rustrt {
|
||||
fn rust_uv_loop_new() -> *ctypes::void;
|
||||
fn rust_uv_loop_set_data(
|
||||
loop: *ctypes::void,
|
||||
data: *uv_loop_data);
|
||||
fn rust_uv_bind_op_cb(loop: *ctypes::void, cb: *u8)
|
||||
-> *ctypes::void;
|
||||
fn rust_uv_stop_op_cb(handle: *ctypes::void);
|
||||
fn rust_uv_run(loop_handle: *ctypes::void);
|
||||
fn rust_uv_close(handle: *ctypes::void, cb: *u8);
|
||||
fn rust_uv_close_async(handle: *ctypes::void);
|
||||
fn rust_uv_close_timer(handle: *ctypes::void);
|
||||
fn rust_uv_async_send(handle: *ctypes::void);
|
||||
fn rust_uv_async_init(
|
||||
loop_handle: *ctypes::void,
|
||||
cb: *u8,
|
||||
id: *u8) -> *ctypes::void;
|
||||
fn rust_uv_timer_init(
|
||||
loop_handle: *ctypes::void,
|
||||
cb: *u8,
|
||||
id: *u8) -> *ctypes::void;
|
||||
fn rust_uv_timer_start(
|
||||
timer_handle: *ctypes::void,
|
||||
timeout: ctypes::c_uint,
|
||||
repeat: ctypes::c_uint);
|
||||
fn rust_uv_timer_stop(handle: *ctypes::void);
|
||||
}
|
||||
|
||||
fn idle_start(idle: *idle_t, cb: idle_cb) -> ctypes::c_int {
|
||||
uv::rust_uv_idle_start(idle, cb)
|
||||
}
|
||||
// public functions
|
||||
fn loop_new() -> uv_loop unsafe {
|
||||
let ret_recv_port: comm::port<uv_loop> =
|
||||
comm::port();
|
||||
let ret_recv_chan: comm::chan<uv_loop> =
|
||||
comm::chan(ret_recv_port);
|
||||
|
||||
task::spawn_sched(task::manual_threads(4u)) {||
|
||||
// our beloved uv_loop_t ptr
|
||||
let loop_handle = rustrt::
|
||||
rust_uv_loop_new();
|
||||
|
||||
// this port/chan pair are used to send messages to
|
||||
// libuv. libuv processes any pending messages on the
|
||||
// port (via crust) after receiving an async "wakeup"
|
||||
// on a special uv_async_t handle created below
|
||||
let operation_port = comm::port::<uv_operation>();
|
||||
let operation_chan = comm::chan::<uv_operation>(
|
||||
operation_port);
|
||||
|
||||
// this port/chan pair as used in the while() loop
|
||||
// below. It takes dispatches, originating from libuv
|
||||
// callbacks, to invoke handles registered by the
|
||||
// user
|
||||
let rust_loop_port = comm::port::<uv_msg>();
|
||||
let rust_loop_chan =
|
||||
comm::chan::<uv_msg>(rust_loop_port);
|
||||
// let the task-spawner return
|
||||
comm::send(ret_recv_chan, copy(rust_loop_chan));
|
||||
|
||||
fn default_loop() -> *loop_t {
|
||||
uv::rust_uv_default_loop()
|
||||
}
|
||||
// create our "special" async handle that will
|
||||
// allow all operations against libuv to be
|
||||
// "buffered" in the operation_port, for processing
|
||||
// from the thread that libuv runs on
|
||||
let loop_data: uv_loop_data = {
|
||||
operation_port: operation_port,
|
||||
rust_loop_chan: rust_loop_chan
|
||||
};
|
||||
rustrt::rust_uv_loop_set_data(
|
||||
loop_handle,
|
||||
ptr::addr_of(loop_data)); // pass an opaque C-ptr
|
||||
// to libuv, this will be
|
||||
// in the process_operation
|
||||
// crust fn
|
||||
let op_handle = rustrt::rust_uv_bind_op_cb(
|
||||
loop_handle,
|
||||
process_operation);
|
||||
|
||||
fn loop_new() -> *loop_t {
|
||||
uv::rust_uv_loop_new()
|
||||
}
|
||||
// all state goes here
|
||||
let handles: map::map<[u8], *ctypes::void> =
|
||||
map::new_bytes_hash();
|
||||
let id_to_handle: map::map<[u8], uv_handle> =
|
||||
map::new_bytes_hash();
|
||||
let after_cbs: map::map<[u8], fn~(uv_handle)> =
|
||||
map::new_bytes_hash();
|
||||
let close_callbacks: map::map<[u8], fn~()> =
|
||||
map::new_bytes_hash();
|
||||
let async_cbs: map::map<[u8], fn~(uv_handle)> =
|
||||
map::new_bytes_hash();
|
||||
let timer_cbs: map::map<[u8], fn~(uv_handle)> =
|
||||
map::new_bytes_hash();
|
||||
|
||||
fn loop_delete(loop: *loop_t) {
|
||||
uv::rust_uv_loop_delete(loop)
|
||||
}
|
||||
// the main loop that this task blocks on.
|
||||
// should have the same lifetime as the C libuv
|
||||
// event loop.
|
||||
let keep_going = true;
|
||||
while (keep_going) {
|
||||
alt comm::recv(rust_loop_port) {
|
||||
msg_run(end_chan) {
|
||||
// start the libuv event loop
|
||||
// we'll also do a uv_async_send with
|
||||
// the operation handle to have the
|
||||
// loop process any pending operations
|
||||
// once its up and running
|
||||
task::spawn_sched(task::manual_threads(1u)) {||
|
||||
// this call blocks
|
||||
rustrt::rust_uv_run(loop_handle);
|
||||
// when we're done, msg the
|
||||
// end chan
|
||||
rustrt::rust_uv_stop_op_cb(op_handle);
|
||||
comm::send(end_chan, true);
|
||||
comm::send(rust_loop_chan, uv_end);
|
||||
};
|
||||
}
|
||||
|
||||
fn run(loop: *loop_t) -> ctypes::c_int {
|
||||
uv::rust_uv_run(loop)
|
||||
}
|
||||
msg_run_in_bg {
|
||||
task::spawn_sched(task::manual_threads(1u)) {||
|
||||
// this call blocks
|
||||
rustrt::rust_uv_run(loop_handle);
|
||||
};
|
||||
}
|
||||
|
||||
fn unref(loop: *loop_t) {
|
||||
uv::rust_uv_unref(loop)
|
||||
}
|
||||
msg_close(handle, cb) {
|
||||
let id = get_id_from_handle(handle);
|
||||
close_callbacks.insert(id, cb);
|
||||
let handle_ptr = handles.get(id);
|
||||
let op = op_close(handle, handle_ptr);
|
||||
|
||||
pass_to_libuv(op_handle, operation_chan, op);
|
||||
}
|
||||
uv_close(id) {
|
||||
handles.remove(id);
|
||||
let handle = id_to_handle.get(id);
|
||||
id_to_handle.remove(id);
|
||||
alt handle {
|
||||
uv_async(id, _) {
|
||||
async_cbs.remove(id);
|
||||
}
|
||||
uv_timer(id, _) {
|
||||
timer_cbs.remove(id);
|
||||
}
|
||||
_ {
|
||||
fail "unknown form of uv_handle encountered "
|
||||
+ "in uv_close handler";
|
||||
}
|
||||
}
|
||||
let cb = close_callbacks.get(id);
|
||||
close_callbacks.remove(id);
|
||||
task::spawn {||
|
||||
cb();
|
||||
};
|
||||
}
|
||||
|
||||
fn sanity_check() {
|
||||
fn check_size(t: str, uv: ctypes::size_t, rust: ctypes::size_t) {
|
||||
#debug("size of %s: uv: %u, rust: %u", t, uv, rust);
|
||||
assert uv <= rust;
|
||||
}
|
||||
check_size("idle_t",
|
||||
helpers::rust_uv_size_of_idle_t(),
|
||||
sys::size_of::<idle_t>());
|
||||
}
|
||||
msg_async_init(callback, after_cb) {
|
||||
// create a new async handle
|
||||
// with the id as the handle's
|
||||
// data and save the callback for
|
||||
// invocation on msg_async_send
|
||||
let id = gen_handle_id();
|
||||
async_cbs.insert(id, callback);
|
||||
after_cbs.insert(id, after_cb);
|
||||
let op = op_async_init(id);
|
||||
pass_to_libuv(op_handle, operation_chan, op);
|
||||
}
|
||||
uv_async_init(id, async_handle) {
|
||||
// libuv created a handle, which is
|
||||
// passed back to us. save it and
|
||||
// then invoke the supplied callback
|
||||
// for after completion
|
||||
handles.insert(id, async_handle);
|
||||
let after_cb = after_cbs.get(id);
|
||||
after_cbs.remove(id);
|
||||
let async = uv_async(id, rust_loop_chan);
|
||||
id_to_handle.insert(id, copy(async));
|
||||
task::spawn {||
|
||||
after_cb(async);
|
||||
};
|
||||
}
|
||||
|
||||
fn handle_fields_new() -> handle_fields {
|
||||
{
|
||||
loop: ptr::null(),
|
||||
type_: 0u32,
|
||||
close_cb: ptr::null(),
|
||||
data: ptr::null(),
|
||||
private: {
|
||||
a00: 0i32,
|
||||
a01: 0i32,
|
||||
a02: 0i32,
|
||||
a03: 0i32,
|
||||
a04: 0i32,
|
||||
a05: 0i32,
|
||||
a06: 0,
|
||||
a07: 0,
|
||||
a08: 0,
|
||||
a09: 0,
|
||||
a10: 0,
|
||||
a11: 0,
|
||||
a12: 0
|
||||
msg_async_send(id) {
|
||||
let async_handle = handles.get(id);
|
||||
do_send(async_handle);
|
||||
}
|
||||
uv_async_send(id) {
|
||||
let async_cb = async_cbs.get(id);
|
||||
task::spawn {||
|
||||
async_cb(uv_async(id, rust_loop_chan));
|
||||
};
|
||||
}
|
||||
|
||||
msg_timer_init(after_cb) {
|
||||
let id = gen_handle_id();
|
||||
after_cbs.insert(id, after_cb);
|
||||
let op = op_timer_init(id);
|
||||
pass_to_libuv(op_handle, operation_chan, op);
|
||||
}
|
||||
uv_timer_init(id, handle) {
|
||||
handles.insert(id, handle);
|
||||
let after_cb = after_cbs.get(id);
|
||||
after_cbs.remove(id);
|
||||
let new_timer = uv_timer(id, rust_loop_chan);
|
||||
id_to_handle.insert(id, copy(new_timer));
|
||||
task::spawn {||
|
||||
after_cb(new_timer);
|
||||
};
|
||||
}
|
||||
|
||||
uv_timer_call(id) {
|
||||
let cb = timer_cbs.get(id);
|
||||
let the_timer = id_to_handle.get(id);
|
||||
task::spawn {||
|
||||
cb(the_timer);
|
||||
};
|
||||
}
|
||||
|
||||
msg_timer_start(id, timeout, repeat, timer_call_cb) {
|
||||
timer_cbs.insert(id, timer_call_cb);
|
||||
let handle = handles.get(id);
|
||||
let op = op_timer_start(id, handle, timeout,
|
||||
repeat);
|
||||
pass_to_libuv(op_handle, operation_chan, op);
|
||||
}
|
||||
|
||||
msg_timer_stop(id, after_cb) {
|
||||
let handle = handles.get(id);
|
||||
let op = op_timer_stop(id, handle, after_cb);
|
||||
pass_to_libuv(op_handle, operation_chan, op);
|
||||
}
|
||||
uv_timer_stop(id, after_cb) {
|
||||
let the_timer = id_to_handle.get(id);
|
||||
after_cb(the_timer);
|
||||
}
|
||||
|
||||
uv_end() {
|
||||
keep_going = false;
|
||||
}
|
||||
|
||||
_ { fail "unknown form of uv_msg received"; }
|
||||
}
|
||||
}
|
||||
};
|
||||
ret comm::recv(ret_recv_port);
|
||||
}
|
||||
|
||||
fn run(loop: uv_loop) {
|
||||
let end_port = comm::port::<bool>();
|
||||
let end_chan = comm::chan::<bool>(end_port);
|
||||
comm::send(loop, msg_run(end_chan));
|
||||
comm::recv(end_port);
|
||||
}
|
||||
|
||||
fn run_in_bg(loop: uv_loop) {
|
||||
comm::send(loop, msg_run_in_bg);
|
||||
}
|
||||
|
||||
fn async_init (
|
||||
loop: uv_loop,
|
||||
async_cb: fn~(uv_handle),
|
||||
after_cb: fn~(uv_handle)) {
|
||||
let msg = msg_async_init(async_cb, after_cb);
|
||||
comm::send(loop, msg);
|
||||
}
|
||||
|
||||
fn async_send(async: uv_handle) {
|
||||
alt async {
|
||||
uv_async(id, loop) {
|
||||
comm::send(loop, msg_async_send(id));
|
||||
}
|
||||
_ {
|
||||
fail "attempting to call async_send() with a" +
|
||||
" uv_async uv_handle";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn idle_new() -> idle_t {
|
||||
{
|
||||
fields: handle_fields_new()
|
||||
fn close(h: uv_handle, cb: fn~()) {
|
||||
let loop_chan = get_loop_chan_from_handle(h);
|
||||
comm::send(loop_chan, msg_close(h, cb));
|
||||
}
|
||||
|
||||
fn timer_init(loop: uv_loop, after_cb: fn~(uv_handle)) {
|
||||
let msg = msg_timer_init(after_cb);
|
||||
comm::send(loop, msg);
|
||||
}
|
||||
|
||||
fn timer_start(the_timer: uv_handle, timeout: u32, repeat:u32,
|
||||
timer_cb: fn~(uv_handle)) {
|
||||
alt the_timer {
|
||||
uv_timer(id, loop_chan) {
|
||||
let msg = msg_timer_start(id, timeout, repeat, timer_cb);
|
||||
comm::send(loop_chan, msg);
|
||||
}
|
||||
_ {
|
||||
fail "can only pass a uv_timer form of uv_handle to "+
|
||||
" uv::timer_start()";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_sanity_check() {
|
||||
sanity_check();
|
||||
}
|
||||
|
||||
// From test-ref.c
|
||||
mod test_ref {
|
||||
|
||||
#[test]
|
||||
fn ref() {
|
||||
let loop = loop_new();
|
||||
run(loop);
|
||||
loop_delete(loop);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn idle_ref() {
|
||||
let loop = loop_new();
|
||||
let h = idle_new();
|
||||
idle_init(loop, ptr::addr_of(h));
|
||||
idle_start(ptr::addr_of(h), ptr::null());
|
||||
unref(loop);
|
||||
run(loop);
|
||||
loop_delete(loop);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn async_ref() {
|
||||
/*
|
||||
let loop = loop_new();
|
||||
let h = async_new();
|
||||
async_init(loop, ptr::addr_of(h), ptr::null());
|
||||
unref(loop);
|
||||
run(loop);
|
||||
loop_delete(loop);
|
||||
*/
|
||||
}
|
||||
fn timer_stop(the_timer: uv_handle, after_cb: fn~(uv_handle)) {
|
||||
alt the_timer {
|
||||
uv_timer(id, loop_chan) {
|
||||
let msg = msg_timer_stop(id, after_cb);
|
||||
comm::send(loop_chan, msg);
|
||||
}
|
||||
_ {
|
||||
fail "only uv_timer form is allowed in calls to "+
|
||||
" uv::timer_stop()";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// internal functions
|
||||
fn pass_to_libuv(
|
||||
op_handle: *ctypes::void,
|
||||
operation_chan: comm::chan<uv_operation>,
|
||||
op: uv_operation) unsafe {
|
||||
comm::send(operation_chan, copy(op));
|
||||
do_send(op_handle);
|
||||
}
|
||||
fn do_send(h: *ctypes::void) {
|
||||
rustrt::rust_uv_async_send(h);
|
||||
}
|
||||
fn gen_handle_id() -> [u8] {
|
||||
ret rand::mk_rng().gen_bytes(16u);
|
||||
}
|
||||
fn get_handle_id_from(buf: *u8) -> [u8] unsafe {
|
||||
ret vec::unsafe::from_buf(buf, 16u);
|
||||
}
|
||||
|
||||
fn get_loop_chan_from_data(data: *uv_loop_data)
|
||||
-> uv_loop unsafe {
|
||||
ret (*data).rust_loop_chan;
|
||||
}
|
||||
|
||||
fn get_loop_chan_from_handle(handle: uv_handle)
|
||||
-> uv_loop {
|
||||
alt handle {
|
||||
uv_async(id,loop) | uv_timer(id,loop) {
|
||||
ret loop;
|
||||
}
|
||||
_ {
|
||||
fail "unknown form of uv_handle for get_loop_chan_from "
|
||||
+ " handle";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_id_from_handle(handle: uv_handle) -> [u8] {
|
||||
alt handle {
|
||||
uv_async(id,loop) | uv_timer(id,loop) {
|
||||
ret id;
|
||||
}
|
||||
_ {
|
||||
fail "unknown form of uv_handle for get_id_from handle";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// crust
|
||||
crust fn process_operation(
|
||||
loop: *ctypes::void,
|
||||
data: *uv_loop_data) unsafe {
|
||||
let op_port = (*data).operation_port;
|
||||
let loop_chan = get_loop_chan_from_data(data);
|
||||
let op_pending = comm::peek(op_port);
|
||||
while(op_pending) {
|
||||
alt comm::recv(op_port) {
|
||||
op_async_init(id) {
|
||||
let id_ptr = vec::unsafe::to_ptr(id);
|
||||
let async_handle = rustrt::rust_uv_async_init(
|
||||
loop,
|
||||
process_async_send,
|
||||
id_ptr);
|
||||
comm::send(loop_chan, uv_async_init(
|
||||
id,
|
||||
async_handle));
|
||||
}
|
||||
op_close(handle, handle_ptr) {
|
||||
handle_op_close(handle, handle_ptr);
|
||||
}
|
||||
op_timer_init(id) {
|
||||
let id_ptr = vec::unsafe::to_ptr(id);
|
||||
let timer_handle = rustrt::rust_uv_timer_init(
|
||||
loop,
|
||||
process_timer_call,
|
||||
id_ptr);
|
||||
comm::send(loop_chan, uv_timer_init(
|
||||
id,
|
||||
timer_handle));
|
||||
}
|
||||
op_timer_start(id, handle, timeout, repeat) {
|
||||
rustrt::rust_uv_timer_start(handle, timeout,
|
||||
repeat);
|
||||
}
|
||||
op_timer_stop(id, handle, after_cb) {
|
||||
rustrt::rust_uv_timer_stop(handle);
|
||||
comm::send(loop_chan, uv_timer_stop(id, after_cb));
|
||||
}
|
||||
_ { fail "unknown form of uv_operation received"; }
|
||||
}
|
||||
op_pending = comm::peek(op_port);
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_op_close(handle: uv_handle, handle_ptr: *ctypes::void) {
|
||||
// it's just like im doing C
|
||||
alt handle {
|
||||
uv_async(id, loop) {
|
||||
let cb = process_close_async;
|
||||
rustrt::rust_uv_close(
|
||||
handle_ptr, cb);
|
||||
}
|
||||
uv_timer(id, loop) {
|
||||
let cb = process_close_timer;
|
||||
rustrt::rust_uv_close(
|
||||
handle_ptr, cb);
|
||||
}
|
||||
_ {
|
||||
fail "unknown form of uv_handle encountered " +
|
||||
"in process_operation/op_close";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
crust fn process_async_send(id_buf: *u8, data: *uv_loop_data)
|
||||
unsafe {
|
||||
let handle_id = get_handle_id_from(id_buf);
|
||||
let loop_chan = get_loop_chan_from_data(data);
|
||||
comm::send(loop_chan, uv_async_send(handle_id));
|
||||
}
|
||||
|
||||
crust fn process_timer_call(id_buf: *u8, data: *uv_loop_data)
|
||||
unsafe {
|
||||
let handle_id = get_handle_id_from(id_buf);
|
||||
let loop_chan = get_loop_chan_from_data(data);
|
||||
comm::send(loop_chan, uv_timer_call(handle_id));
|
||||
}
|
||||
|
||||
fn process_close_common(id: [u8], data: *uv_loop_data)
|
||||
unsafe {
|
||||
// notify the rust loop that their handle is closed, then
|
||||
// the caller will invoke a per-handle-type c++ func to
|
||||
// free allocated memory
|
||||
let loop_chan = get_loop_chan_from_data(data);
|
||||
comm::send(loop_chan, uv_close(id));
|
||||
}
|
||||
|
||||
crust fn process_close_async(
|
||||
id_buf: *u8,
|
||||
handle_ptr: *ctypes::void,
|
||||
data: *uv_loop_data)
|
||||
unsafe {
|
||||
let id = get_handle_id_from(id_buf);
|
||||
rustrt::rust_uv_close_async(handle_ptr);
|
||||
// at this point, the handle and its data has been
|
||||
// released. notify the rust loop to remove the
|
||||
// handle and its data and call the user-supplied
|
||||
// close cb
|
||||
process_close_common(id, data);
|
||||
}
|
||||
|
||||
crust fn process_close_timer(
|
||||
id_buf: *u8,
|
||||
handle_ptr: *ctypes::void,
|
||||
data: *uv_loop_data)
|
||||
unsafe {
|
||||
let id = get_handle_id_from(id_buf);
|
||||
rustrt::rust_uv_close_timer(handle_ptr);
|
||||
process_close_common(id, data);
|
||||
}
|
||||
|
||||
|
||||
#[test]
|
||||
fn test_uv_new_loop_no_handles() {
|
||||
let test_loop = uv::loop_new();
|
||||
run(test_loop); // this should return immediately
|
||||
// since there aren't any handles..
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_uv_simple_async() {
|
||||
let test_loop = loop_new();
|
||||
let exit_port = comm::port::<bool>();
|
||||
let exit_chan = comm::chan::<bool>(exit_port);
|
||||
async_init(test_loop, {|new_async|
|
||||
close(new_async) {||
|
||||
comm::send(exit_chan, true);
|
||||
};
|
||||
}, {|new_async|
|
||||
async_send(new_async);
|
||||
});
|
||||
run(test_loop);
|
||||
assert comm::recv(exit_port);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_uv_timer() {
|
||||
let test_loop = loop_new();
|
||||
let exit_port = comm::port::<bool>();
|
||||
let exit_chan = comm::chan::<bool>(exit_port);
|
||||
timer_init(test_loop) {|new_timer|
|
||||
timer_start(new_timer, 1u32, 0u32) {|started_timer|
|
||||
timer_stop(started_timer) {|stopped_timer|
|
||||
close(stopped_timer) {||
|
||||
comm::send(exit_chan, true);
|
||||
};
|
||||
};
|
||||
};
|
||||
};
|
||||
run(test_loop);
|
||||
assert comm::recv(exit_port);
|
||||
}
|
@ -1,730 +0,0 @@
|
||||
// Some temporary libuv hacks for servo
|
||||
|
||||
// UV2
|
||||
|
||||
// these are processed solely in the
|
||||
// process_operation() crust fn below
|
||||
enum uv_operation {
|
||||
op_async_init([u8]),
|
||||
op_close(uv_handle, *ctypes::void),
|
||||
op_timer_init([u8]),
|
||||
op_timer_start([u8], *ctypes::void, u32, u32),
|
||||
op_timer_stop([u8], *ctypes::void, fn~(uv_handle))
|
||||
}
|
||||
|
||||
enum uv_handle {
|
||||
uv_async([u8], uv_loop),
|
||||
uv_timer([u8], uv_loop)
|
||||
}
|
||||
|
||||
enum uv_msg {
|
||||
// requests from library users
|
||||
msg_run(comm::chan<bool>),
|
||||
msg_run_in_bg(),
|
||||
msg_async_init(fn~(uv_handle), fn~(uv_handle)),
|
||||
msg_async_send([u8]),
|
||||
msg_close(uv_handle, fn~()),
|
||||
msg_timer_init(fn~(uv_handle)),
|
||||
msg_timer_start([u8], u32, u32, fn~(uv_handle)),
|
||||
msg_timer_stop([u8], fn~(uv_handle)),
|
||||
|
||||
// dispatches from libuv
|
||||
uv_async_init([u8], *ctypes::void),
|
||||
uv_async_send([u8]),
|
||||
uv_close([u8]),
|
||||
uv_timer_init([u8], *ctypes::void),
|
||||
uv_timer_call([u8]),
|
||||
uv_timer_stop([u8], fn~(uv_handle)),
|
||||
uv_end()
|
||||
}
|
||||
|
||||
type uv_loop_data = {
|
||||
operation_port: comm::port<uv_operation>,
|
||||
rust_loop_chan: comm::chan<uv_msg>
|
||||
};
|
||||
|
||||
type uv_loop = comm::chan<uv_msg>;
|
||||
|
||||
#[nolink]
|
||||
native mod rustrt {
|
||||
fn rust_uvtmp_create_thread() -> thread;
|
||||
fn rust_uvtmp_start_thread(thread: thread);
|
||||
fn rust_uvtmp_join_thread(thread: thread);
|
||||
fn rust_uvtmp_delete_thread(thread: thread);
|
||||
fn rust_uvtmp_connect(
|
||||
thread: thread,
|
||||
req_id: u32,
|
||||
ip: str::sbuf,
|
||||
chan: comm::chan<iomsg>) -> connect_data;
|
||||
fn rust_uvtmp_close_connection(thread: thread, req_id: u32);
|
||||
fn rust_uvtmp_write(
|
||||
thread: thread,
|
||||
req_id: u32,
|
||||
buf: *u8,
|
||||
len: ctypes::size_t,
|
||||
chan: comm::chan<iomsg>);
|
||||
fn rust_uvtmp_read_start(
|
||||
thread: thread,
|
||||
req_id: u32,
|
||||
chan: comm::chan<iomsg>);
|
||||
fn rust_uvtmp_timer(
|
||||
thread: thread,
|
||||
timeout: u32,
|
||||
req_id: u32,
|
||||
chan: comm::chan<iomsg>);
|
||||
fn rust_uvtmp_delete_buf(buf: *u8);
|
||||
fn rust_uvtmp_get_req_id(cd: connect_data) -> u32;
|
||||
|
||||
fn rust_uvtmp_uv_loop_new() -> *ctypes::void;
|
||||
fn rust_uvtmp_uv_loop_set_data(
|
||||
loop: *ctypes::void,
|
||||
data: *uv_loop_data);
|
||||
fn rust_uvtmp_uv_bind_op_cb(loop: *ctypes::void, cb: *u8)
|
||||
-> *ctypes::void;
|
||||
fn rust_uvtmp_uv_stop_op_cb(handle: *ctypes::void);
|
||||
fn rust_uvtmp_uv_run(loop_handle: *ctypes::void);
|
||||
fn rust_uvtmp_uv_close(handle: *ctypes::void, cb: *u8);
|
||||
fn rust_uvtmp_uv_close_async(handle: *ctypes::void);
|
||||
fn rust_uvtmp_uv_close_timer(handle: *ctypes::void);
|
||||
fn rust_uvtmp_uv_async_send(handle: *ctypes::void);
|
||||
fn rust_uvtmp_uv_async_init(
|
||||
loop_handle: *ctypes::void,
|
||||
cb: *u8,
|
||||
id: *u8) -> *ctypes::void;
|
||||
fn rust_uvtmp_uv_timer_init(
|
||||
loop_handle: *ctypes::void,
|
||||
cb: *u8,
|
||||
id: *u8) -> *ctypes::void;
|
||||
fn rust_uvtmp_uv_timer_start(
|
||||
timer_handle: *ctypes::void,
|
||||
timeout: ctypes::c_uint,
|
||||
repeat: ctypes::c_uint);
|
||||
fn rust_uvtmp_uv_timer_stop(handle: *ctypes::void);
|
||||
}
|
||||
|
||||
mod uv {
|
||||
export loop_new, run, close, run_in_bg, async_init, async_send,
|
||||
timer_init, timer_start, timer_stop;
|
||||
|
||||
// public functions
|
||||
fn loop_new() -> uv_loop unsafe {
|
||||
let ret_recv_port: comm::port<uv_loop> =
|
||||
comm::port();
|
||||
let ret_recv_chan: comm::chan<uv_loop> =
|
||||
comm::chan(ret_recv_port);
|
||||
|
||||
task::spawn_sched(task::manual_threads(4u)) {||
|
||||
// our beloved uv_loop_t ptr
|
||||
let loop_handle = rustrt::
|
||||
rust_uvtmp_uv_loop_new();
|
||||
|
||||
// this port/chan pair are used to send messages to
|
||||
// libuv. libuv processes any pending messages on the
|
||||
// port (via crust) after receiving an async "wakeup"
|
||||
// on a special uv_async_t handle created below
|
||||
let operation_port = comm::port::<uv_operation>();
|
||||
let operation_chan = comm::chan::<uv_operation>(
|
||||
operation_port);
|
||||
|
||||
// this port/chan pair as used in the while() loop
|
||||
// below. It takes dispatches, originating from libuv
|
||||
// callbacks, to invoke handles registered by the
|
||||
// user
|
||||
let rust_loop_port = comm::port::<uv_msg>();
|
||||
let rust_loop_chan =
|
||||
comm::chan::<uv_msg>(rust_loop_port);
|
||||
// let the task-spawner return
|
||||
comm::send(ret_recv_chan, copy(rust_loop_chan));
|
||||
|
||||
// create our "special" async handle that will
|
||||
// allow all operations against libuv to be
|
||||
// "buffered" in the operation_port, for processing
|
||||
// from the thread that libuv runs on
|
||||
let loop_data: uv_loop_data = {
|
||||
operation_port: operation_port,
|
||||
rust_loop_chan: rust_loop_chan
|
||||
};
|
||||
rustrt::rust_uvtmp_uv_loop_set_data(
|
||||
loop_handle,
|
||||
ptr::addr_of(loop_data)); // pass an opaque C-ptr
|
||||
// to libuv, this will be
|
||||
// in the process_operation
|
||||
// crust fn
|
||||
let op_handle = rustrt::rust_uvtmp_uv_bind_op_cb(
|
||||
loop_handle,
|
||||
process_operation);
|
||||
|
||||
// all state goes here
|
||||
let handles: map::map<[u8], *ctypes::void> =
|
||||
map::new_bytes_hash();
|
||||
let id_to_handle: map::map<[u8], uv_handle> =
|
||||
map::new_bytes_hash();
|
||||
let after_cbs: map::map<[u8], fn~(uv_handle)> =
|
||||
map::new_bytes_hash();
|
||||
let close_callbacks: map::map<[u8], fn~()> =
|
||||
map::new_bytes_hash();
|
||||
let async_cbs: map::map<[u8], fn~(uv_handle)> =
|
||||
map::new_bytes_hash();
|
||||
let timer_cbs: map::map<[u8], fn~(uv_handle)> =
|
||||
map::new_bytes_hash();
|
||||
|
||||
// the main loop that this task blocks on.
|
||||
// should have the same lifetime as the C libuv
|
||||
// event loop.
|
||||
let keep_going = true;
|
||||
while (keep_going) {
|
||||
alt comm::recv(rust_loop_port) {
|
||||
msg_run(end_chan) {
|
||||
// start the libuv event loop
|
||||
// we'll also do a uv_async_send with
|
||||
// the operation handle to have the
|
||||
// loop process any pending operations
|
||||
// once its up and running
|
||||
task::spawn_sched(task::manual_threads(1u)) {||
|
||||
// this call blocks
|
||||
rustrt::rust_uvtmp_uv_run(loop_handle);
|
||||
// when we're done, msg the
|
||||
// end chan
|
||||
rustrt::rust_uvtmp_uv_stop_op_cb(op_handle);
|
||||
comm::send(end_chan, true);
|
||||
comm::send(rust_loop_chan, uv_end);
|
||||
};
|
||||
}
|
||||
|
||||
msg_run_in_bg {
|
||||
task::spawn_sched(task::manual_threads(1u)) {||
|
||||
// this call blocks
|
||||
rustrt::rust_uvtmp_uv_run(loop_handle);
|
||||
};
|
||||
}
|
||||
|
||||
msg_close(handle, cb) {
|
||||
let id = get_id_from_handle(handle);
|
||||
close_callbacks.insert(id, cb);
|
||||
let handle_ptr = handles.get(id);
|
||||
let op = op_close(handle, handle_ptr);
|
||||
|
||||
pass_to_libuv(op_handle, operation_chan, op);
|
||||
}
|
||||
uv_close(id) {
|
||||
handles.remove(id);
|
||||
let handle = id_to_handle.get(id);
|
||||
id_to_handle.remove(id);
|
||||
alt handle {
|
||||
uv_async(id, _) {
|
||||
async_cbs.remove(id);
|
||||
}
|
||||
uv_timer(id, _) {
|
||||
timer_cbs.remove(id);
|
||||
}
|
||||
_ {
|
||||
fail "unknown form of uv_handle encountered "
|
||||
+ "in uv_close handler";
|
||||
}
|
||||
}
|
||||
let cb = close_callbacks.get(id);
|
||||
close_callbacks.remove(id);
|
||||
task::spawn {||
|
||||
cb();
|
||||
};
|
||||
}
|
||||
|
||||
msg_async_init(callback, after_cb) {
|
||||
// create a new async handle
|
||||
// with the id as the handle's
|
||||
// data and save the callback for
|
||||
// invocation on msg_async_send
|
||||
let id = gen_handle_id();
|
||||
async_cbs.insert(id, callback);
|
||||
after_cbs.insert(id, after_cb);
|
||||
let op = op_async_init(id);
|
||||
pass_to_libuv(op_handle, operation_chan, op);
|
||||
}
|
||||
uv_async_init(id, async_handle) {
|
||||
// libuv created a handle, which is
|
||||
// passed back to us. save it and
|
||||
// then invoke the supplied callback
|
||||
// for after completion
|
||||
handles.insert(id, async_handle);
|
||||
let after_cb = after_cbs.get(id);
|
||||
after_cbs.remove(id);
|
||||
let async = uv_async(id, rust_loop_chan);
|
||||
id_to_handle.insert(id, copy(async));
|
||||
task::spawn {||
|
||||
after_cb(async);
|
||||
};
|
||||
}
|
||||
|
||||
msg_async_send(id) {
|
||||
let async_handle = handles.get(id);
|
||||
do_send(async_handle);
|
||||
}
|
||||
uv_async_send(id) {
|
||||
let async_cb = async_cbs.get(id);
|
||||
task::spawn {||
|
||||
async_cb(uv_async(id, rust_loop_chan));
|
||||
};
|
||||
}
|
||||
|
||||
msg_timer_init(after_cb) {
|
||||
let id = gen_handle_id();
|
||||
after_cbs.insert(id, after_cb);
|
||||
let op = op_timer_init(id);
|
||||
pass_to_libuv(op_handle, operation_chan, op);
|
||||
}
|
||||
uv_timer_init(id, handle) {
|
||||
handles.insert(id, handle);
|
||||
let after_cb = after_cbs.get(id);
|
||||
after_cbs.remove(id);
|
||||
let new_timer = uv_timer(id, rust_loop_chan);
|
||||
id_to_handle.insert(id, copy(new_timer));
|
||||
task::spawn {||
|
||||
after_cb(new_timer);
|
||||
};
|
||||
}
|
||||
|
||||
uv_timer_call(id) {
|
||||
let cb = timer_cbs.get(id);
|
||||
let the_timer = id_to_handle.get(id);
|
||||
task::spawn {||
|
||||
cb(the_timer);
|
||||
};
|
||||
}
|
||||
|
||||
msg_timer_start(id, timeout, repeat, timer_call_cb) {
|
||||
timer_cbs.insert(id, timer_call_cb);
|
||||
let handle = handles.get(id);
|
||||
let op = op_timer_start(id, handle, timeout,
|
||||
repeat);
|
||||
pass_to_libuv(op_handle, operation_chan, op);
|
||||
}
|
||||
|
||||
msg_timer_stop(id, after_cb) {
|
||||
let handle = handles.get(id);
|
||||
let op = op_timer_stop(id, handle, after_cb);
|
||||
pass_to_libuv(op_handle, operation_chan, op);
|
||||
}
|
||||
uv_timer_stop(id, after_cb) {
|
||||
let the_timer = id_to_handle.get(id);
|
||||
after_cb(the_timer);
|
||||
}
|
||||
|
||||
uv_end() {
|
||||
keep_going = false;
|
||||
}
|
||||
|
||||
_ { fail "unknown form of uv_msg received"; }
|
||||
}
|
||||
}
|
||||
};
|
||||
ret comm::recv(ret_recv_port);
|
||||
}
|
||||
|
||||
fn run(loop: uv_loop) {
|
||||
let end_port = comm::port::<bool>();
|
||||
let end_chan = comm::chan::<bool>(end_port);
|
||||
comm::send(loop, msg_run(end_chan));
|
||||
comm::recv(end_port);
|
||||
}
|
||||
|
||||
fn run_in_bg(loop: uv_loop) {
|
||||
comm::send(loop, msg_run_in_bg);
|
||||
}
|
||||
|
||||
fn async_init (
|
||||
loop: uv_loop,
|
||||
async_cb: fn~(uv_handle),
|
||||
after_cb: fn~(uv_handle)) {
|
||||
let msg = msg_async_init(async_cb, after_cb);
|
||||
comm::send(loop, msg);
|
||||
}
|
||||
|
||||
fn async_send(async: uv_handle) {
|
||||
alt async {
|
||||
uv_async(id, loop) {
|
||||
comm::send(loop, msg_async_send(id));
|
||||
}
|
||||
_ {
|
||||
fail "attempting to call async_send() with a" +
|
||||
" uv_async uv_handle";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn close(h: uv_handle, cb: fn~()) {
|
||||
let loop_chan = get_loop_chan_from_handle(h);
|
||||
comm::send(loop_chan, msg_close(h, cb));
|
||||
}
|
||||
|
||||
fn timer_init(loop: uv_loop, after_cb: fn~(uv_handle)) {
|
||||
let msg = msg_timer_init(after_cb);
|
||||
comm::send(loop, msg);
|
||||
}
|
||||
|
||||
fn timer_start(the_timer: uv_handle, timeout: u32, repeat:u32,
|
||||
timer_cb: fn~(uv_handle)) {
|
||||
alt the_timer {
|
||||
uv_timer(id, loop_chan) {
|
||||
let msg = msg_timer_start(id, timeout, repeat, timer_cb);
|
||||
comm::send(loop_chan, msg);
|
||||
}
|
||||
_ {
|
||||
fail "can only pass a uv_timer form of uv_handle to "+
|
||||
" uv::timer_start()";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn timer_stop(the_timer: uv_handle, after_cb: fn~(uv_handle)) {
|
||||
alt the_timer {
|
||||
uv_timer(id, loop_chan) {
|
||||
let msg = msg_timer_stop(id, after_cb);
|
||||
comm::send(loop_chan, msg);
|
||||
}
|
||||
_ {
|
||||
fail "only uv_timer form is allowed in calls to "+
|
||||
" uv::timer_stop()";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// internal functions
|
||||
fn pass_to_libuv(
|
||||
op_handle: *ctypes::void,
|
||||
operation_chan: comm::chan<uv_operation>,
|
||||
op: uv_operation) unsafe {
|
||||
comm::send(operation_chan, copy(op));
|
||||
do_send(op_handle);
|
||||
}
|
||||
fn do_send(h: *ctypes::void) {
|
||||
rustrt::rust_uvtmp_uv_async_send(h);
|
||||
}
|
||||
fn gen_handle_id() -> [u8] {
|
||||
ret rand::mk_rng().gen_bytes(16u);
|
||||
}
|
||||
fn get_handle_id_from(buf: *u8) -> [u8] unsafe {
|
||||
ret vec::unsafe::from_buf(buf, 16u);
|
||||
}
|
||||
|
||||
fn get_loop_chan_from_data(data: *uv_loop_data)
|
||||
-> uv_loop unsafe {
|
||||
ret (*data).rust_loop_chan;
|
||||
}
|
||||
|
||||
fn get_loop_chan_from_handle(handle: uv_handle)
|
||||
-> uv_loop {
|
||||
alt handle {
|
||||
uv_async(id,loop) | uv_timer(id,loop) {
|
||||
ret loop;
|
||||
}
|
||||
_ {
|
||||
fail "unknown form of uv_handle for get_loop_chan_from "
|
||||
+ " handle";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_id_from_handle(handle: uv_handle) -> [u8] {
|
||||
alt handle {
|
||||
uv_async(id,loop) | uv_timer(id,loop) {
|
||||
ret id;
|
||||
}
|
||||
_ {
|
||||
fail "unknown form of uv_handle for get_id_from handle";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// crust
|
||||
crust fn process_operation(
|
||||
loop: *ctypes::void,
|
||||
data: *uv_loop_data) unsafe {
|
||||
let op_port = (*data).operation_port;
|
||||
let loop_chan = get_loop_chan_from_data(data);
|
||||
let op_pending = comm::peek(op_port);
|
||||
while(op_pending) {
|
||||
alt comm::recv(op_port) {
|
||||
op_async_init(id) {
|
||||
let id_ptr = vec::unsafe::to_ptr(id);
|
||||
let async_handle = rustrt::rust_uvtmp_uv_async_init(
|
||||
loop,
|
||||
process_async_send,
|
||||
id_ptr);
|
||||
comm::send(loop_chan, uv_async_init(
|
||||
id,
|
||||
async_handle));
|
||||
}
|
||||
op_close(handle, handle_ptr) {
|
||||
handle_op_close(handle, handle_ptr);
|
||||
}
|
||||
op_timer_init(id) {
|
||||
let id_ptr = vec::unsafe::to_ptr(id);
|
||||
let timer_handle = rustrt::rust_uvtmp_uv_timer_init(
|
||||
loop,
|
||||
process_timer_call,
|
||||
id_ptr);
|
||||
comm::send(loop_chan, uv_timer_init(
|
||||
id,
|
||||
timer_handle));
|
||||
}
|
||||
op_timer_start(id, handle, timeout, repeat) {
|
||||
rustrt::rust_uvtmp_uv_timer_start(handle, timeout,
|
||||
repeat);
|
||||
}
|
||||
op_timer_stop(id, handle, after_cb) {
|
||||
rustrt::rust_uvtmp_uv_timer_stop(handle);
|
||||
comm::send(loop_chan, uv_timer_stop(id, after_cb));
|
||||
}
|
||||
_ { fail "unknown form of uv_operation received"; }
|
||||
}
|
||||
op_pending = comm::peek(op_port);
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_op_close(handle: uv_handle, handle_ptr: *ctypes::void) {
|
||||
// it's just like im doing C
|
||||
alt handle {
|
||||
uv_async(id, loop) {
|
||||
let cb = process_close_async;
|
||||
rustrt::rust_uvtmp_uv_close(
|
||||
handle_ptr, cb);
|
||||
}
|
||||
uv_timer(id, loop) {
|
||||
let cb = process_close_timer;
|
||||
rustrt::rust_uvtmp_uv_close(
|
||||
handle_ptr, cb);
|
||||
}
|
||||
_ {
|
||||
fail "unknown form of uv_handle encountered " +
|
||||
"in process_operation/op_close";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
crust fn process_async_send(id_buf: *u8, data: *uv_loop_data)
|
||||
unsafe {
|
||||
let handle_id = get_handle_id_from(id_buf);
|
||||
let loop_chan = get_loop_chan_from_data(data);
|
||||
comm::send(loop_chan, uv_async_send(handle_id));
|
||||
}
|
||||
|
||||
crust fn process_timer_call(id_buf: *u8, data: *uv_loop_data)
|
||||
unsafe {
|
||||
let handle_id = get_handle_id_from(id_buf);
|
||||
let loop_chan = get_loop_chan_from_data(data);
|
||||
comm::send(loop_chan, uv_timer_call(handle_id));
|
||||
}
|
||||
|
||||
fn process_close_common(id: [u8], data: *uv_loop_data)
|
||||
unsafe {
|
||||
// notify the rust loop that their handle is closed, then
|
||||
// the caller will invoke a per-handle-type c++ func to
|
||||
// free allocated memory
|
||||
let loop_chan = get_loop_chan_from_data(data);
|
||||
comm::send(loop_chan, uv_close(id));
|
||||
}
|
||||
|
||||
crust fn process_close_async(
|
||||
id_buf: *u8,
|
||||
handle_ptr: *ctypes::void,
|
||||
data: *uv_loop_data)
|
||||
unsafe {
|
||||
let id = get_handle_id_from(id_buf);
|
||||
rustrt::rust_uvtmp_uv_close_async(handle_ptr);
|
||||
// at this point, the handle and its data has been
|
||||
// released. notify the rust loop to remove the
|
||||
// handle and its data and call the user-supplied
|
||||
// close cb
|
||||
process_close_common(id, data);
|
||||
}
|
||||
|
||||
crust fn process_close_timer(
|
||||
id_buf: *u8,
|
||||
handle_ptr: *ctypes::void,
|
||||
data: *uv_loop_data)
|
||||
unsafe {
|
||||
let id = get_handle_id_from(id_buf);
|
||||
rustrt::rust_uvtmp_uv_close_timer(handle_ptr);
|
||||
process_close_common(id, data);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_uvtmp_uv_new_loop_no_handles() {
|
||||
let test_loop = uv::loop_new();
|
||||
uv::run(test_loop); // this should return immediately
|
||||
// since there aren't any handles..
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_uvtmp_uv_simple_async() {
|
||||
let test_loop = uv::loop_new();
|
||||
let exit_port = comm::port::<bool>();
|
||||
let exit_chan = comm::chan::<bool>(exit_port);
|
||||
uv::async_init(test_loop, {|new_async|
|
||||
uv::close(new_async) {||
|
||||
comm::send(exit_chan, true);
|
||||
};
|
||||
}, {|new_async|
|
||||
uv::async_send(new_async);
|
||||
});
|
||||
uv::run(test_loop);
|
||||
assert comm::recv(exit_port);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_uvtmp_uv_timer() {
|
||||
let test_loop = uv::loop_new();
|
||||
let exit_port = comm::port::<bool>();
|
||||
let exit_chan = comm::chan::<bool>(exit_port);
|
||||
uv::timer_init(test_loop) {|new_timer|
|
||||
uv::timer_start(new_timer, 1u32, 0u32) {|started_timer|
|
||||
uv::timer_stop(started_timer) {|stopped_timer|
|
||||
uv::close(stopped_timer) {||
|
||||
comm::send(exit_chan, true);
|
||||
};
|
||||
};
|
||||
};
|
||||
};
|
||||
uv::run(test_loop);
|
||||
assert comm::recv(exit_port);
|
||||
}
|
||||
|
||||
// END OF UV2
|
||||
|
||||
type thread = *ctypes::void;
|
||||
|
||||
type connect_data = *ctypes::void;
|
||||
|
||||
enum iomsg {
|
||||
whatever,
|
||||
connected(connect_data),
|
||||
wrote(connect_data),
|
||||
read(connect_data, *u8, ctypes::ssize_t),
|
||||
timer(u32),
|
||||
exit
|
||||
}
|
||||
|
||||
fn create_thread() -> thread {
|
||||
rustrt::rust_uvtmp_create_thread()
|
||||
}
|
||||
|
||||
fn start_thread(thread: thread) {
|
||||
rustrt::rust_uvtmp_start_thread(thread)
|
||||
}
|
||||
|
||||
fn join_thread(thread: thread) {
|
||||
rustrt::rust_uvtmp_join_thread(thread)
|
||||
}
|
||||
|
||||
fn delete_thread(thread: thread) {
|
||||
rustrt::rust_uvtmp_delete_thread(thread)
|
||||
}
|
||||
|
||||
fn connect(thread: thread, req_id: u32,
|
||||
ip: str, ch: comm::chan<iomsg>) -> connect_data {
|
||||
str::as_buf(ip) {|ipbuf|
|
||||
rustrt::rust_uvtmp_connect(thread, req_id, ipbuf, ch)
|
||||
}
|
||||
}
|
||||
|
||||
fn close_connection(thread: thread, req_id: u32) {
|
||||
rustrt::rust_uvtmp_close_connection(thread, req_id);
|
||||
}
|
||||
|
||||
fn write(thread: thread, req_id: u32, bytes: [u8],
|
||||
chan: comm::chan<iomsg>) unsafe {
|
||||
rustrt::rust_uvtmp_write(
|
||||
thread, req_id, vec::to_ptr(bytes), vec::len(bytes), chan);
|
||||
}
|
||||
|
||||
fn read_start(thread: thread, req_id: u32,
|
||||
chan: comm::chan<iomsg>) {
|
||||
rustrt::rust_uvtmp_read_start(thread, req_id, chan);
|
||||
}
|
||||
|
||||
fn timer_start(thread: thread, timeout: u32, req_id: u32,
|
||||
chan: comm::chan<iomsg>) {
|
||||
rustrt::rust_uvtmp_timer(thread, timeout, req_id, chan);
|
||||
}
|
||||
|
||||
fn delete_buf(buf: *u8) {
|
||||
rustrt::rust_uvtmp_delete_buf(buf);
|
||||
}
|
||||
|
||||
fn get_req_id(cd: connect_data) -> u32 {
|
||||
ret rustrt::rust_uvtmp_get_req_id(cd);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_start_stop() {
|
||||
let thread = create_thread();
|
||||
start_thread(thread);
|
||||
join_thread(thread);
|
||||
delete_thread(thread);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_connect() {
|
||||
let thread = create_thread();
|
||||
start_thread(thread);
|
||||
let port = comm::port();
|
||||
let chan = comm::chan(port);
|
||||
connect(thread, 0u32, "74.125.224.146", chan);
|
||||
alt comm::recv(port) {
|
||||
connected(cd) {
|
||||
close_connection(thread, 0u32);
|
||||
}
|
||||
_ { fail "test_connect: port isn't connected"; }
|
||||
}
|
||||
join_thread(thread);
|
||||
delete_thread(thread);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_http() {
|
||||
let thread = create_thread();
|
||||
start_thread(thread);
|
||||
let port = comm::port();
|
||||
let chan = comm::chan(port);
|
||||
connect(thread, 0u32, "74.125.224.146", chan);
|
||||
alt comm::recv(port) {
|
||||
connected(cd) {
|
||||
write(thread, 0u32, str::bytes("GET / HTTP/1.0\n\n"), chan);
|
||||
alt comm::recv(port) {
|
||||
wrote(cd) {
|
||||
read_start(thread, 0u32, chan);
|
||||
let keep_going = true;
|
||||
while keep_going {
|
||||
alt comm::recv(port) {
|
||||
read(_, buf, -1) {
|
||||
keep_going = false;
|
||||
delete_buf(buf);
|
||||
}
|
||||
read(_, buf, len) {
|
||||
unsafe {
|
||||
log(error, len);
|
||||
let buf = vec::unsafe::from_buf(buf,
|
||||
len as uint);
|
||||
let str = str::from_bytes(buf);
|
||||
#error("read something");
|
||||
io::println(str);
|
||||
}
|
||||
delete_buf(buf);
|
||||
}
|
||||
_ { fail "test_http: protocol error"; }
|
||||
}
|
||||
}
|
||||
close_connection(thread, 0u32);
|
||||
}
|
||||
_ { fail "test_http: expected `wrote`"; }
|
||||
}
|
||||
}
|
||||
_ { fail "test_http: port not connected"; }
|
||||
}
|
||||
join_thread(thread);
|
||||
delete_thread(thread);
|
||||
}
|
@ -1,51 +1,183 @@
|
||||
#include "rust_internal.h"
|
||||
#include "uv.h"
|
||||
|
||||
/*
|
||||
Wrappers of uv_* functions. These can be eliminated by figuring
|
||||
out how to build static uv with externs, or by just using dynamic libuv
|
||||
*/
|
||||
// crust fn pointers
|
||||
typedef void (*crust_async_op_cb)(uv_loop_t* loop, void* data);
|
||||
typedef void (*crust_simple_cb)(uint8_t* id_buf, void* loop_data);
|
||||
typedef void (*crust_close_cb)(uint8_t* id_buf, void* handle,
|
||||
void* data);
|
||||
|
||||
extern "C" CDECL uv_loop_t*
|
||||
rust_uv_default_loop() {
|
||||
return uv_default_loop();
|
||||
// data types
|
||||
#define RUST_UV_HANDLE_LEN 16
|
||||
|
||||
struct handle_data {
|
||||
uint8_t id_buf[RUST_UV_HANDLE_LEN];
|
||||
crust_simple_cb cb;
|
||||
crust_close_cb close_cb;
|
||||
};
|
||||
|
||||
// helpers
|
||||
static void*
|
||||
current_kernel_malloc(size_t size, const char* tag) {
|
||||
void* ptr = rust_task_thread::get_task()->kernel->malloc(size, tag);
|
||||
return ptr;
|
||||
}
|
||||
|
||||
extern "C" CDECL uv_loop_t*
|
||||
static void
|
||||
current_kernel_free(void* ptr) {
|
||||
rust_task_thread::get_task()->kernel->free(ptr);
|
||||
}
|
||||
|
||||
static handle_data*
|
||||
new_handle_data_from(uint8_t* buf, crust_simple_cb cb) {
|
||||
handle_data* data = (handle_data*)current_kernel_malloc(
|
||||
sizeof(handle_data),
|
||||
"handle_data");
|
||||
memcpy(data->id_buf, buf, RUST_UV_HANDLE_LEN);
|
||||
data->cb = cb;
|
||||
return data;
|
||||
}
|
||||
|
||||
// libuv callback impls
|
||||
static void
|
||||
native_crust_async_op_cb(uv_async_t* handle, int status) {
|
||||
crust_async_op_cb cb = (crust_async_op_cb)handle->data;
|
||||
void* loop_data = handle->loop->data;
|
||||
cb(handle->loop, loop_data);
|
||||
}
|
||||
|
||||
static void
|
||||
native_async_cb(uv_async_t* handle, int status) {
|
||||
handle_data* handle_d = (handle_data*)handle->data;
|
||||
void* loop_data = handle->loop->data;
|
||||
handle_d->cb(handle_d->id_buf, loop_data);
|
||||
}
|
||||
|
||||
static void
|
||||
native_timer_cb(uv_timer_t* handle, int status) {
|
||||
handle_data* handle_d = (handle_data*)handle->data;
|
||||
void* loop_data = handle->loop->data;
|
||||
handle_d->cb(handle_d->id_buf, loop_data);
|
||||
}
|
||||
|
||||
static void
|
||||
native_close_cb(uv_handle_t* handle) {
|
||||
handle_data* data = (handle_data*)handle->data;
|
||||
data->close_cb(data->id_buf, handle, handle->loop->data);
|
||||
}
|
||||
|
||||
static void
|
||||
native_close_op_cb(uv_handle_t* op_handle) {
|
||||
uv_loop_t* loop = op_handle->loop;
|
||||
current_kernel_free(op_handle);
|
||||
loop->data = 0; // a ptr to some stack-allocated rust mem
|
||||
uv_loop_delete(loop);
|
||||
}
|
||||
|
||||
// native fns bound in rust
|
||||
extern "C" void*
|
||||
rust_uv_loop_new() {
|
||||
return uv_loop_new();
|
||||
return (void*)uv_loop_new();
|
||||
}
|
||||
|
||||
extern "C" CDECL void
|
||||
rust_uv_loop_delete(uv_loop_t *loop) {
|
||||
return uv_loop_delete(loop);
|
||||
extern "C" void
|
||||
rust_uv_loop_set_data(uv_loop_t* loop, void* data) {
|
||||
loop->data = data;
|
||||
}
|
||||
|
||||
extern "C" CDECL int
|
||||
rust_uv_run(uv_loop_t *loop) {
|
||||
return uv_run(loop);
|
||||
extern "C" void*
|
||||
rust_uv_bind_op_cb(uv_loop_t* loop, crust_async_op_cb cb) {
|
||||
uv_async_t* async = (uv_async_t*)current_kernel_malloc(
|
||||
sizeof(uv_async_t),
|
||||
"uv_async_t");
|
||||
uv_async_init(loop, async, native_crust_async_op_cb);
|
||||
async->data = (void*)cb;
|
||||
// decrement the ref count, so that our async bind
|
||||
// doesn't count towards keeping the loop alive
|
||||
uv_unref(loop);
|
||||
return async;
|
||||
}
|
||||
|
||||
extern "C" CDECL void
|
||||
rust_uv_unref(uv_loop_t *loop) {
|
||||
return uv_unref(loop);
|
||||
extern "C" void
|
||||
rust_uv_stop_op_cb(uv_handle_t* op_handle) {
|
||||
/* // this is a hack to get libuv to cleanup a
|
||||
// handle that was made to not prevent the loop
|
||||
// from exiting via uv_unref().
|
||||
uv_ref(op_handle->loop);
|
||||
uv_close(op_handle, native_close_op_cb);
|
||||
uv_run(op_handle->loop); // this should process the handle's
|
||||
// close event and then return
|
||||
*/
|
||||
// the above code is supposed to work to cleanly close
|
||||
// a handler that was uv_unref()'d. but it causes much spew
|
||||
// instead. this is the ugly/quick way to deal w/ it for now.
|
||||
uv_close(op_handle, native_close_op_cb);
|
||||
native_close_op_cb(op_handle);
|
||||
}
|
||||
|
||||
extern "C" CDECL int
|
||||
rust_uv_idle_init(uv_loop_t* loop, uv_idle_t* idle) {
|
||||
return uv_idle_init(loop, idle);
|
||||
extern "C" void
|
||||
rust_uv_run(uv_loop_t* loop) {
|
||||
uv_run(loop);
|
||||
}
|
||||
|
||||
extern "C" CDECL int
|
||||
rust_uv_idle_start(uv_idle_t* idle, uv_idle_cb cb) {
|
||||
return uv_idle_start(idle, cb);
|
||||
extern "C" void
|
||||
rust_uv_close(uv_handle_t* handle, crust_close_cb cb) {
|
||||
handle_data* data = (handle_data*)handle->data;
|
||||
data->close_cb = cb;
|
||||
uv_close(handle, native_close_cb);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
extern "C" CDECL size_t
|
||||
rust_uv_size_of_idle_t() {
|
||||
return sizeof(uv_idle_t);
|
||||
extern "C" void
|
||||
rust_uv_close_async(uv_async_t* handle) {
|
||||
current_kernel_free(handle->data);
|
||||
current_kernel_free(handle);
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
rust_uv_close_timer(uv_async_t* handle) {
|
||||
current_kernel_free(handle->data);
|
||||
current_kernel_free(handle);
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
rust_uv_async_send(uv_async_t* handle) {
|
||||
uv_async_send(handle);
|
||||
}
|
||||
|
||||
extern "C" void*
|
||||
rust_uv_async_init(uv_loop_t* loop, crust_simple_cb cb,
|
||||
uint8_t* buf) {
|
||||
uv_async_t* async = (uv_async_t*)current_kernel_malloc(
|
||||
sizeof(uv_async_t),
|
||||
"uv_async_t");
|
||||
uv_async_init(loop, async, native_async_cb);
|
||||
handle_data* data = new_handle_data_from(buf, cb);
|
||||
async->data = data;
|
||||
|
||||
return async;
|
||||
}
|
||||
|
||||
extern "C" void*
|
||||
rust_uv_timer_init(uv_loop_t* loop, crust_simple_cb cb,
|
||||
uint8_t* buf) {
|
||||
uv_timer_t* new_timer = (uv_timer_t*)current_kernel_malloc(
|
||||
sizeof(uv_timer_t),
|
||||
"uv_timer_t");
|
||||
uv_timer_init(loop, new_timer);
|
||||
handle_data* data = new_handle_data_from(buf, cb);
|
||||
new_timer->data = data;
|
||||
|
||||
return new_timer;
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
rust_uv_timer_start(uv_timer_t* the_timer, uint32_t timeout,
|
||||
uint32_t repeat) {
|
||||
uv_timer_start(the_timer, native_timer_cb, timeout, repeat);
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
rust_uv_timer_stop(uv_timer_t* the_timer) {
|
||||
uv_timer_stop(the_timer);
|
||||
}
|
||||
|
||||
|
@ -1,612 +0,0 @@
|
||||
#include <utility>
|
||||
#include <queue>
|
||||
#include <string>
|
||||
#include "rust_internal.h"
|
||||
#include "uv.h"
|
||||
|
||||
class rust_uvtmp_thread;
|
||||
|
||||
struct connect_data {
|
||||
uint32_t req_id;
|
||||
rust_uvtmp_thread *thread;
|
||||
char * ip_addr;
|
||||
uv_connect_t connect;
|
||||
uv_tcp_t tcp;
|
||||
chan_handle chan;
|
||||
};
|
||||
|
||||
const intptr_t whatever_tag = 0;
|
||||
const intptr_t connected_tag = 1;
|
||||
const intptr_t wrote_tag = 2;
|
||||
const intptr_t read_tag = 3;
|
||||
const intptr_t timer_tag = 4;
|
||||
const intptr_t exit_tag = 5;
|
||||
|
||||
struct iomsg {
|
||||
intptr_t tag;
|
||||
union {
|
||||
connect_data *connected_val;
|
||||
connect_data *wrote_val;
|
||||
struct {
|
||||
connect_data *cd;
|
||||
uint8_t *buf;
|
||||
ssize_t nread;
|
||||
} read_val;
|
||||
uint32_t timer_req_id;
|
||||
} val;
|
||||
};
|
||||
|
||||
struct write_data {
|
||||
connect_data *cd;
|
||||
uint8_t *buf;
|
||||
size_t len;
|
||||
chan_handle chan;
|
||||
};
|
||||
|
||||
struct read_start_data {
|
||||
connect_data *cd;
|
||||
chan_handle chan;
|
||||
};
|
||||
|
||||
struct timer_start_data {
|
||||
rust_uvtmp_thread *thread;
|
||||
uint32_t timeout;
|
||||
uint32_t req_id;
|
||||
chan_handle chan;
|
||||
};
|
||||
|
||||
// UVTMP REWORK
|
||||
|
||||
// crust fn pointers
|
||||
typedef void (*crust_async_op_cb)(uv_loop_t* loop, void* data);
|
||||
typedef void (*crust_simple_cb)(uint8_t* id_buf, void* loop_data);
|
||||
typedef void (*crust_close_cb)(uint8_t* id_buf, void* handle,
|
||||
void* data);
|
||||
|
||||
// data types
|
||||
#define RUST_UV_HANDLE_LEN 16
|
||||
|
||||
struct handle_data {
|
||||
uint8_t id_buf[RUST_UV_HANDLE_LEN];
|
||||
crust_simple_cb cb;
|
||||
crust_close_cb close_cb;
|
||||
};
|
||||
|
||||
// helpers
|
||||
static void*
|
||||
current_kernel_malloc(size_t size, const char* tag) {
|
||||
void* ptr = rust_task_thread::get_task()->kernel->malloc(size, tag);
|
||||
return ptr;
|
||||
}
|
||||
|
||||
static void
|
||||
current_kernel_free(void* ptr) {
|
||||
rust_task_thread::get_task()->kernel->free(ptr);
|
||||
}
|
||||
|
||||
static handle_data*
|
||||
new_handle_data_from(uint8_t* buf, crust_simple_cb cb) {
|
||||
handle_data* data = (handle_data*)current_kernel_malloc(
|
||||
sizeof(handle_data),
|
||||
"handle_data");
|
||||
memcpy(data->id_buf, buf, RUST_UV_HANDLE_LEN);
|
||||
data->cb = cb;
|
||||
return data;
|
||||
}
|
||||
|
||||
// libuv callback impls
|
||||
static void
|
||||
native_crust_async_op_cb(uv_async_t* handle, int status) {
|
||||
crust_async_op_cb cb = (crust_async_op_cb)handle->data;
|
||||
void* loop_data = handle->loop->data;
|
||||
cb(handle->loop, loop_data);
|
||||
}
|
||||
|
||||
static void
|
||||
native_async_cb(uv_async_t* handle, int status) {
|
||||
handle_data* handle_d = (handle_data*)handle->data;
|
||||
void* loop_data = handle->loop->data;
|
||||
handle_d->cb(handle_d->id_buf, loop_data);
|
||||
}
|
||||
|
||||
static void
|
||||
native_timer_cb(uv_timer_t* handle, int status) {
|
||||
handle_data* handle_d = (handle_data*)handle->data;
|
||||
void* loop_data = handle->loop->data;
|
||||
handle_d->cb(handle_d->id_buf, loop_data);
|
||||
}
|
||||
|
||||
static void
|
||||
native_close_cb(uv_handle_t* handle) {
|
||||
handle_data* data = (handle_data*)handle->data;
|
||||
data->close_cb(data->id_buf, handle, handle->loop->data);
|
||||
}
|
||||
|
||||
static void
|
||||
native_close_op_cb(uv_handle_t* op_handle) {
|
||||
uv_loop_t* loop = op_handle->loop;
|
||||
current_kernel_free(op_handle);
|
||||
loop->data = 0; // a ptr to some stack-allocated rust mem
|
||||
uv_loop_delete(loop);
|
||||
}
|
||||
|
||||
// native fns bound in rust
|
||||
extern "C" void*
|
||||
rust_uvtmp_uv_loop_new() {
|
||||
return (void*)uv_loop_new();
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
rust_uvtmp_uv_loop_set_data(uv_loop_t* loop, void* data) {
|
||||
loop->data = data;
|
||||
}
|
||||
|
||||
extern "C" void*
|
||||
rust_uvtmp_uv_bind_op_cb(uv_loop_t* loop, crust_async_op_cb cb) {
|
||||
uv_async_t* async = (uv_async_t*)current_kernel_malloc(
|
||||
sizeof(uv_async_t),
|
||||
"uv_async_t");
|
||||
uv_async_init(loop, async, native_crust_async_op_cb);
|
||||
async->data = (void*)cb;
|
||||
// decrement the ref count, so that our async bind
|
||||
// doesn't count towards keeping the loop alive
|
||||
uv_unref(loop);
|
||||
return async;
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
rust_uvtmp_uv_stop_op_cb(uv_handle_t* op_handle) {
|
||||
/* // this is a hack to get libuv to cleanup a
|
||||
// handle that was made to not prevent the loop
|
||||
// from exiting via uv_unref().
|
||||
uv_ref(op_handle->loop);
|
||||
uv_close(op_handle, native_close_op_cb);
|
||||
uv_run(op_handle->loop); // this should process the handle's
|
||||
// close event and then return
|
||||
*/
|
||||
// the above code is supposed to work to cleanly close
|
||||
// a handler that was uv_unref()'d. but it causes much spew
|
||||
// instead. this is the ugly/quick way to deal w/ it for now.
|
||||
uv_close(op_handle, native_close_op_cb);
|
||||
native_close_op_cb(op_handle);
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
rust_uvtmp_uv_run(uv_loop_t* loop) {
|
||||
uv_run(loop);
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
rust_uvtmp_uv_close(uv_handle_t* handle, crust_close_cb cb) {
|
||||
handle_data* data = (handle_data*)handle->data;
|
||||
data->close_cb = cb;
|
||||
uv_close(handle, native_close_cb);
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
rust_uvtmp_uv_close_async(uv_async_t* handle) {
|
||||
current_kernel_free(handle->data);
|
||||
current_kernel_free(handle);
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
rust_uvtmp_uv_close_timer(uv_async_t* handle) {
|
||||
current_kernel_free(handle->data);
|
||||
current_kernel_free(handle);
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
rust_uvtmp_uv_async_send(uv_async_t* handle) {
|
||||
uv_async_send(handle);
|
||||
}
|
||||
|
||||
extern "C" void*
|
||||
rust_uvtmp_uv_async_init(uv_loop_t* loop, crust_simple_cb cb,
|
||||
uint8_t* buf) {
|
||||
uv_async_t* async = (uv_async_t*)current_kernel_malloc(
|
||||
sizeof(uv_async_t),
|
||||
"uv_async_t");
|
||||
uv_async_init(loop, async, native_async_cb);
|
||||
handle_data* data = new_handle_data_from(buf, cb);
|
||||
async->data = data;
|
||||
|
||||
return async;
|
||||
}
|
||||
|
||||
extern "C" void*
|
||||
rust_uvtmp_uv_timer_init(uv_loop_t* loop, crust_simple_cb cb,
|
||||
uint8_t* buf) {
|
||||
uv_timer_t* new_timer = (uv_timer_t*)current_kernel_malloc(
|
||||
sizeof(uv_timer_t),
|
||||
"uv_timer_t");
|
||||
uv_timer_init(loop, new_timer);
|
||||
handle_data* data = new_handle_data_from(buf, cb);
|
||||
new_timer->data = data;
|
||||
|
||||
return new_timer;
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
rust_uvtmp_uv_timer_start(uv_timer_t* the_timer, uint32_t timeout,
|
||||
uint32_t repeat) {
|
||||
uv_timer_start(the_timer, native_timer_cb, timeout, repeat);
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
rust_uvtmp_uv_timer_stop(uv_timer_t* the_timer) {
|
||||
uv_timer_stop(the_timer);
|
||||
}
|
||||
|
||||
// UVTMP REWORK
|
||||
|
||||
// FIXME: Copied from rust_builtins.cpp. Could bitrot easily
|
||||
static void
|
||||
send(rust_task *task, chan_handle chan, void *data) {
|
||||
rust_task *target_task = task->kernel->get_task_by_id(chan.task);
|
||||
if(target_task) {
|
||||
rust_port *port = target_task->get_port_by_id(chan.port);
|
||||
if(port) {
|
||||
port->send(data);
|
||||
scoped_lock with(target_task->lock);
|
||||
port->deref();
|
||||
}
|
||||
target_task->deref();
|
||||
}
|
||||
}
|
||||
|
||||
class rust_uvtmp_thread : public rust_thread {
|
||||
|
||||
private:
|
||||
std::map<int, connect_data *> req_map;
|
||||
rust_task *task;
|
||||
uv_loop_t *loop;
|
||||
uv_idle_t idle;
|
||||
lock_and_signal lock;
|
||||
bool stop_flag;
|
||||
std::queue<std::pair<connect_data *, chan_handle> > connect_queue;
|
||||
std::queue<connect_data*> close_connection_queue;
|
||||
std::queue<write_data*> write_queue;
|
||||
std::queue<read_start_data*> read_start_queue;
|
||||
std::queue<timer_start_data*> timer_start_queue;
|
||||
public:
|
||||
|
||||
rust_uvtmp_thread() {
|
||||
task = rust_task_thread::get_task();
|
||||
stop_flag = false;
|
||||
loop = uv_loop_new();
|
||||
uv_idle_init(loop, &idle);
|
||||
idle.data = this;
|
||||
uv_idle_start(&idle, idle_cb);
|
||||
}
|
||||
|
||||
~rust_uvtmp_thread() {
|
||||
uv_loop_delete(loop);
|
||||
}
|
||||
|
||||
void stop() {
|
||||
scoped_lock with(lock);
|
||||
stop_flag = true;
|
||||
}
|
||||
|
||||
connect_data *connect(uint32_t req_id, char *ip, chan_handle chan) {
|
||||
scoped_lock with(lock);
|
||||
if (req_map.count(req_id)) return NULL;
|
||||
connect_data *cd = new connect_data();
|
||||
req_map[req_id] = cd;
|
||||
cd->req_id = req_id;
|
||||
cd->ip_addr = ip;
|
||||
connect_queue.push(
|
||||
std::pair<connect_data *, chan_handle>(cd, chan));
|
||||
return cd;
|
||||
}
|
||||
|
||||
void
|
||||
close_connection(uint32_t req_id) {
|
||||
scoped_lock with(lock);
|
||||
connect_data *cd = req_map[req_id];
|
||||
close_connection_queue.push(cd);
|
||||
req_map.erase(req_id);
|
||||
}
|
||||
|
||||
void
|
||||
write(uint32_t req_id, uint8_t *buf, size_t len, chan_handle chan) {
|
||||
scoped_lock with(lock);
|
||||
connect_data *cd = req_map[req_id];
|
||||
write_data *wd = new write_data();
|
||||
wd->cd = cd;
|
||||
wd->buf = new uint8_t[len];
|
||||
wd->len = len;
|
||||
wd->chan = chan;
|
||||
|
||||
memcpy(wd->buf, buf, len);
|
||||
|
||||
write_queue.push(wd);
|
||||
}
|
||||
|
||||
void
|
||||
read_start(uint32_t req_id, chan_handle chan) {
|
||||
scoped_lock with(lock);
|
||||
connect_data *cd = req_map[req_id];
|
||||
read_start_data *rd = new read_start_data();
|
||||
rd->cd = cd;
|
||||
rd->chan = chan;
|
||||
|
||||
read_start_queue.push(rd);
|
||||
}
|
||||
|
||||
void
|
||||
timer(uint32_t timeout, uint32_t req_id, chan_handle chan) {
|
||||
scoped_lock with(lock);
|
||||
|
||||
timer_start_data *td = new timer_start_data();
|
||||
td->timeout = timeout;
|
||||
td->req_id = req_id;
|
||||
td->chan = chan;
|
||||
timer_start_queue.push(td);
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
virtual void
|
||||
run() {
|
||||
uv_run(loop);
|
||||
}
|
||||
|
||||
static void
|
||||
idle_cb(uv_idle_t* handle, int status) {
|
||||
rust_uvtmp_thread *self = (rust_uvtmp_thread*) handle->data;
|
||||
self->on_idle();
|
||||
}
|
||||
|
||||
void
|
||||
on_idle() {
|
||||
scoped_lock with(lock);
|
||||
make_new_connections();
|
||||
close_connections();
|
||||
write_buffers();
|
||||
start_reads();
|
||||
start_timers();
|
||||
close_idle_if_stop();
|
||||
}
|
||||
|
||||
void
|
||||
make_new_connections() {
|
||||
assert(lock.lock_held_by_current_thread());
|
||||
while (!connect_queue.empty()) {
|
||||
std::pair<connect_data *, chan_handle> pair = connect_queue.front();
|
||||
connect_queue.pop();
|
||||
connect_data *cd = pair.first;
|
||||
struct sockaddr_in client_addr = uv_ip4_addr("0.0.0.0", 0);
|
||||
struct sockaddr_in server_addr = uv_ip4_addr(cd->ip_addr, 80);
|
||||
|
||||
cd->thread = this;
|
||||
cd->chan = pair.second;
|
||||
cd->connect.data = cd;
|
||||
|
||||
uv_tcp_init(loop, &cd->tcp);
|
||||
uv_tcp_bind(&cd->tcp, client_addr);
|
||||
|
||||
uv_tcp_connect(&cd->connect, &cd->tcp, server_addr, connect_cb);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
connect_cb(uv_connect_t *handle, int status) {
|
||||
connect_data *cd = (connect_data*)handle->data;
|
||||
cd->thread->on_connect(cd);
|
||||
}
|
||||
|
||||
void
|
||||
on_connect(connect_data *cd) {
|
||||
iomsg msg;
|
||||
msg.tag = connected_tag;
|
||||
msg.val.connected_val = cd;
|
||||
|
||||
send(task, cd->chan, &msg);
|
||||
}
|
||||
|
||||
void
|
||||
close_connections() {
|
||||
assert(lock.lock_held_by_current_thread());
|
||||
while (!close_connection_queue.empty()) {
|
||||
connect_data *cd = close_connection_queue.front();
|
||||
close_connection_queue.pop();
|
||||
|
||||
cd->tcp.data = cd;
|
||||
|
||||
uv_close((uv_handle_t*)&cd->tcp, tcp_close_cb);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
tcp_close_cb(uv_handle_t *handle) {
|
||||
connect_data *cd = (connect_data*)handle->data;
|
||||
delete cd;
|
||||
}
|
||||
|
||||
void
|
||||
write_buffers() {
|
||||
assert(lock.lock_held_by_current_thread());
|
||||
while (!write_queue.empty()) {
|
||||
write_data *wd = write_queue.front();
|
||||
write_queue.pop();
|
||||
|
||||
uv_write_t *write = new uv_write_t();
|
||||
|
||||
write->data = wd;
|
||||
|
||||
uv_buf_t buf;
|
||||
buf.base = (char*)wd->buf;
|
||||
buf.len = wd->len;
|
||||
|
||||
uv_write(write, (uv_stream_t*)&wd->cd->tcp, &buf, 1, write_cb);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
write_cb(uv_write_t *handle, int status) {
|
||||
write_data *wd = (write_data*)handle->data;
|
||||
rust_uvtmp_thread *self = wd->cd->thread;
|
||||
self->on_write(handle, wd);
|
||||
}
|
||||
|
||||
void
|
||||
on_write(uv_write_t *handle, write_data *wd) {
|
||||
iomsg msg;
|
||||
msg.tag = timer_tag;
|
||||
msg.val.wrote_val = wd->cd;
|
||||
|
||||
send(task, wd->chan, &msg);
|
||||
|
||||
delete [] wd->buf;
|
||||
delete wd;
|
||||
delete handle;
|
||||
}
|
||||
|
||||
void
|
||||
start_reads() {
|
||||
assert (lock.lock_held_by_current_thread());
|
||||
while (!read_start_queue.empty()) {
|
||||
read_start_data *rd = read_start_queue.front();
|
||||
read_start_queue.pop();
|
||||
|
||||
connect_data *cd = rd->cd;
|
||||
cd->tcp.data = rd;
|
||||
|
||||
uv_read_start((uv_stream_t*)&cd->tcp, alloc_cb, read_cb);
|
||||
}
|
||||
}
|
||||
|
||||
static uv_buf_t
|
||||
alloc_cb(uv_handle_t* handle, size_t size) {
|
||||
uv_buf_t buf;
|
||||
buf.base = new char[size];
|
||||
buf.len = size;
|
||||
return buf;
|
||||
}
|
||||
|
||||
static void
|
||||
read_cb(uv_stream_t *handle, ssize_t nread, uv_buf_t buf) {
|
||||
read_start_data *rd = (read_start_data*)handle->data;
|
||||
rust_uvtmp_thread *self = rd->cd->thread;
|
||||
self->on_read(rd, nread, buf);
|
||||
}
|
||||
|
||||
void
|
||||
on_read(read_start_data *rd, ssize_t nread, uv_buf_t buf) {
|
||||
iomsg msg;
|
||||
msg.tag = read_tag;
|
||||
msg.val.read_val.cd = rd->cd;
|
||||
msg.val.read_val.buf = (uint8_t*)buf.base;
|
||||
msg.val.read_val.nread = nread;
|
||||
|
||||
send(task, rd->chan, &msg);
|
||||
if (nread == -1) {
|
||||
delete rd;
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
start_timers() {
|
||||
assert (lock.lock_held_by_current_thread());
|
||||
while (!timer_start_queue.empty()) {
|
||||
timer_start_data *td = timer_start_queue.front();
|
||||
timer_start_queue.pop();
|
||||
|
||||
td->thread = this;
|
||||
|
||||
uv_timer_t *timer = (uv_timer_t *)malloc(sizeof(uv_timer_t));
|
||||
timer->data = td;
|
||||
uv_timer_init(loop, timer);
|
||||
uv_timer_start(timer, timer_cb, td->timeout, 0);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
timer_cb(uv_timer_t *handle, int what) {
|
||||
timer_start_data *td = (timer_start_data*)handle->data;
|
||||
rust_uvtmp_thread *self = td->thread;
|
||||
self->on_timer(td);
|
||||
free(handle);
|
||||
}
|
||||
|
||||
void
|
||||
on_timer(timer_start_data *rd) {
|
||||
iomsg msg;
|
||||
msg.tag = timer_tag;
|
||||
msg.val.timer_req_id = rd->req_id;
|
||||
|
||||
send(task, rd->chan, &msg);
|
||||
delete rd;
|
||||
}
|
||||
|
||||
void
|
||||
close_idle_if_stop() {
|
||||
assert(lock.lock_held_by_current_thread());
|
||||
if (stop_flag) {
|
||||
uv_close((uv_handle_t*)&idle, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
extern "C" rust_uvtmp_thread *
|
||||
rust_uvtmp_create_thread() {
|
||||
rust_uvtmp_thread *thread = new rust_uvtmp_thread();
|
||||
return thread;
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
rust_uvtmp_start_thread(rust_uvtmp_thread *thread) {
|
||||
thread->start();
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
rust_uvtmp_join_thread(rust_uvtmp_thread *thread) {
|
||||
thread->stop();
|
||||
thread->join();
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
rust_uvtmp_delete_thread(rust_uvtmp_thread *thread) {
|
||||
delete thread;
|
||||
}
|
||||
|
||||
extern "C" connect_data *
|
||||
rust_uvtmp_connect(rust_uvtmp_thread *thread, uint32_t req_id, char *ip, chan_handle *chan) {
|
||||
return thread->connect(req_id, ip, *chan);
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
rust_uvtmp_close_connection(rust_uvtmp_thread *thread, uint32_t req_id) {
|
||||
thread->close_connection(req_id);
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
rust_uvtmp_write(rust_uvtmp_thread *thread, uint32_t req_id,
|
||||
uint8_t *buf, size_t len, chan_handle *chan) {
|
||||
thread->write(req_id, buf, len, *chan);
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
rust_uvtmp_read_start(rust_uvtmp_thread *thread, uint32_t req_id,
|
||||
chan_handle *chan) {
|
||||
thread->read_start(req_id, *chan);
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
rust_uvtmp_timer(rust_uvtmp_thread *thread, uint32_t timeout, uint32_t req_id, chan_handle *chan) {
|
||||
thread->timer(timeout, req_id, *chan);
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
rust_uvtmp_delete_buf(uint8_t *buf) {
|
||||
delete [] buf;
|
||||
}
|
||||
|
||||
extern "C" uint32_t
|
||||
rust_uvtmp_get_req_id(connect_data *cd) {
|
||||
return cd->req_id;
|
||||
}
|
||||
|
||||
|
@ -80,38 +80,19 @@ upcall_call_shim_on_rust_stack
|
||||
upcall_new_stack
|
||||
upcall_del_stack
|
||||
upcall_reset_stack_limit
|
||||
rust_uv_default_loop
|
||||
rust_uv_loop_new
|
||||
rust_uv_loop_delete
|
||||
rust_uv_loop_set_data
|
||||
rust_uv_bind_op_cb
|
||||
rust_uv_stop_op_cb
|
||||
rust_uv_run
|
||||
rust_uv_unref
|
||||
rust_uv_idle_init
|
||||
rust_uv_idle_start
|
||||
rust_uv_size_of_idle_t
|
||||
rust_uvtmp_create_thread
|
||||
rust_uvtmp_start_thread
|
||||
rust_uvtmp_join_thread
|
||||
rust_uvtmp_delete_thread
|
||||
rust_uvtmp_connect
|
||||
rust_uvtmp_close_connection
|
||||
rust_uvtmp_write
|
||||
rust_uvtmp_read_start
|
||||
rust_uvtmp_timer
|
||||
rust_uvtmp_delete_buf
|
||||
rust_uvtmp_get_req_id
|
||||
rust_uvtmp_uv_loop_new
|
||||
rust_uvtmp_uv_loop_set_data
|
||||
rust_uvtmp_uv_bind_op_cb
|
||||
rust_uvtmp_uv_stop_op_cb
|
||||
rust_uvtmp_uv_run
|
||||
rust_uvtmp_uv_close
|
||||
rust_uvtmp_uv_close_async
|
||||
rust_uvtmp_uv_close_timer
|
||||
rust_uvtmp_uv_async_send
|
||||
rust_uvtmp_uv_async_init
|
||||
rust_uvtmp_uv_timer_init
|
||||
rust_uvtmp_uv_timer_start
|
||||
rust_uvtmp_uv_timer_stop
|
||||
rust_uv_close
|
||||
rust_uv_close_async
|
||||
rust_uv_close_timer
|
||||
rust_uv_async_send
|
||||
rust_uv_async_init
|
||||
rust_uv_timer_init
|
||||
rust_uv_timer_start
|
||||
rust_uv_timer_stop
|
||||
rust_dbg_lock_create
|
||||
rust_dbg_lock_destroy
|
||||
rust_dbg_lock_lock
|
||||
|
Loading…
Reference in New Issue
Block a user