diff --git a/src/rt/rust_aio.cpp b/src/rt/rust_aio.cpp index ed820b6ed31..3b2040476d0 100644 --- a/src/rt/rust_aio.cpp +++ b/src/rt/rust_aio.cpp @@ -41,7 +41,7 @@ struct socket_data : public task_owned { } void send_result(void *data) { - chan->send(&data); + chan->port->send(&data); chan->deref(); chan = NULL; } @@ -138,7 +138,7 @@ static void read_progress(uv_stream_t *socket, ssize_t nread, uv_buf_t buf) { v->fill = nread; break; } - data->reader->send(v); + data->reader->port->send(v); } static void new_connection(uv_stream_t *socket, int status) { @@ -156,7 +156,7 @@ static void new_connection(uv_stream_t *socket, int status) { server->task->fail(); return; } - server->chan->send(&client); + server->chan->port->send(&client); } extern "C" CDECL socket_data *aio_serve(const char *ip, int port, @@ -201,7 +201,7 @@ static void free_socket(uv_handle_t *handle) { bool closed = true; I(data->task->sched, data->chan != NULL); data->task->kill(); - data->chan->send(&closed); + data->chan->port->send(&closed); } delete data; } @@ -221,7 +221,7 @@ extern "C" CDECL void aio_close_server(socket_data *server, // XXX: hax until rust_task::kill // send null and the receiver knows to call back into native code to check void* null_client = NULL; - server->chan->send(&null_client); + server->chan->port->send(&null_client); server->chan->deref(); server->chan = chan->clone(iotask); aio_close_socket(server); diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index cfe3edcd24c..4086e3708fd 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -498,7 +498,7 @@ chan_id_send(type_desc *t, rust_task_id target_task_id, if(target_task) { rust_port *port = target_task->get_port_by_id(target_port_id); if(port) { - port->remote_chan->send(sptr); + port->send(sptr); } target_task->deref(); } diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp index de26feac020..9a41dacc323 100644 --- a/src/rt/rust_chan.cpp +++ b/src/rt/rust_chan.cpp @@ -36,32 +36,6 @@ bool rust_chan::is_associated() { return port != NULL; } -/** - * Attempt to send data to the associated port. - */ -void rust_chan::send(void *sptr) { - if (!is_associated()) { - W(kernel, is_associated(), - "rust_chan::transmit with no associated port."); - return; - } - - I(kernel, port != NULL); - scoped_lock with(port->lock); - - buffer.enqueue(sptr); - - A(kernel, !buffer.is_empty(), - "rust_chan::transmit with nothing to send."); - - if (port->task->blocked_on(port)) { - KLOG(kernel, comm, "dequeued in rendezvous_ptr"); - buffer.dequeue(port->task->rendezvous_ptr); - port->task->rendezvous_ptr = 0; - port->task->wakeup(port); - } -} - rust_chan *rust_chan::clone(rust_task *target) { return new (target->kernel, "cloned chan") rust_chan(kernel, port, buffer.unit_sz); diff --git a/src/rt/rust_chan.h b/src/rt/rust_chan.h index 0736012e63c..4fb78e062c7 100644 --- a/src/rt/rust_chan.h +++ b/src/rt/rust_chan.h @@ -18,8 +18,6 @@ public: bool is_associated(); - void send(void *sptr); - rust_chan *clone(rust_task *target); }; diff --git a/src/rt/rust_port.cpp b/src/rt/rust_port.cpp index 7fb24dfccbc..a68b4417110 100644 --- a/src/rt/rust_port.cpp +++ b/src/rt/rust_port.cpp @@ -31,6 +31,28 @@ rust_port::~rust_port() { task->release_port(id); } +void rust_port::send(void *sptr) { + if (!remote_chan->is_associated()) { + W(kernel, remote_chan->is_associated(), + "rust_chan::transmit with no associated port."); + return; + } + + scoped_lock with(lock); + + remote_chan->buffer.enqueue(sptr); + + A(kernel, !remote_chan->buffer.is_empty(), + "rust_chan::transmit with nothing to send."); + + if (task->blocked_on(this)) { + KLOG(kernel, comm, "dequeued in rendezvous_ptr"); + remote_chan->buffer.dequeue(task->rendezvous_ptr); + task->rendezvous_ptr = 0; + task->wakeup(this); + } +} + bool rust_port::receive(void *dptr) { if (remote_chan->buffer.is_empty() == false) { remote_chan->buffer.dequeue(dptr); diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h index 57b7492f711..551ad2e89da 100644 --- a/src/rt/rust_port.h +++ b/src/rt/rust_port.h @@ -17,6 +17,7 @@ public: rust_port(rust_task *task, size_t unit_sz); ~rust_port(); void log_state(); + void send(void *sptr); bool receive(void *dptr); }; diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index 5043f9ec4b0..854c2c74462 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -133,8 +133,10 @@ rust_task::~rust_task() msg.id = user.id; msg.result = failed ? tr_failure : tr_success; - target->send(&msg); - target->deref(); + if (target->is_associated()) { + target->port->send(&msg); + target->deref(); + } } }