rt: Move rust_chan::send to rust_port::send

This commit is contained in:
Brian Anderson 2011-11-10 17:53:19 -08:00
parent 371574f64b
commit 793da65a8e
7 changed files with 33 additions and 36 deletions

View File

@ -41,7 +41,7 @@ struct socket_data : public task_owned<socket_data> {
}
void send_result(void *data) {
chan->send(&data);
chan->port->send(&data);
chan->deref();
chan = NULL;
}
@ -138,7 +138,7 @@ static void read_progress(uv_stream_t *socket, ssize_t nread, uv_buf_t buf) {
v->fill = nread;
break;
}
data->reader->send(v);
data->reader->port->send(v);
}
static void new_connection(uv_stream_t *socket, int status) {
@ -156,7 +156,7 @@ static void new_connection(uv_stream_t *socket, int status) {
server->task->fail();
return;
}
server->chan->send(&client);
server->chan->port->send(&client);
}
extern "C" CDECL socket_data *aio_serve(const char *ip, int port,
@ -201,7 +201,7 @@ static void free_socket(uv_handle_t *handle) {
bool closed = true;
I(data->task->sched, data->chan != NULL);
data->task->kill();
data->chan->send(&closed);
data->chan->port->send(&closed);
}
delete data;
}
@ -221,7 +221,7 @@ extern "C" CDECL void aio_close_server(socket_data *server,
// XXX: hax until rust_task::kill
// send null and the receiver knows to call back into native code to check
void* null_client = NULL;
server->chan->send(&null_client);
server->chan->port->send(&null_client);
server->chan->deref();
server->chan = chan->clone(iotask);
aio_close_socket(server);

View File

@ -498,7 +498,7 @@ chan_id_send(type_desc *t, rust_task_id target_task_id,
if(target_task) {
rust_port *port = target_task->get_port_by_id(target_port_id);
if(port) {
port->remote_chan->send(sptr);
port->send(sptr);
}
target_task->deref();
}

View File

@ -36,32 +36,6 @@ bool rust_chan::is_associated() {
return port != NULL;
}
/**
* Attempt to send data to the associated port.
*/
void rust_chan::send(void *sptr) {
if (!is_associated()) {
W(kernel, is_associated(),
"rust_chan::transmit with no associated port.");
return;
}
I(kernel, port != NULL);
scoped_lock with(port->lock);
buffer.enqueue(sptr);
A(kernel, !buffer.is_empty(),
"rust_chan::transmit with nothing to send.");
if (port->task->blocked_on(port)) {
KLOG(kernel, comm, "dequeued in rendezvous_ptr");
buffer.dequeue(port->task->rendezvous_ptr);
port->task->rendezvous_ptr = 0;
port->task->wakeup(port);
}
}
rust_chan *rust_chan::clone(rust_task *target) {
return new (target->kernel, "cloned chan")
rust_chan(kernel, port, buffer.unit_sz);

View File

@ -18,8 +18,6 @@ public:
bool is_associated();
void send(void *sptr);
rust_chan *clone(rust_task *target);
};

View File

@ -31,6 +31,28 @@ rust_port::~rust_port() {
task->release_port(id);
}
void rust_port::send(void *sptr) {
if (!remote_chan->is_associated()) {
W(kernel, remote_chan->is_associated(),
"rust_chan::transmit with no associated port.");
return;
}
scoped_lock with(lock);
remote_chan->buffer.enqueue(sptr);
A(kernel, !remote_chan->buffer.is_empty(),
"rust_chan::transmit with nothing to send.");
if (task->blocked_on(this)) {
KLOG(kernel, comm, "dequeued in rendezvous_ptr");
remote_chan->buffer.dequeue(task->rendezvous_ptr);
task->rendezvous_ptr = 0;
task->wakeup(this);
}
}
bool rust_port::receive(void *dptr) {
if (remote_chan->buffer.is_empty() == false) {
remote_chan->buffer.dequeue(dptr);

View File

@ -17,6 +17,7 @@ public:
rust_port(rust_task *task, size_t unit_sz);
~rust_port();
void log_state();
void send(void *sptr);
bool receive(void *dptr);
};

View File

@ -133,8 +133,10 @@ rust_task::~rust_task()
msg.id = user.id;
msg.result = failed ? tr_failure : tr_success;
target->send(&msg);
target->deref();
if (target->is_associated()) {
target->port->send(&msg);
target->deref();
}
}
}