class Daybreak::Queue

A queue for Ruby implementations with a GIL (global interpreter lock)

HACK: Dangerous optimization for MRI, whose global interpreter lock makes individual operations on the @queue array effectively thread safe.

@api private

A queue for threaded implementations of Ruby without a GIL

@api private
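The description above mentions a variant for Rubies without a GIL, but this page only shows the MRI source. As a rough illustration of how such a queue could be built with explicit locking, here is a minimal sketch using Mutex and ConditionVariable. The class name SketchQueue and its internals are assumptions made for this example; it is not the library's actual threaded implementation.

```ruby
require 'thread'

# Hypothetical sketch: SketchQueue is an illustrative name, not part of Daybreak.
class SketchQueue
  def initialize
    @queue = []
    @mutex = Mutex.new
    @full  = ConditionVariable.new  # signalled when an item is pushed
    @empty = ConditionVariable.new  # signalled when the queue drains
  end

  # Append an item and wake one thread waiting in #first.
  def <<(x)
    @mutex.synchronize do
      @queue << x
      @full.signal
    end
  end

  # Remove the oldest item and wake threads waiting in #flush once empty.
  def pop
    @mutex.synchronize do
      @queue.shift
      @empty.broadcast if @queue.empty?
    end
  end

  # Block until an item is available, then return it without removing it.
  def first
    @mutex.synchronize do
      @full.wait(@mutex) while @queue.empty?
      @queue.first
    end
  end

  # Block until every item has been popped.
  def flush
    @mutex.synchronize do
      @empty.wait(@mutex) until @queue.empty?
    end
  end
end

# Usage: a worker peeks at each item, processes it, then pops it.
queue = SketchQueue.new
processed = []
worker = Thread.new do
  3.times do
    processed << queue.first
    queue.pop
  end
end
[1, 2, 3].each { |n| queue << n }
queue.flush
worker.join
```

Because the condition variables are always waited on while holding the mutex, a push or pop cannot slip in between the emptiness check and the wait, so this sketch needs no heartbeat thread.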

Public Class Methods

new()
# File lib/daybreak/queue/mri.rb, line 10
def initialize
  @queue, @full, @empty = [], [], []
  @stop = false
  @heartbeat = Thread.new(&method(:heartbeat))
  @heartbeat.priority = -9
end

Public Instance Methods

<<(x)
# File lib/daybreak/queue/mri.rb, line 17
def <<(x)
  @queue << x
  thread = @full.first
  thread.wakeup if thread
end

close()
# File lib/daybreak/queue/mri.rb, line 56
def close
  @stop = true
  @heartbeat.join
end

first()
# File lib/daybreak/queue/mri.rb, line 31
def first
  while @queue.empty?
    begin
      @full << Thread.current
      # If a push happens before Thread.stop, the thread won't be woken up
      Thread.stop while @queue.empty?
    ensure
      @full.delete(Thread.current)
    end
  end
  @queue.first
end

flush()
# File lib/daybreak/queue/mri.rb, line 44
def flush
  until @queue.empty?
    begin
      @empty << Thread.current
      # If a pop happens before Thread.stop, the thread won't be woken up
      Thread.stop until @queue.empty?
    ensure
      @empty.delete(Thread.current)
    end
  end
end

pop()
# File lib/daybreak/queue/mri.rb, line 23
def pop
  @queue.shift
  if @queue.empty?
    thread = @empty.first
    thread.wakeup if thread
  end
end

Private Instance Methods

heartbeat()

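Wakes all waiting threads 10 times per second to avoid deadlocks. This is needed because first and flush have a race condition: if a push or pop happens just before a waiting thread calls Thread.stop, the wakeup is missed and the thread would otherwise sleep indefinitely.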

# File lib/daybreak/queue/mri.rb, line 65
def heartbeat
  until @stop
    @empty.each(&:wakeup)
    @full.each(&:wakeup)
    sleep 0.1
  end
end
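The heartbeat remedy can be tried in isolation. The following is a minimal sketch, not code from the library: the variables items, waiters, and stop are hypothetical stand-ins for @queue, @full, and @stop. The item is pushed without any explicit wakeup, so the consumer can only be woken by the periodic heartbeat.

```ruby
# Hypothetical demo; all names here are illustrative only.
items   = []     # stands in for @queue
waiters = []     # stands in for @full
stop    = false  # stands in for @stop

# Heartbeat: periodically wake every registered waiter, so a missed
# wakeup delays the waiter by at most about one interval.
heartbeat = Thread.new do
  until stop
    waiters.each do |thread|
      begin
        thread.wakeup
      rescue ThreadError
        # The thread already finished; nothing to wake.
      end
    end
    sleep 0.1
  end
end

# Consumer: register as a waiter, sleep until an item shows up, then peek.
consumer = Thread.new do
  begin
    waiters << Thread.current
    Thread.stop while items.empty?
  ensure
    waiters.delete(Thread.current)
  end
  items.first
end

items << :job            # pushed without waking the consumer directly
result = consumer.value  # waits for the consumer thread to finish
stop = true
heartbeat.join
```

The rescue around wakeup is an addition for this sketch; it guards against a thread that exits between being read from the list and being woken.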