#include <ruby.h>
#include <intern.h>
#include <rubysig.h>
#include <node.h>
enum rb_thread_status rb_thread_status _((VALUE));
static VALUE rb_cMutex;
static VALUE rb_cConditionVariable;
static VALUE rb_cQueue;
static VALUE rb_cSizedQueue;
static VALUE set_critical(VALUE value);
static VALUE
thread_exclusive(VALUE (*func)(ANYARGS), VALUE arg)
{
VALUE critical = rb_thread_critical;
rb_thread_critical = 1;
return rb_ensure(func, arg, set_critical, (VALUE)critical);
}
static VALUE
rb_thread_exclusive(void)
{
return thread_exclusive(rb_yield, Qundef);
}
typedef struct _Entry {
VALUE value;
struct _Entry *next;
} Entry;
typedef struct _List {
Entry *entries;
Entry *last_entry;
Entry *entry_pool;
unsigned long size;
} List;
static void
init_list(List *list)
{
list->entries = NULL;
list->last_entry = NULL;
list->entry_pool = NULL;
list->size = 0;
}
static void
mark_list(List *list)
{
Entry *entry;
for (entry = list->entries; entry; entry = entry->next) {
rb_gc_mark(entry->value);
}
}
static void
free_entries(Entry *first)
{
Entry *next;
while (first) {
next = first->next;
xfree(first);
first = next;
}
}
static void
finalize_list(List *list)
{
free_entries(list->entries);
free_entries(list->entry_pool);
}
static void
push_list(List *list, VALUE value)
{
Entry *entry;
if (list->entry_pool) {
entry = list->entry_pool;
list->entry_pool = entry->next;
} else {
entry = ALLOC(Entry);
}
entry->value = value;
entry->next = NULL;
if (list->last_entry) {
list->last_entry->next = entry;
} else {
list->entries = entry;
}
list->last_entry = entry;
++list->size;
}
static void
push_multiple_list(List *list, VALUE *values, unsigned count)
{
unsigned i;
for (i = 0; i < count; i++) {
push_list(list, values[i]);
}
}
static void
recycle_entries(List *list, Entry *first_entry, Entry *last_entry)
{
#ifdef USE_MEM_POOLS
last_entry->next = list->entry_pool;
list->entry_pool = first_entry;
#else
last_entry->next = NULL;
free_entries(first_entry);
#endif
}
static VALUE
shift_list(List *list)
{
Entry *entry;
VALUE value;
entry = list->entries;
if (!entry) return Qnil;
list->entries = entry->next;
if (entry == list->last_entry) {
list->last_entry = NULL;
}
--list->size;
value = entry->value;
recycle_entries(list, entry, entry);
return value;
}
static void
remove_one(List *list, VALUE value)
{
Entry **ref;
Entry *prev;
Entry *entry;
for (ref = &list->entries, prev = NULL, entry = list->entries;
entry != NULL;
ref = &entry->next, prev = entry, entry = entry->next) {
if (entry->value == value) {
*ref = entry->next;
list->size--;
if (!entry->next) {
list->last_entry = prev;
}
recycle_entries(list, entry, entry);
break;
}
}
}
static void
clear_list(List *list)
{
if (list->last_entry) {
recycle_entries(list, list->entries, list->last_entry);
list->entries = NULL;
list->last_entry = NULL;
list->size = 0;
}
}
static VALUE
array_from_list(List const *list)
{
VALUE ary;
Entry *entry;
ary = rb_ary_new();
for (entry = list->entries; entry; entry = entry->next) {
rb_ary_push(ary, entry->value);
}
return ary;
}
static VALUE
wake_thread(VALUE thread)
{
return rb_thread_wakeup_alive(thread);
}
static VALUE
run_thread(VALUE thread)
{
thread = wake_thread(thread);
if (RTEST(thread) && !rb_thread_critical)
rb_thread_schedule();
return thread;
}
static VALUE
wake_one(List *list)
{
VALUE waking;
waking = Qnil;
while (list->entries && !RTEST(waking)) {
waking = shift_list(list);
if (waking == Qundef) break;
waking = wake_thread(waking);
}
return waking;
}
static VALUE
wake_all(List *list)
{
while (list->entries) {
wake_one(list);
}
return Qnil;
}
static VALUE
wait_list_inner(List *list)
{
push_list(list, rb_thread_current());
rb_thread_stop();
return Qnil;
}
static VALUE
wait_list_cleanup(List *list)
{
remove_one(list, rb_thread_current());
return Qnil;
}
static void
wait_list(List *list)
{
rb_ensure(wait_list_inner, (VALUE)list, wait_list_cleanup, (VALUE)list);
}
static void
kill_waiting_threads(List *waiting)
{
Entry *entry;
for (entry = waiting->entries; entry; entry = entry->next) {
rb_thread_kill(entry->value);
}
}
typedef struct _Mutex {
VALUE owner;
List waiting;
} Mutex;
#define MUTEX_LOCKED_P(mutex) (RTEST((mutex)->owner) && rb_thread_alive_p((mutex)->owner))
static void
mark_mutex(Mutex *mutex)
{
rb_gc_mark(mutex->owner);
mark_list(&mutex->waiting);
}
static void
finalize_mutex(Mutex *mutex)
{
finalize_list(&mutex->waiting);
}
static void
free_mutex(Mutex *mutex)
{
kill_waiting_threads(&mutex->waiting);
finalize_mutex(mutex);
xfree(mutex);
}
static void
init_mutex(Mutex *mutex)
{
mutex->owner = Qnil;
init_list(&mutex->waiting);
}
static VALUE
rb_mutex_alloc(VALUE klass)
{
Mutex *mutex;
mutex = ALLOC(Mutex);
init_mutex(mutex);
return Data_Wrap_Struct(klass, mark_mutex, free_mutex, mutex);
}
static VALUE
rb_mutex_locked_p(VALUE self)
{
Mutex *mutex;
Data_Get_Struct(self, Mutex, mutex);
return MUTEX_LOCKED_P(mutex) ? Qtrue : Qfalse;
}
static VALUE
rb_mutex_try_lock(VALUE self)
{
Mutex *mutex;
Data_Get_Struct(self, Mutex, mutex);
if (MUTEX_LOCKED_P(mutex))
return Qfalse;
mutex->owner = rb_thread_current();
return Qtrue;
}
static VALUE
lock_mutex(Mutex *mutex)
{
VALUE current;
current = rb_thread_current();
rb_thread_critical = 1;
if (!MUTEX_LOCKED_P(mutex)) {
mutex->owner = current;
}
else {
do {
wait_list(&mutex->waiting);
rb_thread_critical = 1;
if (!MUTEX_LOCKED_P(mutex)) {
mutex->owner = current;
break;
}
} while (mutex->owner != current);
}
rb_thread_critical = 0;
return Qnil;
}
static VALUE
rb_mutex_lock(VALUE self)
{
Mutex *mutex;
Data_Get_Struct(self, Mutex, mutex);
lock_mutex(mutex);
return self;
}
static VALUE
relock_mutex(Mutex *mutex)
{
VALUE current = rb_thread_current();
switch (rb_thread_status(current)) {
case THREAD_RUNNABLE:
case THREAD_STOPPED:
lock_mutex(mutex);
break;
default:
break;
}
return Qundef;
}
static VALUE
unlock_mutex_inner(Mutex *mutex)
{
VALUE waking;
if (mutex->owner != rb_thread_current()) {
rb_raise(rb_eThreadError, "not owner");
}
waking = wake_one(&mutex->waiting);
mutex->owner = waking;
return waking;
}
static VALUE
set_critical(VALUE value)
{
rb_thread_critical = (int)value;
return Qundef;
}
static VALUE
unlock_mutex(Mutex *mutex)
{
VALUE waking = thread_exclusive(unlock_mutex_inner, (VALUE)mutex);
if (!RTEST(waking)) {
return Qfalse;
}
run_thread(waking);
return Qtrue;
}
static VALUE
rb_mutex_unlock(VALUE self)
{
Mutex *mutex;
Data_Get_Struct(self, Mutex, mutex);
if (RTEST(unlock_mutex(mutex))) {
return self;
} else {
return Qnil;
}
}
static VALUE
rb_mutex_exclusive_unlock_inner(Mutex *mutex)
{
VALUE waking;
waking = unlock_mutex_inner(mutex);
rb_yield(Qundef);
return waking;
}
static VALUE
rb_mutex_exclusive_unlock(VALUE self)
{
Mutex *mutex;
VALUE waking;
Data_Get_Struct(self, Mutex, mutex);
waking = thread_exclusive(rb_mutex_exclusive_unlock_inner, (VALUE)mutex);
if (!RTEST(waking)) {
return Qnil;
}
run_thread(waking);
return self;
}
static VALUE
rb_mutex_synchronize(VALUE self)
{
rb_mutex_lock(self);
return rb_ensure(rb_yield, Qundef, rb_mutex_unlock, self);
}
typedef struct _ConditionVariable {
List waiting;
} ConditionVariable;
static void
mark_condvar(ConditionVariable *condvar)
{
mark_list(&condvar->waiting);
}
static void
finalize_condvar(ConditionVariable *condvar)
{
finalize_list(&condvar->waiting);
}
static void
free_condvar(ConditionVariable *condvar)
{
kill_waiting_threads(&condvar->waiting);
finalize_condvar(condvar);
xfree(condvar);
}
static void
init_condvar(ConditionVariable *condvar)
{
init_list(&condvar->waiting);
}
static VALUE
rb_condvar_alloc(VALUE klass)
{
ConditionVariable *condvar;
condvar = ALLOC(ConditionVariable);
init_condvar(condvar);
return Data_Wrap_Struct(klass, mark_condvar, free_condvar, condvar);
}
static void
wait_condvar(ConditionVariable *condvar, Mutex *mutex)
{
VALUE waking;
rb_thread_critical = 1;
if (rb_thread_current() != mutex->owner) {
rb_thread_critical = 0;
rb_raise(rb_eThreadError, "not owner of the synchronization mutex");
}
waking = unlock_mutex_inner(mutex);
if (RTEST(waking)) {
wake_thread(waking);
}
rb_ensure(wait_list, (VALUE)&condvar->waiting, relock_mutex, (VALUE)mutex);
}
static VALUE
legacy_exclusive_unlock(VALUE mutex)
{
return rb_funcall(mutex, rb_intern("exclusive_unlock"), 0);
}
typedef struct {
ConditionVariable *condvar;
VALUE mutex;
} legacy_wait_args;
static VALUE
legacy_wait(VALUE unused, legacy_wait_args *args)
{
wait_list(&args->condvar->waiting);
rb_funcall(args->mutex, rb_intern("lock"), 0);
return Qnil;
}
static VALUE
rb_condvar_wait(VALUE self, VALUE mutex_v)
{
ConditionVariable *condvar;
Data_Get_Struct(self, ConditionVariable, condvar);
if (CLASS_OF(mutex_v) != rb_cMutex) {
legacy_wait_args args;
args.condvar = condvar;
args.mutex = mutex_v;
rb_iterate(legacy_exclusive_unlock, mutex_v, legacy_wait, (VALUE)&args);
} else {
Mutex *mutex;
Data_Get_Struct(mutex_v, Mutex, mutex);
wait_condvar(condvar, mutex);
}
return self;
}
static VALUE
rb_condvar_broadcast(VALUE self)
{
ConditionVariable *condvar;
Data_Get_Struct(self, ConditionVariable, condvar);
thread_exclusive(wake_all, (VALUE)&condvar->waiting);
rb_thread_schedule();
return self;
}
static void
signal_condvar(ConditionVariable *condvar)
{
VALUE waking = thread_exclusive(wake_one, (VALUE)&condvar->waiting);
if (RTEST(waking)) {
run_thread(waking);
}
}
static VALUE
rb_condvar_signal(VALUE self)
{
ConditionVariable *condvar;
Data_Get_Struct(self, ConditionVariable, condvar);
signal_condvar(condvar);
return self;
}
typedef struct _Queue {
Mutex mutex;
ConditionVariable value_available;
ConditionVariable space_available;
List values;
unsigned long capacity;
} Queue;
static void
mark_queue(Queue *queue)
{
mark_mutex(&queue->mutex);
mark_condvar(&queue->value_available);
mark_condvar(&queue->space_available);
mark_list(&queue->values);
}
static void
finalize_queue(Queue *queue)
{
finalize_mutex(&queue->mutex);
finalize_condvar(&queue->value_available);
finalize_condvar(&queue->space_available);
finalize_list(&queue->values);
}
static void
free_queue(Queue *queue)
{
kill_waiting_threads(&queue->mutex.waiting);
kill_waiting_threads(&queue->space_available.waiting);
kill_waiting_threads(&queue->value_available.waiting);
finalize_queue(queue);
xfree(queue);
}
static void
init_queue(Queue *queue)
{
init_mutex(&queue->mutex);
init_condvar(&queue->value_available);
init_condvar(&queue->space_available);
init_list(&queue->values);
queue->capacity = 0;
}
static VALUE
rb_queue_alloc(VALUE klass)
{
Queue *queue;
queue = ALLOC(Queue);
init_queue(queue);
return Data_Wrap_Struct(klass, mark_queue, free_queue, queue);
}
static VALUE
rb_queue_marshal_load(VALUE self, VALUE data)
{
Queue *queue;
VALUE array;
Data_Get_Struct(self, Queue, queue);
array = rb_marshal_load(data);
if (TYPE(array) != T_ARRAY) {
rb_raise(rb_eTypeError, "expected Array of queue data");
}
if (RARRAY(array)->len < 1) {
rb_raise(rb_eArgError, "missing capacity value");
}
queue->capacity = NUM2ULONG(rb_ary_shift(array));
push_multiple_list(&queue->values, RARRAY(array)->ptr, (unsigned)RARRAY(array)->len);
return self;
}
static VALUE
rb_queue_marshal_dump(VALUE self)
{
Queue *queue;
VALUE array;
Data_Get_Struct(self, Queue, queue);
array = array_from_list(&queue->values);
rb_ary_unshift(array, ULONG2NUM(queue->capacity));
return rb_marshal_dump(array, Qnil);
}
static VALUE
rb_queue_clear(VALUE self)
{
Queue *queue;
Data_Get_Struct(self, Queue, queue);
lock_mutex(&queue->mutex);
clear_list(&queue->values);
signal_condvar(&queue->space_available);
unlock_mutex(&queue->mutex);
return self;
}
static VALUE
rb_queue_empty_p(VALUE self)
{
Queue *queue;
VALUE result;
Data_Get_Struct(self, Queue, queue);
lock_mutex(&queue->mutex);
result = queue->values.size == 0 ? Qtrue : Qfalse;
unlock_mutex(&queue->mutex);
return result;
}
static VALUE
rb_queue_length(VALUE self)
{
Queue *queue;
VALUE result;
Data_Get_Struct(self, Queue, queue);
lock_mutex(&queue->mutex);
result = ULONG2NUM(queue->values.size);
unlock_mutex(&queue->mutex);
return result;
}
static VALUE
rb_queue_num_waiting(VALUE self)
{
Queue *queue;
VALUE result;
Data_Get_Struct(self, Queue, queue);
lock_mutex(&queue->mutex);
result = ULONG2NUM(queue->value_available.waiting.size +
queue->space_available.waiting.size);
unlock_mutex(&queue->mutex);
return result;
}
static VALUE
rb_queue_pop(int argc, VALUE *argv, VALUE self)
{
Queue *queue;
int should_block;
VALUE result;
Data_Get_Struct(self, Queue, queue);
if (argc == 0) {
should_block = 1;
} else if (argc == 1) {
should_block = !RTEST(argv[0]);
} else {
rb_raise(rb_eArgError, "wrong number of arguments (%d for 1)", argc);
}
lock_mutex(&queue->mutex);
if (!queue->values.entries && !should_block) {
unlock_mutex(&queue->mutex);
rb_raise(rb_eThreadError, "queue empty");
}
while (!queue->values.entries) {
wait_condvar(&queue->value_available, &queue->mutex);
}
result = shift_list(&queue->values);
if (queue->capacity && queue->values.size < queue->capacity) {
signal_condvar(&queue->space_available);
}
unlock_mutex(&queue->mutex);
return result;
}
static VALUE
rb_queue_push(VALUE self, VALUE value)
{
Queue *queue;
Data_Get_Struct(self, Queue, queue);
lock_mutex(&queue->mutex);
while (queue->capacity && queue->values.size >= queue->capacity) {
wait_condvar(&queue->space_available, &queue->mutex);
}
push_list(&queue->values, value);
signal_condvar(&queue->value_available);
unlock_mutex(&queue->mutex);
return self;
}
static VALUE
rb_sized_queue_max(VALUE self)
{
Queue *queue;
VALUE result;
Data_Get_Struct(self, Queue, queue);
lock_mutex(&queue->mutex);
result = ULONG2NUM(queue->capacity);
unlock_mutex(&queue->mutex);
return result;
}
static VALUE
rb_sized_queue_max_set(VALUE self, VALUE value)
{
Queue *queue;
unsigned long new_capacity;
unsigned long difference;
Data_Get_Struct(self, Queue, queue);
new_capacity = NUM2ULONG(value);
if (new_capacity < 1) {
rb_raise(rb_eArgError, "value must be positive");
}
lock_mutex(&queue->mutex);
if (queue->capacity && new_capacity > queue->capacity) {
difference = new_capacity - queue->capacity;
} else {
difference = 0;
}
queue->capacity = new_capacity;
for (; difference > 0; --difference) {
signal_condvar(&queue->space_available);
}
unlock_mutex(&queue->mutex);
return self;
}
static VALUE
dummy_load(VALUE self, VALUE string)
{
return Qnil;
}
static VALUE
dummy_dump(VALUE self)
{
return rb_str_new2("");
}
void
Init_thread(void)
{
rb_define_singleton_method(rb_cThread, "exclusive", rb_thread_exclusive, 0);
rb_cMutex = rb_define_class("Mutex", rb_cObject);
rb_define_alloc_func(rb_cMutex, rb_mutex_alloc);
rb_define_method(rb_cMutex, "marshal_load", dummy_load, 1);
rb_define_method(rb_cMutex, "marshal_dump", dummy_dump, 0);
rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0);
rb_define_method(rb_cMutex, "try_lock", rb_mutex_try_lock, 0);
rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0);
rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0);
rb_define_method(rb_cMutex, "exclusive_unlock", rb_mutex_exclusive_unlock, 0);
rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize, 0);
rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject);
rb_define_alloc_func(rb_cConditionVariable, rb_condvar_alloc);
rb_define_method(rb_cConditionVariable, "marshal_load", dummy_load, 1);
rb_define_method(rb_cConditionVariable, "marshal_dump", dummy_dump, 0);
rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, 1);
rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);
rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);
rb_cQueue = rb_define_class("Queue", rb_cObject);
rb_define_alloc_func(rb_cQueue, rb_queue_alloc);
rb_define_method(rb_cQueue, "marshal_load", rb_queue_marshal_load, 1);
rb_define_method(rb_cQueue, "marshal_dump", rb_queue_marshal_dump, 0);
rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
rb_alias(rb_cQueue, rb_intern("enq"), rb_intern("push"));
rb_alias(rb_cQueue, rb_intern("<<"), rb_intern("push"));
rb_alias(rb_cQueue, rb_intern("deq"), rb_intern("pop"));
rb_alias(rb_cQueue, rb_intern("shift"), rb_intern("pop"));
rb_alias(rb_cQueue, rb_intern("size"), rb_intern("length"));
rb_cSizedQueue = rb_define_class("SizedQueue", rb_cQueue);
rb_define_method(rb_cSizedQueue, "initialize", rb_sized_queue_max_set, 1);
rb_define_method(rb_cSizedQueue, "num_waiting", rb_queue_num_waiting, 0);
rb_define_method(rb_cSizedQueue, "pop", rb_queue_pop, -1);
rb_define_method(rb_cSizedQueue, "push", rb_queue_push, 1);
rb_define_method(rb_cSizedQueue, "max", rb_sized_queue_max, 0);
rb_define_method(rb_cSizedQueue, "max=", rb_sized_queue_max_set, 1);
rb_alias(rb_cSizedQueue, rb_intern("enq"), rb_intern("push"));
rb_alias(rb_cSizedQueue, rb_intern("<<"), rb_intern("push"));
rb_alias(rb_cSizedQueue, rb_intern("deq"), rb_intern("pop"));
rb_alias(rb_cSizedQueue, rb_intern("shift"), rb_intern("pop"));
}