rt: Remove ports

This commit is contained in:
Brian Anderson 2013-02-01 18:50:40 -08:00
parent 542bf20414
commit 4f6516969e
12 changed files with 2 additions and 539 deletions

View File

@ -57,13 +57,11 @@ RUNTIME_CXXS_$(1) := \
rt/rust_sched_reaper.cpp \
rt/rust_task.cpp \
rt/rust_stack.cpp \
rt/rust_port.cpp \
rt/rust_upcall.cpp \
rt/rust_uv.cpp \
rt/rust_crate_map.cpp \
rt/rust_log.cpp \
rt/rust_gc_metadata.cpp \
rt/rust_port_selector.cpp \
rt/rust_util.cpp \
rt/circular_buffer.cpp \
rt/isaac/randport.cpp \

View File

@ -88,7 +88,6 @@ struct Task {
kernel: *Kernel, // 112
name: *c_char, // 116
list_index: i32, // 120
rendezvous_ptr: *uintptr_t, // 124
boxed_region: BoxedRegion // 128
}
@ -107,7 +106,6 @@ struct Task {
kernel: *Kernel,
name: *c_char,
list_index: i32,
rendezvous_ptr: *uintptr_t,
boxed_region: BoxedRegion
}

View File

@ -17,7 +17,6 @@
#include "sync/timer.h"
#include "sync/rust_thread.h"
#include "rust_abi.h"
#include "rust_port.h"
#include <time.h>
@ -694,68 +693,6 @@ rust_sched_threads() {
return task->sched->max_number_of_threads();
}
extern "C" CDECL rust_port*
rust_port_take(rust_port_id id) {
rust_task *task = rust_get_current_task();
return task->kernel->get_port_by_id(id);
}
extern "C" CDECL void
rust_port_drop(rust_port *p) {
assert(p != NULL);
p->deref();
}
extern "C" CDECL rust_task_id
rust_port_task(rust_port *p) {
assert(p != NULL);
return p->task->id;
}
extern "C" CDECL rust_port*
new_port(size_t unit_sz) {
rust_task *task = rust_get_current_task();
LOG(task, comm, "new_port(task=0x%" PRIxPTR " (%s), unit_sz=%d)",
(uintptr_t) task, task->name, unit_sz);
// port starts with refcount == 1
return new (task->kernel, "rust_port") rust_port(task, unit_sz);
}
extern "C" CDECL void
rust_port_begin_detach(rust_port *port, uintptr_t *yield) {
rust_task *task = rust_get_current_task();
LOG(task, comm, "rust_port_detach(0x%" PRIxPTR ")", (uintptr_t) port);
port->begin_detach(yield);
}
extern "C" CDECL void
rust_port_end_detach(rust_port *port) {
port->end_detach();
}
extern "C" CDECL void
del_port(rust_port *port) {
rust_task *task = rust_get_current_task();
LOG(task, comm, "del_port(0x%" PRIxPTR ")", (uintptr_t) port);
delete port;
}
extern "C" CDECL size_t
rust_port_size(rust_port *port) {
return port->size();
}
extern "C" CDECL rust_port_id
get_port_id(rust_port *port) {
return port->id;
}
extern "C" CDECL uintptr_t
rust_port_id_send(rust_port_id target_port_id, void *sptr) {
rust_task *task = rust_get_current_task();
return (uintptr_t)task->kernel->send_to_port(target_port_id, sptr);
}
// This is called by an intrinsic on the Rust stack and must run
// entirely in the red zone. Do not call on the C stack.
extern "C" CDECL MUST_CHECK bool
@ -763,19 +700,6 @@ rust_task_yield(rust_task *task, bool *killed) {
return task->yield();
}
extern "C" CDECL void
port_recv(uintptr_t *dptr, rust_port *port, uintptr_t *yield) {
port->receive(dptr, yield);
}
extern "C" CDECL void
rust_port_select(rust_port **dptr, rust_port **ports,
size_t n_ports, uintptr_t *yield) {
rust_task *task = rust_get_current_task();
rust_port_selector *selector = task->get_port_selector();
selector->select(task, dptr, ports, n_ports, yield);
}
extern "C" CDECL void
rust_set_exit_status(intptr_t code) {
rust_task *task = rust_get_current_task();

View File

@ -11,7 +11,6 @@
#include "rust_kernel.h"
#include "rust_port.h"
#include "rust_util.h"
#include "rust_scheduler.h"
#include "rust_sched_launcher.h"
@ -26,7 +25,6 @@ rust_kernel::rust_kernel(rust_env *env) :
_region(env, true),
_log(NULL),
max_task_id(INIT_TASK_ID-1), // sync_add_and_fetch increments first
max_port_id(1),
rval(0),
max_sched_id(1),
killed(false),
@ -264,47 +262,6 @@ rust_kernel::generate_task_id() {
return id;
}
rust_port_id
rust_kernel::register_port(rust_port *port) {
uintptr_t new_live_ports;
rust_port_id new_port_id;
{
scoped_lock with(port_lock);
new_port_id = max_port_id++;
port_table.put(new_port_id, port);
new_live_ports = port_table.count();
}
assert(new_port_id != INTPTR_MAX && "Hit the maximum port id");
KLOG_("Registered port %" PRIdPTR, new_port_id);
KLOG_("Total outstanding ports: %d", new_live_ports);
return new_port_id;
}
void
rust_kernel::release_port_id(rust_port_id id) {
KLOG_("Releasing port %" PRIdPTR, id);
uintptr_t new_live_ports;
{
scoped_lock with(port_lock);
port_table.remove(id);
new_live_ports = port_table.count();
}
KLOG_("Total outstanding ports: %d", new_live_ports);
}
rust_port *
rust_kernel::get_port_by_id(rust_port_id id) {
assert(id != 0 && "invalid port id");
scoped_lock with(port_lock);
rust_port *port = NULL;
// get leaves port unchanged if not found.
port_table.get(id, &port);
if(port) {
port->ref();
}
return port;
}
#ifdef __WIN32__
void
rust_kernel::win32_require(LPCTSTR fn, BOOL ok) {
@ -400,21 +357,6 @@ rust_kernel::begin_shutdown() {
allow_scheduler_exit();
}
bool
rust_kernel::send_to_port(rust_port_id chan, void *sptr) {
KLOG_("rust_port_id*_send port: 0x%" PRIxPTR, (uintptr_t) chan);
rust_port *port = get_port_by_id(chan);
if(port) {
port->send(sptr);
port->deref();
return true;
} else {
KLOG_("didn't get the port");
return false;
}
}
void
rust_kernel::register_exit_function(spawn_fn runner, fn_env_pair *f) {
scoped_lock with(at_exit_lock);

View File

@ -15,8 +15,7 @@
The kernel is primarily responsible for managing the lifetime of
schedulers, which in turn run rust tasks. It provides a memory
allocator and logging service for use by other runtime components,
it creates unique task and port ids and provides global access
to ports by id.
it creates unique task ids.
The kernel runs until there are no live schedulers.
@ -56,13 +55,11 @@ class rust_scheduler;
class rust_sched_driver;
class rust_sched_launcher_factory;
struct rust_task_thread;
class rust_port;
// Scheduler, task, and port handles. These uniquely identify within a
// Scheduler, task handles. These uniquely identify within a
// single kernel instance the objects they represent.
typedef intptr_t rust_sched_id;
typedef intptr_t rust_task_id;
typedef intptr_t rust_port_id;
typedef std::map<rust_sched_id, rust_scheduler*> sched_map;
@ -80,12 +77,6 @@ class rust_kernel {
// The next task id
rust_task_id max_task_id;
// Protects max_port_id and port_table
lock_and_signal port_lock;
// The next port id
rust_task_id max_port_id;
hash_map<rust_port_id, rust_port *> port_table;
lock_and_signal rval_lock;
int rval;
@ -163,10 +154,6 @@ public:
rust_task_id generate_task_id();
rust_port_id register_port(rust_port *port);
rust_port *get_port_by_id(rust_port_id id);
void release_port_id(rust_port_id tid);
void set_exit_status(int code);
rust_sched_id main_sched_id() { return main_scheduler; }
@ -177,8 +164,6 @@ public:
void inc_weak_task_count();
void dec_weak_task_count();
bool send_to_port(rust_port_id chan, void *sptr);
void register_exit_function(spawn_fn runner, fn_env_pair *f);
};

View File

@ -1,153 +0,0 @@
// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
#include "rust_port.h"
#include "rust_task.h"
rust_port::rust_port(rust_task *task, size_t unit_sz)
: ref_count(1), kernel(task->kernel), task(task),
unit_sz(unit_sz), buffer(kernel, unit_sz) {
LOG(task, comm,
"new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%"
PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this);
id = kernel->register_port(this);
}
rust_port::~rust_port() {
LOG(task, comm, "~rust_port 0x%" PRIxPTR, (uintptr_t) this);
}
void rust_port::ref() {
scoped_lock with(ref_lock);
ref_count++;
}
void rust_port::deref() {
scoped_lock with(ref_lock);
ref_count--;
if (!ref_count) {
// The port owner is waiting for the port to be detached (if it
// hasn't already been killed)
scoped_lock with(task->lifecycle_lock);
if (task->blocked_on(&detach_cond)) {
task->wakeup_inner(&detach_cond);
}
}
}
void rust_port::begin_detach(uintptr_t *yield) {
*yield = false;
kernel->release_port_id(id);
scoped_lock with(ref_lock);
ref_count--;
if (ref_count != 0) {
task->block(&detach_cond, "waiting for port detach");
*yield = true;
}
}
void rust_port::end_detach() {
// Just take the lock to make sure that the thread that signaled
// the detach_cond isn't still holding it
scoped_lock with(ref_lock);
assert(ref_count == 0);
}
void rust_port::send(void *sptr) {
bool did_rendezvous = false;
{
scoped_lock with(lock);
buffer.enqueue(sptr);
assert(!buffer.is_empty() &&
"rust_chan::transmit with nothing to send.");
{
scoped_lock with(task->lifecycle_lock);
if (task->blocked_on(this)) {
KLOG(kernel, comm, "dequeued in rendezvous_ptr");
buffer.dequeue(task->rendezvous_ptr);
task->rendezvous_ptr = 0;
task->wakeup_inner(this);
did_rendezvous = true;
}
}
}
if (!did_rendezvous) {
// If the task wasn't waiting specifically on this port,
// it may be waiting on a group of ports
rust_port_selector *port_selector = task->get_port_selector();
// The port selector will check if the task is blocked, not us.
port_selector->msg_sent_on(this);
}
}
void rust_port::receive(void *dptr, uintptr_t *yield) {
LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR
", size: 0x%" PRIxPTR,
(uintptr_t) this, (uintptr_t) dptr, unit_sz);
scoped_lock with(lock);
*yield = false;
if (buffer.is_empty() == false) {
buffer.dequeue(dptr);
LOG(task, comm, "<=== read data ===");
return;
}
memset(dptr, 0, buffer.unit_sz);
// No data was buffered on any incoming channel, so block this task on
// the port. Remember the rendezvous location so that any sender task
// can write to it before waking up this task.
LOG(task, comm, "<=== waiting for rendezvous data ===");
task->rendezvous_ptr = (uintptr_t*) dptr;
task->block(this, "waiting for rendezvous data");
// Blocking the task might fail if the task has already been killed, but
// in the event of both failure and success the task needs to yield. On
// success, it yields and waits to be unblocked. On failure it yields and
// is then fails the task.
*yield = true;
}
size_t rust_port::size() {
scoped_lock with(lock);
return buffer.size();
}
void rust_port::log_state() {
LOG(task, comm,
"port size: %d",
buffer.size());
}
//
// Local Variables:
// mode: C++
// fill-column: 78;
// indent-tabs-mode: nil
// c-basic-offset: 4
// buffer-file-coding-system: utf-8-unix
// compile-command: "make -k -C $RBUILD 2>&1 | sed -e 's/\\/x\\//x:\\//g'";
// End:
//

View File

@ -1,64 +0,0 @@
// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
#ifndef RUST_PORT_H
#define RUST_PORT_H
#include "rust_globals.h"
#include "circular_buffer.h"
class port_detach_cond : public rust_cond { };
class rust_port : public kernel_owned<rust_port>, public rust_cond {
private:
// Protects ref_count and detach_cond
lock_and_signal ref_lock;
intptr_t ref_count;
port_detach_cond detach_cond;
public:
void ref();
void deref();
public:
rust_port_id id;
rust_kernel *kernel;
rust_task *task;
size_t unit_sz;
circular_buffer buffer;
lock_and_signal lock;
public:
rust_port(rust_task *task, size_t unit_sz);
~rust_port();
void log_state();
void send(void *sptr);
void receive(void *dptr, uintptr_t *yield);
size_t size();
void begin_detach(uintptr_t *yield);
void end_detach();
};
//
// Local Variables:
// mode: C++
// fill-column: 78;
// indent-tabs-mode: nil
// c-basic-offset: 4
// buffer-file-coding-system: utf-8-unix
// compile-command: "make -k -C $RBUILD 2>&1 | sed -e 's/\\/x\\//x:\\//g'";
// End:
//
#endif /* RUST_PORT_H */

View File

@ -1,103 +0,0 @@
// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
#include "rust_port.h"
#include "rust_port_selector.h"
#include "rust_task.h"
rust_port_selector::rust_port_selector()
: ports(NULL), n_ports(0) {
}
void
rust_port_selector::select(rust_task *task, rust_port **dptr,
rust_port **ports,
size_t n_ports, uintptr_t *yield) {
assert(this->ports == NULL);
assert(this->n_ports == 0);
assert(dptr != NULL);
assert(ports != NULL);
assert(n_ports != 0);
assert(yield != NULL);
*yield = false;
size_t locks_taken = 0;
bool found_msg = false;
// Take each port's lock as we iterate through them because
// if none of them contain a usable message then we need to
// block the task before any of them can try to send another
// message.
// Start looking for ports from a different index each time.
size_t j = isaac_rand(&task->sched_loop->rctx);
for (size_t i = 0; i < n_ports; i++) {
size_t k = (i + j) % n_ports;
rust_port *port = ports[k];
assert(port != NULL);
port->lock.lock();
locks_taken++;
if (port->buffer.size() > 0) {
*dptr = port;
found_msg = true;
break;
}
}
if (!found_msg) {
this->ports = ports;
this->n_ports = n_ports;
assert(task->rendezvous_ptr == NULL);
task->rendezvous_ptr = (uintptr_t*)dptr;
task->block(this, "waiting for select rendezvous");
// Blocking the task might fail if the task has already been
// killed, but in the event of both failure and success the
// task needs to yield. On success, it yields and waits to be
// unblocked. On failure it yields and is then fails the task.
*yield = true;
}
for (size_t i = 0; i < locks_taken; i++) {
size_t k = (i + j) % n_ports;
rust_port *port = ports[k];
port->lock.unlock();
}
}
void
rust_port_selector::msg_sent_on(rust_port *port) {
rust_task *task = port->task;
port->lock.must_not_have_lock();
// Prevent two ports from trying to wake up the task
// simultaneously
scoped_lock with(task->lifecycle_lock);
if (task->blocked_on(this)) {
for (size_t i = 0; i < n_ports; i++) {
if (port == ports[i]) {
// This was one of the ports we were waiting on
ports = NULL;
n_ports = 0;
*task->rendezvous_ptr = (uintptr_t) port;
task->rendezvous_ptr = NULL;
task->wakeup_inner(this);
return;
}
}
}
}

View File

@ -1,35 +0,0 @@
// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
#ifndef RUST_PORT_SELECTOR_H
#define RUST_PORT_SELECTOR_H
#include "rust_globals.h"
class rust_port;
class rust_port_selector : public rust_cond {
private:
rust_port **ports;
size_t n_ports;
public:
rust_port_selector();
void select(rust_task *task,
rust_port **dptr,
rust_port **ports,
size_t n_ports,
uintptr_t *yield);
void msg_sent_on(rust_port *port);
};
#endif /* RUST_PORT_SELECTOR_H */

View File

@ -21,7 +21,6 @@
#include "rust_task.h"
#include "rust_env.h"
#include "rust_port.h"
#include "rust_globals.h"
#include "rust_crate_map.h"
@ -37,7 +36,6 @@ rust_task::rust_task(rust_sched_loop *sched_loop, rust_task_state state,
kernel(sched_loop->kernel),
name(name),
list_index(-1),
rendezvous_ptr(0),
boxed(sched_loop->kernel->env, &local_region),
local_region(&sched_loop->local_region),
unwinding(false),

View File

@ -116,7 +116,6 @@
#include "rust_kernel.h"
#include "boxed_region.h"
#include "rust_stack.h"
#include "rust_port_selector.h"
#include "rust_type.h"
#include "rust_sched_loop.h"
@ -219,14 +218,6 @@ rust_task : public kernel_owned<rust_task>
const char *const name;
int32_t list_index;
// Rendezvous pointer for receiving data when blocked on a port. If we're
// trying to read data and no data is available on any incoming channel,
// we block on the port, and yield control to the scheduler. Since, we
// were not able to read anything, we remember the location where the
// result should go in the rendezvous_ptr, and let the sender write to
// that location before waking us up.
uintptr_t* rendezvous_ptr;
boxed_region boxed;
memory_region local_region;
@ -270,8 +261,6 @@ private:
uintptr_t next_c_sp;
uintptr_t next_rust_sp;
rust_port_selector port_selector;
// Called when the atomic refcount reaches zero
void delete_this();
@ -302,8 +291,6 @@ private:
char const *file,
size_t line);
friend class rust_port;
friend class rust_port_selector;
bool block_inner(rust_cond *on, const char* name);
void wakeup_inner(rust_cond *from);
bool blocked_on(rust_cond *cond);
@ -381,8 +368,6 @@ public:
void call_on_rust_stack(void *args, void *fn_ptr);
bool have_c_stack() { return c_stack != NULL; }
rust_port_selector *get_port_selector() { return &port_selector; }
rust_task_state get_state() { return state; }
rust_cond *get_cond() { return cond; }
const char *get_cond_name() { return cond_name; }

View File

@ -1,14 +1,12 @@
debug_box
debug_fn
debug_opaque
del_port
debug_ptrcast
debug_tag
debug_tydesc
debug_get_stk_seg
debug_abi_1
debug_abi_2
get_port_id
get_task_id
get_time
rust_tzset
@ -17,12 +15,8 @@ rust_localtime
rust_timegm
rust_mktime
last_os_error
new_port
new_task
port_recv
precise_time_ns
rust_port_id_send
rust_port_select
rand_free
rand_new
rand_new_seeded
@ -45,9 +39,6 @@ rust_list_files
rust_list_files2
rust_log_console_on
rust_log_console_off
rust_port_begin_detach
rust_port_end_detach
rust_port_size
rust_process_wait
rust_ptr_eq
rust_run_program
@ -171,9 +162,6 @@ rust_dbg_do_nothing
rust_dbg_breakpoint
rust_osmain_sched_id
rust_compare_and_swap_ptr
rust_port_take
rust_port_drop
rust_port_task
rust_task_inhibit_kill
rust_task_allow_kill
rust_task_inhibit_yield