diff --git a/src/Makefile b/src/Makefile index 086995385ef..3de9b38ce0f 100644 --- a/src/Makefile +++ b/src/Makefile @@ -34,7 +34,7 @@ DSYMUTIL := true ifeq ($(CFG_OSTYPE), Linux) CFG_RUNTIME := librustrt.so CFG_STDLIB := libstd.so - CFG_GCC_CFLAGS += -fPIC + CFG_GCC_CFLAGS += -fPIC -march=i686 CFG_GCC_LINK_FLAGS += -shared -fPIC -ldl -lpthread -lrt ifeq ($(CFG_CPUTYPE), x86_64) CFG_GCC_CFLAGS += -m32 @@ -107,7 +107,7 @@ ifdef CFG_UNIXY endif CFG_OBJ_SUFFIX := .o CFG_EXE_SUFFIX := .exe - CFG_GCC_CFLAGS := + CFG_GCC_CFLAGS := -march=i686 CFG_GCC_LINK_FLAGS := -shared ifeq ($(CFG_CPUTYPE), x86_64) CFG_GCC_CFLAGS += -m32 @@ -248,7 +248,6 @@ BOOT_CMIS := $(BOOT_MLS:.ml=.cmi) RUNTIME_CS := rt/sync/timer.cpp \ rt/sync/sync.cpp \ rt/sync/spin_lock.cpp \ - rt/sync/lock_free_queue.cpp \ rt/sync/condition_variable.cpp \ rt/rust.cpp \ rt/rust_builtin.cpp \ @@ -286,6 +285,7 @@ RUNTIME_HDR := rt/globals.h \ rt/util/hash_map.h \ rt/sync/sync.h \ rt/sync/timer.h \ + rt/sync/lock_free_queue.h \ rt/rust_srv.h \ rt/memory_region.h \ rt/memory.h @@ -394,7 +394,9 @@ TASK_XFAILS := test/run-pass/acyclic-unwind.rs \ test/run-pass/task-life-0.rs \ test/run-pass/task-comm.rs \ test/run-pass/threads.rs \ - test/run-pass/yield.rs + test/run-pass/yield.rs \ + test/run-pass/task-comm-15.rs \ + test/run-pass/task-life-0.rs TEST_XFAILS_X86 := $(TASK_XFAILS) \ test/run-pass/child-outlives-parent.rs \ @@ -537,6 +539,11 @@ TEST_XFAILS_LLVM := $(TASK_XFAILS) \ task-comm-9.rs \ task-comm-10.rs \ task-comm-11.rs \ + task-comm-12.rs \ + task-comm-13.rs \ + task-comm-13-thread.rs \ + task-comm-14.rs \ + task-comm-15.rs \ task-life-0.rs \ threads.rs \ type-sizes.rs \ diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp index ce6bebf3a8c..dc42286da8a 100644 --- a/src/rt/rust_dom.cpp +++ b/src/rt/rust_dom.cpp @@ -301,12 +301,14 @@ void rust_dom::send_message(rust_message *message) { /** * Drains and processes incoming pending messages. 
*/ -void rust_dom::drain_incoming_message_queue() { +void rust_dom::drain_incoming_message_queue(bool process) { rust_message *message; - while ((message = (rust_message *) _incoming_message_queue.dequeue())) { + while (_incoming_message_queue.dequeue(&message)) { log(rust_log::COMM, "<== processing incoming message \"%s\" 0x%" PRIxPTR, message->label, message); - message->process(); + if (process) { + message->process(); + } message->~rust_message(); this->synchronized_region.free(message); } @@ -454,7 +456,7 @@ rust_dom::start_main_loop() while (n_live_tasks() > 0) { A(this, is_deadlocked() == false, "deadlock"); - drain_incoming_message_queue(); + drain_incoming_message_queue(true); rust_task *scheduled_task = schedule_task(); @@ -519,7 +521,7 @@ rust_dom::start_main_loop() } sync::yield(); } else { - drain_incoming_message_queue(); + drain_incoming_message_queue(true); } reap_dead_tasks(); } diff --git a/src/rt/rust_dom.h b/src/rt/rust_dom.h index 34d8c69462f..44f56cc41bb 100644 --- a/src/rt/rust_dom.h +++ b/src/rt/rust_dom.h @@ -41,7 +41,7 @@ struct rust_dom hash_map *> _port_proxies; // Incoming messages from other domains. - lock_free_queue _incoming_message_queue; + lock_free_queue _incoming_message_queue; #ifndef __WIN32__ pthread_attr_t attr; @@ -71,7 +71,7 @@ struct rust_dom void free(void *mem, memory_region::memory_region_type type); void send_message(rust_message *message); - void drain_incoming_message_queue(); + void drain_incoming_message_queue(bool process); rust_proxy *get_task_proxy(rust_task *task); void delete_proxies(); rust_proxy *get_port_proxy_synchronized(rust_port *port); diff --git a/src/rt/rust_message.h b/src/rt/rust_message.h index 6d986acf5d2..7aee6d9f6d0 100644 --- a/src/rt/rust_message.h +++ b/src/rt/rust_message.h @@ -9,7 +9,7 @@ /** * Abstract base class for all message types. 
*/ -class rust_message : public lock_free_queue_node { +class rust_message { public: const char* label; private: diff --git a/src/rt/sync/interrupt_transparent_queue.cpp b/src/rt/sync/interrupt_transparent_queue.cpp new file mode 100644 index 00000000000..064b25f17ba --- /dev/null +++ b/src/rt/sync/interrupt_transparent_queue.cpp @@ -0,0 +1,56 @@ +/* + * Interrupt transparent queue, Schoen et. al, "On Interrupt-Transparent + * Synchronization in an Embedded Object-Oriented Operating System", 2000. + * enqueue() is allowed to interrupt enqueue() and dequeue(), however, + * dequeue() is not allowed to interrupt itself. + */ + +#include "../globals.h" +#include "interrupt_transparent_queue.h" + +interrupt_transparent_queue_node::interrupt_transparent_queue_node() : + next(NULL) { + +} + +interrupt_transparent_queue::interrupt_transparent_queue() : _tail(this) { + +} + +void +interrupt_transparent_queue::enqueue(interrupt_transparent_queue_node *item) { + lock.lock(); + item->next = (interrupt_transparent_queue_node *) NULL; + interrupt_transparent_queue_node *last = _tail; + _tail = item; + while (last->next) { + last = last->next; + } + last->next = item; + lock.unlock(); +} + +interrupt_transparent_queue_node * +interrupt_transparent_queue::dequeue() { + lock.lock(); + interrupt_transparent_queue_node *item = next; + if (item && !(next = item->next)) { + _tail = (interrupt_transparent_queue_node *) this; + if (item->next) { + interrupt_transparent_queue_node *lost = item->next; + interrupt_transparent_queue_node *help; + do { + help = lost->next; + enqueue(lost); + } while ((lost = help) != + (interrupt_transparent_queue_node *) NULL); + } + } + lock.unlock(); + return item; +} + +bool +interrupt_transparent_queue::is_empty() { + return next == NULL; +} diff --git a/src/rt/sync/interrupt_transparent_queue.h b/src/rt/sync/interrupt_transparent_queue.h new file mode 100644 index 00000000000..7c02d0c8388 --- /dev/null +++ b/src/rt/sync/interrupt_transparent_queue.h @@ 
-0,0 +1,22 @@ +#ifndef INTERRUPT_TRANSPARENT_QUEUE_H +#define INTERRUPT_TRANSPARENT_QUEUE_H + +#include "spin_lock.h" + +class interrupt_transparent_queue_node { +public: + interrupt_transparent_queue_node *next; + interrupt_transparent_queue_node(); +}; + +class interrupt_transparent_queue : interrupt_transparent_queue_node { + spin_lock lock; + interrupt_transparent_queue_node *_tail; +public: + interrupt_transparent_queue(); + void enqueue(interrupt_transparent_queue_node *item); + interrupt_transparent_queue_node *dequeue(); + bool is_empty(); +}; + +#endif /* INTERRUPT_TRANSPARENT_QUEUE_H */ diff --git a/src/rt/sync/lock_free_queue.h b/src/rt/sync/lock_free_queue.h index c1a309b8817..ac0c5b046a0 100644 --- a/src/rt/sync/lock_free_queue.h +++ b/src/rt/sync/lock_free_queue.h @@ -1,22 +1,210 @@ #ifndef LOCK_FREE_QUEUE_H #define LOCK_FREE_QUEUE_H -#include "spin_lock.h" +/** + * How and why this lock free queue works: + * + * Adapted from the paper titled "Simple, Fast, and Practical Non-Blocking + * and Blocking Concurrent Queue Algorithms" by Maged M. Michael, + * Michael L. Scott. + * + * Safety Properties: + * + * 1. The linked list is always connected. + * 2. Nodes are only inserted after the last node in the linked list. + * 3. Nodes are only deleted from the beginning of the linked list. + * 4. Head always points to the first node in the linked list. + * 5. Tail always points to a node in the linked list. + * + * + * 1. The linked list is always connected because the next pointer is not set + * to null before the node is freed, and no node is freed until deleted + * from the linked list. + * + * 2. Nodes are only inserted at the end of the linked list because they are + * linked through the tail pointer which always points to a node in the + * linked list (5) and an inserted node is only linked to a node that has + * a null next pointer, and the only such node is the last one (1). + * + * 3. 
Nodes are deleted from the beginning of the list because they are + * deleted only when they are pointed to by head which always points to the + * first node (4). + * + * 4. Head always points to the first node in the list because it only changes + * its value to the next node atomically. The new value of head cannot be + * null because if there is only one node in the list the dequeue operation + * returns without deleting any nodes. + * + * 5. Tail always points to a node in the linked list because it never lags + * behind head, so it can never point to a deleted node. Also, when tail + * changes its value it always swings to the next node in the list and it + * never tries to change its value if the next pointer is NULL. + */ -class lock_free_queue_node { -public: - lock_free_queue_node *next; - lock_free_queue_node(); -}; +#include +template +class lock_free_queue { + + struct node_t; + struct pointer_t { + node_t *node; + uint32_t count; + pointer_t() : node(NULL), count(0) { + // Nop. + } + pointer_t(node_t *node, uint32_t count) { + this->node = node; + this->count = count; + } + bool equals(pointer_t &other) { + return node == other.node && count == other.count; + } + }; + + struct node_t { + T value; + pointer_t next; + + node_t() { + next.node = NULL; + next.count = 0; + } + + node_t(pointer_t next, T value) { + this->next = next; + this->value = value; + } + }; + + // Always points to the first node in the list. + pointer_t head; + + // Always points to a node in the list, (not necessarily the last). + pointer_t tail; + + // Compare and swap counted pointers, we can only do this if pointer_t is + // 8 bytes or less since that is the maximum size CAS can handle.
+ bool compare_and_swap(pointer_t *address, + pointer_t *oldValue, + pointer_t newValue) { + + if (sync::compare_and_swap( + (uint64_t*) address, + *(uint64_t*) oldValue, + *(uint64_t*) &newValue)) { + return true; + } + return false; + } -class lock_free_queue : lock_free_queue_node { - spin_lock lock; - lock_free_queue_node *_tail; public: - lock_free_queue(); - void enqueue(lock_free_queue_node *item); - lock_free_queue_node *dequeue(); - bool is_empty(); + lock_free_queue() { + // We can only handle 64bit CAS for counted pointers, so this will + // not work with 64bit pointers. + assert (sizeof(pointer_t) == sizeof(uint64_t)); + + // Allocate a dummy node to be used as the first node in the list. + node_t *node = new node_t(); + + // Head and tail both start out pointing to the dummy node. + head.node = node; + tail.node = node; + } + + virtual ~lock_free_queue() { + // Delete dummy node. + delete head.node; + } + + bool is_empty() { + return head.node == tail.node; + } + + void enqueue(T value) { + + // Create a new node to be inserted in the linked list, and set the + // next node to NULL. + node_t *node = new node_t(); + node->value = value; + node->next.node = NULL; + pointer_t tail; + + // Keep trying until enqueue is done. + while (true) { + // Read the current tail which may either point to the last node + // or to the second to last node (not sure why second to last, + // and not any other node). + tail = this->tail; + + // Reads the next node after the tail which will be the last node + // if null. + pointer_t next; + if (tail.node != NULL) { + next = tail.node->next; + } + + // Loop if another thread changed the tail since we last read it. + if (tail.equals(this->tail) == false) { + continue; + } + + // If next is not pointing to the last node try to swing tail to + // the last node and loop. 
+ if (next.node != NULL) { + compare_and_swap(&this->tail, &tail, + pointer_t(next.node, tail.count + 1)); + continue; + } + + // Try to link node at the end of the linked list. + if (compare_and_swap(&tail.node->next, &next, + pointer_t(node, next.count + 1))) { + // Enqueueing is done. + break; + } + } + + // Enqueue is done, try to swing tail to the inserted node. + compare_and_swap(&this->tail, &tail, + pointer_t(node, tail.count + 1)); + } + + bool dequeue(T *value) { + pointer_t head; + + // Keep trying until dequeue is done. + while(true) { + head = this->head; + pointer_t tail = this->tail; + pointer_t next = head.node->next; + + if (head.equals(this->head) == false) { + continue; + } + + // If queue is empty, or if tail is falling behind. + if (head.node == tail.node) { + // If queue is empty. + if (next.node == NULL) { + return false; + } + // Tail is falling behind, advance it. + compare_and_swap(&this->tail, + &tail, + pointer_t(next.node, tail.count + 1)); + } else { + // Read value before CAS, otherwise another + // dequeue might advance it. + *value = next.node->value; + if (compare_and_swap(&this->head, &head, + pointer_t(next.node, head.count + 1))) { + break; + } + } + } + delete head.node; + return true; + } }; #endif /* LOCK_FREE_QUEUE_H */ diff --git a/src/rt/sync/sync.h b/src/rt/sync/sync.h index 902b5661be4..8c9a13f0f4a 100644 --- a/src/rt/sync/sync.h +++ b/src/rt/sync/sync.h @@ -4,6 +4,11 @@ class sync { public: static void yield(); + template + static bool compare_and_swap(T *address, + T oldValue, T newValue) { + return __sync_bool_compare_and_swap(address, oldValue, newValue); + } }; #endif /* SYNC_H */ diff --git a/src/test/run-fail/task-comm-14.rs b/src/test/run-fail/task-comm-14.rs new file mode 100644 index 00000000000..f5fa27ac649 --- /dev/null +++ b/src/test/run-fail/task-comm-14.rs @@ -0,0 +1,26 @@ +io fn main() { + let port[int] po = port(); + + // Spawn 10 tasks each sending us back one int. 
+ let int i = 10; + while (i > 0) { + log i; + spawn "child" child(i, chan(po)); + i = i - 1; + } + + i = 10; + let int value = 0; + while (i > 0) { + log i; + value <- po; + i = i - 1; + } + + log "main thread exiting"; +} + +io fn child(int x, chan[int] ch) { + log x; + ch <| x; +} diff --git a/src/test/run-pass/task-comm-12.rs b/src/test/run-pass/task-comm-12.rs new file mode 100644 index 00000000000..ab7c25e8a62 --- /dev/null +++ b/src/test/run-pass/task-comm-12.rs @@ -0,0 +1,23 @@ +use std; +import std._task; + +fn main() -> () { + test00(); +} + +fn start(int task_number) { + log "Started / Finished Task."; +} + +fn test00() { + let int i = 0; + let task t = spawn thread start(i); + + // Sleep long enough for the task to finish. + _task.sleep(10000u); + + // Try joining tasks that have already finished. + join t; + + log "Joined Task."; +} \ No newline at end of file diff --git a/src/test/run-pass/task-comm-13-thread.rs b/src/test/run-pass/task-comm-13-thread.rs new file mode 100644 index 00000000000..0dab20ed895 --- /dev/null +++ b/src/test/run-pass/task-comm-13-thread.rs @@ -0,0 +1,18 @@ +use std; +import std._task; + +io fn start(chan[int] c, int start, int number_of_messages) { + let int i = 0; + while (i < number_of_messages) { + c <| start + i; + i += 1; + } +} + +fn main() -> () { + log "Check that we don't deadlock."; + let port[int] p = port(); + let task a = spawn thread "start" start(chan(p), 0, 10); + join a; + log "Joined Task"; +} \ No newline at end of file diff --git a/src/test/run-pass/task-comm-13.rs b/src/test/run-pass/task-comm-13.rs new file mode 100644 index 00000000000..97bdcb6ac87 --- /dev/null +++ b/src/test/run-pass/task-comm-13.rs @@ -0,0 +1,18 @@ +use std; +import std._task; + +io fn start(chan[int] c, int start, int number_of_messages) { + let int i = 0; + while (i < number_of_messages) { + c <| start + i; + i += 1; + } +} + +fn main() -> () { + log "Check that we don't deadlock."; + let port[int] p = port(); + let task a = spawn 
"start" start(chan(p), 0, 10); + join a; + log "Joined Task"; +} \ No newline at end of file diff --git a/src/test/run-pass/task-comm-15.rs b/src/test/run-pass/task-comm-15.rs new file mode 100644 index 00000000000..8d748f59538 --- /dev/null +++ b/src/test/run-pass/task-comm-15.rs @@ -0,0 +1,14 @@ +io fn start(chan[int] c, int n) { + let int i = n; + + while(i > 0) { + c <| 0; + i = i - 1; + } +} + +io fn main() { + let port[int] p = port(); + auto child = spawn thread "child" start(chan(p), 10); + auto c <- p; +} \ No newline at end of file