class Polyphony::Queue

Public Class Methods

new(*args) click to toggle source
/*
 * Queue#initialize(*args)
 *
 * Initializes the three ring buffers held by the queue struct (values,
 * shift_queue, push_queue) and sets the capacity. When exactly one argument
 * is given it is converted with NUM2UINT and used as the capacity; for any
 * other argument count the capacity is set to 0.
 *
 * Returns self.
 */
static VALUE Queue_initialize(int argc, VALUE *argv, VALUE self) {
  Queue_t *q;
  GetQueue(self, q);

  // Buffers are set up before the capacity conversion below, which may raise.
  ring_buffer_init(&q->values);
  ring_buffer_init(&q->shift_queue);
  ring_buffer_init(&q->push_queue);

  if (argc == 1) {
    q->capacity = NUM2UINT(argv[0]);
  } else {
    q->capacity = 0;
  }

  return self;
}

Public Instance Methods

<<(p1)
Alias for: push
cap(p1) click to toggle source
/*
 * Queue#cap(cap)
 *
 * Converts cap with NUM2UINT and stores it as the queue's capacity. With a
 * non-zero capacity, queue_schedule_blocked_fibers_to_capacity is invoked on
 * the queue; with a zero capacity, queue_schedule_all_blocked_fibers is
 * invoked on the push_queue.
 *
 * Returns self.
 */
VALUE Queue_cap(VALUE self, VALUE cap) {
  unsigned int limit = NUM2UINT(cap);
  Queue_t *q;
  GetQueue(self, q);

  q->capacity = limit;

  // Zero capacity: hand every fiber in the push queue to the scheduler helper.
  if (!q->capacity) {
    queue_schedule_all_blocked_fibers(&q->push_queue);
    return self;
  }

  queue_schedule_blocked_fibers_to_capacity(q);
  return self;
}
capped?() click to toggle source
/*
 * Queue#capped?
 *
 * Returns the capacity as a Ruby integer when it is non-zero, otherwise nil.
 */
VALUE Queue_capped_p(VALUE self) {
  Queue_t *q;
  GetQueue(self, q);

  if (!q->capacity) {
    return Qnil;
  }
  return UINT2NUM(q->capacity);
}
clear() click to toggle source
/*
 * Queue#clear
 *
 * Clears the values ring buffer. If the queue has a non-zero capacity,
 * queue_schedule_blocked_fibers_to_capacity is then called on it.
 *
 * Returns self.
 */
VALUE Queue_clear(VALUE self) {
  Queue_t *q;
  GetQueue(self, q);

  ring_buffer_clear(&q->values);

  if (q->capacity) {
    queue_schedule_blocked_fibers_to_capacity(q);
  }

  return self;
}
delete(p1) click to toggle source
/*
 * Queue#delete(value)
 *
 * Passes value to ring_buffer_delete on the values buffer. Afterwards, if the
 * queue has a non-zero capacity that exceeds the current value count,
 * queue_schedule_first_blocked_fiber is called on the push_queue.
 *
 * Returns self.
 */
VALUE Queue_delete(VALUE self, VALUE value) {
  Queue_t *q;
  GetQueue(self, q);

  ring_buffer_delete(&q->values, value);

  // Uncapped queue: nothing further to do.
  if (!q->capacity) {
    return self;
  }

  if (q->capacity > q->values.count) {
    queue_schedule_first_blocked_fiber(&q->push_queue);
  }

  return self;
}
empty?() click to toggle source
/*
 * Queue#empty?
 *
 * Returns true when the values buffer's count is zero, false otherwise.
 */
VALUE Queue_empty_p(VALUE self) {
  Queue_t *q;
  GetQueue(self, q);

  if (q->values.count) {
    return Qfalse;
  }
  return Qtrue;
}
flush_waiters(p1) click to toggle source
/*
 * Queue#flush_waiters(value)
 *
 * Repeatedly shifts entries off the shift_queue and calls
 * Fiber_make_runnable(entry, value) on each one, stopping as soon as the
 * shift returns nil.
 *
 * Returns self.
 */
VALUE Queue_flush_waiters(VALUE self, VALUE value) {
  Queue_t *q;
  VALUE waiter;
  GetQueue(self, q);

  while ((waiter = ring_buffer_shift(&q->shift_queue)) != Qnil) {
    Fiber_make_runnable(waiter, value);
  }

  return self;
}
pending?() click to toggle source
/*
 * Queue#pending?
 *
 * Returns true when the shift_queue's count is non-zero, false otherwise.
 */
VALUE Queue_pending_p(VALUE self) {
  Queue_t *q;
  GetQueue(self, q);

  if (q->shift_queue.count) {
    return Qtrue;
  }
  return Qfalse;
}
pop()
Alias for: shift
push(p1) click to toggle source
/*
 * Queue#push(value)
 *
 * For a queue with non-zero capacity, first calls capped_queue_block_push
 * (presumably suspends the caller while the queue is full -- confirm against
 * that helper). Then calls queue_schedule_first_blocked_fiber on the
 * shift_queue and appends value to the values buffer.
 *
 * Returns self.
 */
