std: dump the tcp::new_listener server API

This commit is contained in:
Jeff Olson 2012-05-29 05:47:21 -07:00 committed by Brian Anderson
parent 5d86686e7f
commit 68e129dc39

View File

@ -13,14 +13,13 @@ import libc::size_t;
import str::extensions;
// tcp interfaces
export tcp_socket, tcp_conn_port;
export tcp_socket;
// errors
export tcp_err_data, tcp_connect_err_data;
// operations on a tcp_socket
export write, write_future, read_start, read_stop;
// tcp server stuff
export listen_for_conn, accept;
export new_listener, conn_recv, conn_recv_spawn, conn_peek;
// tcp client stuff
export connect;
// helper methods
@ -51,23 +50,6 @@ class tcp_socket {
}
}
class tcp_conn_port {
let conn_data: @tcp_conn_port_data;
new(conn_data: @tcp_conn_port_data) { self.conn_data = conn_data; }
drop unsafe {
let conn_data_ptr = ptr::addr_of(*(self.conn_data));
let server_stream_ptr = ptr::addr_of((*conn_data_ptr).server_stream);
let stream_closed_po = (*(self.conn_data)).stream_closed_po;
let iotask = (*conn_data_ptr).iotask;
iotask::interact(iotask) {|loop_ptr|
log(debug, #fmt("dtor for tcp_conn_port loop: %?",
loop_ptr));
uv::ll::close(server_stream_ptr, tcp_nl_close_cb);
}
comm::recv(stream_closed_po);
}
}
#[doc="
Contains raw, string-based, error information returned from libuv
"]
@ -351,6 +333,7 @@ fn read_future(sock: tcp_socket, timeout_msecs: uint)
}
#[doc="
<<<<<<< HEAD
Bind to a given IP/port and listen for new connections
# Arguments
@ -527,6 +510,8 @@ fn conn_peek(server_port: tcp_conn_port) -> bool {
}
#[doc="
=======
>>>>>>> std: dump the tcp::new_listener server API
Bind an incoming client connection to a `net::tcp::tcp_socket`
# Notes
@ -782,17 +767,6 @@ fn listen_for_conn(host_ip: ip::ip_addr, port: uint, backlog: uint,
}
mod net_tcp_methods {
#[doc="
Convenience methods extending `net::tcp::tcp_conn_port`
"]
impl methods_tcp_conn_port for tcp_conn_port {
fn recv() -> result::result<tcp_socket, tcp_err_data> {
conn_recv(self) }
fn recv_spawn(cb: fn~(result::result<tcp_socket,tcp_err_data>))
{ conn_recv_spawn(self, cb); }
fn peek() -> bool { conn_peek(self) }
}
#[doc="
Convenience methods extending `net::tcp::tcp_socket`
"]
@ -993,6 +967,7 @@ fn write_common_impl(socket_data_ptr: *tcp_socket_data,
}
}
<<<<<<< HEAD
// various recv_* can use a tcp_conn_port can re-use this..
fn conn_port_new_tcp_socket(
stream_handle_ptr: *uv::ll::uv_tcp_t,
@ -1022,21 +997,12 @@ fn conn_port_new_tcp_socket(
result::ok(tcp_socket(client_socket_data))
}
=======
>>>>>>> std: dump the tcp::new_listener server API
enum tcp_new_connection {
new_tcp_conn(*uv::ll::uv_tcp_t)
}
type tcp_conn_port_data = {
server_stream: uv::ll::uv_tcp_t,
stream_closed_po: comm::port<()>,
stream_closed_ch: comm::chan<()>,
iotask: iotask,
new_conn_po: comm::port<result::result<*uv::ll::uv_tcp_t,
tcp_err_data>>,
new_conn_ch: comm::chan<result::result<*uv::ll::uv_tcp_t,
tcp_err_data>>
};
type tcp_listen_fc_data = {
server_stream_ptr: *uv::ll::uv_tcp_t,
stream_closed_ch: comm::chan<()>,
@ -1075,66 +1041,11 @@ crust fn tcp_lfc_on_connection_cb(handle: *uv::ll::uv_tcp_t,
}
}
crust fn tcp_nl_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
let conn_data_ptr = uv::ll::get_data_for_uv_handle(
handle) as *tcp_conn_port_data;
comm::send((*conn_data_ptr).stream_closed_ch, ());
}
fn malloc_uv_tcp_t() -> *uv::ll::uv_tcp_t unsafe {
rustrt::rust_uv_current_kernel_malloc(
rustrt::rust_uv_helper_uv_tcp_t_size()) as *uv::ll::uv_tcp_t
}
crust fn tcp_nl_on_connection_cb(server_handle_ptr: *uv::ll::uv_tcp_t,
status: libc::c_int) unsafe {
let server_data_ptr = uv::ll::get_data_for_uv_handle(server_handle_ptr)
as *tcp_conn_port_data;
let new_conn_ch = (*server_data_ptr).new_conn_ch;
let loop_ptr = uv::ll::get_loop_for_uv_handle(server_handle_ptr);
alt status {
0i32 {
let client_stream_handle_ptr = malloc_uv_tcp_t();
*(client_stream_handle_ptr as *mut uv::ll::uv_tcp_t) =
uv::ll::tcp_t();
alt uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) {
0i32 {
log(debug, "uv_tcp_init successful for client stream");
alt uv::ll::accept(
server_handle_ptr as *libc::c_void,
client_stream_handle_ptr as *libc::c_void) {
0i32 {
log(debug, "successfully accepted client connection");
comm::send(new_conn_ch,
result::ok(client_stream_handle_ptr));
}
_ {
log(debug, "failed to accept client conn");
comm::send(
new_conn_ch,
result::err(uv::ll::get_last_err_data(loop_ptr)
.to_tcp_err()));
}
}
}
_ {
log(debug, "failed to init client stream");
comm::send(
new_conn_ch,
result::err(uv::ll::get_last_err_data(loop_ptr)
.to_tcp_err()));
}
}
}
_ {
comm::send(
new_conn_ch,
result::err(uv::ll::get_last_err_data(loop_ptr)
.to_tcp_err()));
}
}
}
enum tcp_connect_result {
tcp_connected(tcp_socket),
tcp_connect_error(tcp_err_data)
@ -1331,10 +1242,6 @@ mod test {
impl_gl_tcp_ipv4_server_and_client();
}
#[test]
fn test_gl_tcp_server_listener_and_client_ipv4() unsafe {
impl_gl_tcp_ipv4_server_listener_and_client();
}
#[test]
fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe {
impl_gl_tcp_ipv4_client_error_connection_refused();
}
@ -1348,11 +1255,6 @@ mod test {
}
#[test]
#[ignore(cfg(target_os = "linux"))]
fn test_gl_tcp_server_listener_and_client_ipv4() unsafe {
impl_gl_tcp_ipv4_server_listener_and_client();
}
#[test]
#[ignore(cfg(target_os = "linux"))]
fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe {
impl_gl_tcp_ipv4_client_error_connection_refused();
}
@ -1404,52 +1306,6 @@ mod test {
assert str::contains(actual_req, expected_req);
assert str::contains(actual_resp, expected_resp);
}
fn impl_gl_tcp_ipv4_server_listener_and_client() {
let hl_loop = uv::global_loop::get();
let server_ip = "127.0.0.1";
let server_port = 8889u;
let expected_req = "ping";
let expected_resp = "pong";
let server_result_po = comm::port::<str>();
let server_result_ch = comm::chan(server_result_po);
let cont_po = comm::port::<()>();
let cont_ch = comm::chan(cont_po);
// server
task::spawn_sched(task::manual_threads(1u)) {||
let actual_req = comm::listen {|server_ch|
run_tcp_test_server_listener(
server_ip,
server_port,
expected_resp,
server_ch,
cont_ch,
hl_loop)
};
server_result_ch.send(actual_req);
};
comm::recv(cont_po);
// client
log(debug, "server started, firing up client..");
let actual_resp_result = comm::listen {|client_ch|
run_tcp_test_client(
server_ip,
server_port,
expected_req,
client_ch,
hl_loop)
};
assert actual_resp_result.is_success();
let actual_resp = actual_resp_result.get();
let actual_req = comm::recv(server_result_po);
log(debug, #fmt("REQ: expected: '%s' actual: '%s'",
expected_req, actual_req));
log(debug, #fmt("RESP: expected: '%s' actual: '%s'",
expected_resp, actual_resp));
assert str::contains(actual_req, expected_req);
assert str::contains(actual_resp, expected_resp);
}
fn impl_gl_tcp_ipv4_client_error_connection_refused() {
let hl_loop = uv::global_loop::get();
let server_ip = "127.0.0.1";
@ -1552,57 +1408,6 @@ mod test {
ret_val
}
fn run_tcp_test_server_listener(server_ip: str,
server_port: uint, resp: str,
server_ch: comm::chan<str>,
cont_ch: comm::chan<()>,
iotask: iotask) -> str {
task::spawn_sched(task::manual_threads(1u)) {||
let server_ip_addr = ip::v4::parse_addr(server_ip);
let new_listener_result =
new_listener(server_ip_addr, server_port, 128u, iotask);
if result::is_err(new_listener_result) {
let err_data = result::get_err(new_listener_result);
log(debug, #fmt("SERVER: exited abnormally name %s msg %s",
err_data.err_name, err_data.err_msg));
fail "couldn't set up new listener";
}
let server_port = result::unwrap(new_listener_result);
cont_ch.send(());
// receive a single new connection.. normally this'd be
// in a loop {}, but we're just going to take a single
// client.. get their req, write a resp and then exit
let new_conn_result = server_port.recv();
if result::is_err(new_conn_result) {
let err_data = result::get_err(new_conn_result);
log(debug, #fmt("SERVER: exited abnormally name %s msg %s",
err_data.err_name, err_data.err_msg));
fail "couldn't recv new conn";
}
let sock = result::unwrap(new_conn_result);
log(debug, "SERVER: successfully accepted"+
"connection!");
let received_req_bytes =
read(sock, 0u);
alt received_req_bytes {
result::ok(data) {
server_ch.send(
str::from_bytes(data));
log(debug, "SERVER: before write");
tcp_write_single(sock, str::bytes(resp));
log(debug, "SERVER: after write.. die");
}
result::err(err_data) {
server_ch.send("");
}
}
};
let ret_val = server_ch.recv();
log(debug, #fmt("SERVER: exited and got ret val: '%s'", ret_val));
ret_val
}
fn run_tcp_test_client(server_ip: str, server_port: uint, resp: str,
client_ch: comm::chan<str>,
iotask: iotask) -> result::result<str,