class Polyphony::Queue
Public Class Methods
new(*args)
click to toggle source
static VALUE Queue_initialize(int argc, VALUE *argv, VALUE self) { Queue_t *queue; GetQueue(self, queue); ring_buffer_init(&queue->values); ring_buffer_init(&queue->shift_queue); ring_buffer_init(&queue->push_queue); queue->capacity = (argc == 1) ? NUM2UINT(argv[0]) : 0; return self; }
Public Instance Methods
cap(p1)
click to toggle source
VALUE Queue_cap(VALUE self, VALUE cap) { unsigned int new_capacity = NUM2UINT(cap); Queue_t *queue; GetQueue(self, queue); queue->capacity = new_capacity; if (queue->capacity) queue_schedule_blocked_fibers_to_capacity(queue); else queue_schedule_all_blocked_fibers(&queue->push_queue); return self; }
capped?()
click to toggle source
VALUE Queue_capped_p(VALUE self) { Queue_t *queue; GetQueue(self, queue); return queue->capacity ? UINT2NUM(queue->capacity) : Qnil; }
clear()
click to toggle source
VALUE Queue_clear(VALUE self) { Queue_t *queue; GetQueue(self, queue); ring_buffer_clear(&queue->values); if (queue->capacity) queue_schedule_blocked_fibers_to_capacity(queue); return self; }
delete(p1)
click to toggle source
VALUE Queue_delete(VALUE self, VALUE value) { Queue_t *queue; GetQueue(self, queue); ring_buffer_delete(&queue->values, value); if (queue->capacity && (queue->capacity > queue->values.count)) queue_schedule_first_blocked_fiber(&queue->push_queue); return self; }
empty?()
click to toggle source
VALUE Queue_empty_p(VALUE self) { Queue_t *queue; GetQueue(self, queue); return (!queue->values.count) ? Qtrue : Qfalse; }
flush_waiters(p1)
click to toggle source
VALUE Queue_flush_waiters(VALUE self, VALUE value) { Queue_t *queue; GetQueue(self, queue); while(1) { VALUE fiber = ring_buffer_shift(&queue->shift_queue); if (fiber == Qnil) return self; Fiber_make_runnable(fiber, value); } }
pending?()
click to toggle source
VALUE Queue_pending_p(VALUE self) { Queue_t *queue; GetQueue(self, queue); return (queue->shift_queue.count) ? Qtrue : Qfalse; }
push(p1)
click to toggle source
VALUE Queue_push(VALUE self, VALUE value) { Queue_t *queue; GetQueue(self, queue); if (queue->capacity) capped_queue_block_push(queue); queue_schedule_first_blocked_fiber(&queue->shift_queue); ring_buffer_push(&queue->values, value); return self; }
Also aliased as: <<
shift()
click to toggle source
VALUE Queue_shift(VALUE self) { Queue_t *queue; GetQueue(self, queue); VALUE fiber = rb_fiber_current(); VALUE thread = rb_thread_current(); VALUE backend = rb_ivar_get(thread, ID_ivar_backend); while (1) { if (queue->values.count) Fiber_make_runnable(fiber, Qnil); ring_buffer_push(&queue->shift_queue, fiber); VALUE switchpoint_result = Backend_wait_event(backend, Qnil); ring_buffer_delete(&queue->shift_queue, fiber); RAISE_IF_EXCEPTION(switchpoint_result); RB_GC_GUARD(switchpoint_result); if (queue->values.count) break; } VALUE value = ring_buffer_shift(&queue->values); if ((queue->capacity) && (queue->capacity > queue->values.count)) queue_schedule_first_blocked_fiber(&queue->push_queue); RB_GC_GUARD(value); return value; }
Also aliased as: pop
shift_all()
click to toggle source
VALUE Queue_shift_all(VALUE self) { Queue_t *queue; GetQueue(self, queue); VALUE result = ring_buffer_shift_all(&queue->values); if (queue->capacity) queue_schedule_blocked_fibers_to_capacity(queue); return result; }
shift_each()
click to toggle source
VALUE Queue_shift_each(VALUE self) { Queue_t *queue; GetQueue(self, queue); ring_buffer_shift_each(&queue->values); if (queue->capacity) queue_schedule_blocked_fibers_to_capacity(queue); return self; }
size()
click to toggle source
VALUE Queue_size_m(VALUE self) { Queue_t *queue; GetQueue(self, queue); return INT2NUM(queue->values.count); }
unshift(p1)
click to toggle source
VALUE Queue_unshift(VALUE self, VALUE value) { Queue_t *queue; GetQueue(self, queue); if (queue->capacity) capped_queue_block_push(queue); queue_schedule_first_blocked_fiber(&queue->shift_queue); ring_buffer_unshift(&queue->values, value); return self; }