VALUE Queue_push(VALUE self, VALUE value) {
  Queue_t *q;
  GetQueue(self, q);

  if (q->capacity) {
    capped_queue_block_push(q);
  }

  // The shift_queue helper is invoked before the value is stored.
  queue_schedule_first_blocked_fiber(&q->shift_queue);
  ring_buffer_push(&q->values, value);

  return self;
}
Also aliased as: <<
shift() click to toggle source
/*
 * Queue#shift
 *
 * Removes and returns one entry from the values buffer via ring_buffer_shift.
 * Before doing so, the calling fiber registers itself in the shift_queue and
 * goes through Backend_wait_event at least once; the loop repeats until the
 * values buffer has a non-zero count after the wait returns.
 *
 * After the value is taken, a capped queue whose capacity exceeds the
 * remaining count calls queue_schedule_first_blocked_fiber on the push_queue.
 */
VALUE Queue_shift(VALUE self) {
  Queue_t *queue;
  GetQueue(self, queue);

  VALUE fiber = rb_fiber_current();
  VALUE thread = rb_thread_current();
  // The backend is read from an instance variable on the current thread.
  VALUE backend = rb_ivar_get(thread, ID_ivar_backend);

  while (1) {
    // Values already present: mark this fiber runnable before waiting
    // (presumably so the wait below resumes promptly -- confirm).
    if (queue->values.count) Fiber_make_runnable(fiber, Qnil);

    // Register in the shift_queue for the duration of the wait only.
    ring_buffer_push(&queue->shift_queue, fiber);
    VALUE switchpoint_result = Backend_wait_event(backend, Qnil);
    ring_buffer_delete(&queue->shift_queue, fiber);

    // NOTE(review): RAISE_IF_EXCEPTION presumably raises when the wait result
    // is an exception object -- confirm against the macro definition.
    RAISE_IF_EXCEPTION(switchpoint_result);
    RB_GC_GUARD(switchpoint_result);
    if (queue->values.count) break;
  }
  VALUE value = ring_buffer_shift(&queue->values);
  // Capped queue with spare room after the shift: schedule the first fiber
  // in the push_queue.
  if ((queue->capacity) && (queue->capacity > queue->values.count))
    queue_schedule_first_blocked_fiber(&queue->push_queue);
  RB_GC_GUARD(value);
  return value;
}
Also aliased as: pop
shift_all() click to toggle source
/*
 * Queue#shift_all
 *
 * Returns whatever ring_buffer_shift_all produces for the values buffer.
 * For a queue with non-zero capacity,
 * queue_schedule_blocked_fibers_to_capacity is called before returning.
 */
VALUE Queue_shift_all(VALUE self) {
  Queue_t *q;
  GetQueue(self, q);

  VALUE drained = ring_buffer_shift_all(&q->values);

  if (q->capacity) {
    queue_schedule_blocked_fibers_to_capacity(q);
  }

  return drained;
}
shift_each() click to toggle source
/*
 * Queue#shift_each
 *
 * Calls ring_buffer_shift_each on the values buffer (presumably yields each
 * entry to the given block -- confirm against that helper). For a queue with
 * non-zero capacity, queue_schedule_blocked_fibers_to_capacity is called
 * afterwards.
 *
 * Returns self.
 */
VALUE Queue_shift_each(VALUE self) {
  Queue_t *q;
  GetQueue(self, q);

  ring_buffer_shift_each(&q->values);

  if (q->capacity) {
    queue_schedule_blocked_fibers_to_capacity(q);
  }

  return self;
}
size() click to toggle source
/*
 * Queue#size
 *
 * Returns the values buffer's count converted to a Ruby integer with INT2NUM.
 */
VALUE Queue_size_m(VALUE self) {
  Queue_t *q;

  GetQueue(self, q);
  return INT2NUM(q->values.count);
}
unshift(p1) click to toggle source
/*
 * Queue#unshift(value)
 *
 * For a queue with non-zero capacity, first calls capped_queue_block_push
 * (presumably suspends the caller while the queue is full -- confirm against
 * that helper). Then calls queue_schedule_first_blocked_fiber on the
 * shift_queue and stores value with ring_buffer_unshift.
 *
 * Returns self.
 */
VALUE Queue_unshift(VALUE self, VALUE value) {
  Queue_t *q;
  GetQueue(self, q);

  if (q->capacity) {
    capped_queue_block_push(q);
  }

  // The shift_queue helper is invoked before the value is stored.
  queue_schedule_first_blocked_fiber(&q->shift_queue);
  ring_buffer_unshift(&q->values, value);

  return self;
}