Drop enqueued elements when a port is destructed. Closes #1155

This commit is contained in:
Brian Anderson 2011-11-13 15:28:59 -08:00
parent 3cc2641fc3
commit 138d9ca5d5
7 changed files with 39 additions and 2 deletions

View File

@ -45,6 +45,7 @@ native "cdecl" mod rustrt {
fn new_port(unit_sz: uint) -> *rust_port; fn new_port(unit_sz: uint) -> *rust_port;
fn del_port(po: *rust_port); fn del_port(po: *rust_port);
fn get_port_id(po: *rust_port) -> port_id; fn get_port_id(po: *rust_port) -> port_id;
fn rust_port_size(po: *rust_port) -> ctypes::size_t;
} }
native "rust-intrinsic" mod rusti { native "rust-intrinsic" mod rusti {
@ -75,7 +76,13 @@ tag chan<uniq T> {
chan_t(task::task, port_id); chan_t(task::task, port_id);
} }
resource port_ptr(po: *rustrt::rust_port) { resource port_ptr<uniq T>(po: *rustrt::rust_port) {
// Drain the port so that all the still-enqueued items get dropped
while rustrt::rust_port_size(po) > 0u {
// FIXME: For some reason if we don't assign to something here
// we end up with invalid reads in the drop glue.
let t = rusti::recv::<T>(po);
}
rustrt::del_port(po); rustrt::del_port(po);
} }
@ -90,7 +97,7 @@ transmitted. If a port value is copied, both copies refer to the same port.
Ports may be associated with multiple <chan>s. Ports may be associated with multiple <chan>s.
*/ */
tag port<uniq T> { port_t(@port_ptr); } tag port<uniq T> { port_t(@port_ptr<T>); }
/* /*
Function: send Function: send

View File

@ -475,6 +475,11 @@ del_port(rust_port *port) {
port->deref(); port->deref();
} }
extern "C" CDECL size_t
rust_port_size(rust_port *port) {
return port->size();
}
extern "C" CDECL rust_port_id extern "C" CDECL rust_port_id
get_port_id(rust_port *port) { get_port_id(rust_port *port) {
return port->id; return port->id;

View File

@ -48,6 +48,11 @@ bool rust_port::receive(void *dptr) {
return false; return false;
} }
size_t rust_port::size() {
scoped_lock with(lock);
return buffer.size();
}
void rust_port::log_state() { void rust_port::log_state() {
LOG(task, comm, LOG(task, comm,
"port size: %d", "port size: %d",

View File

@ -19,6 +19,7 @@ public:
void log_state(); void log_state();
void send(void *sptr); void send(void *sptr);
bool receive(void *dptr); bool receive(void *dptr);
size_t size();
}; };
// //

View File

@ -34,6 +34,7 @@ rust_get_stdout
rust_get_stderr rust_get_stderr
rust_str_push rust_str_push
rust_list_files rust_list_files
rust_port_size
rust_process_wait rust_process_wait
rust_ptr_eq rust_ptr_eq
rust_run_program rust_run_program

View File

@ -0,0 +1,9 @@
// xfail-win32 Requires unwinding
use std;
import std::comm;
fn main() {
let p = comm::port();
let c = comm::chan(p);
comm::send(c, "coffee");
}

View File

@ -0,0 +1,9 @@
// xfail-win32 Requires unwinding
use std;
import std::comm;
fn main() {
let p = comm::port();
let c = comm::chan(p);
comm::send(c, ~"coffee");
}