class Bunny::Concurrent::ContinuationQueue

Continuation queue implementation for MRI and Rubinius

@private

Public Class Methods

new() click to toggle source
# File lib/bunny/concurrent/continuation_queue.rb, line 9
# Sets up an empty queue together with the synchronization primitives
# that guard it.
def initialize
  # Mutex serializing access to the backing array in the locking methods.
  @lock = ::Mutex.new
  # Condition variable used to park consumers until an item is pushed.
  @cond = ::ConditionVariable.new
  # Backing storage; items are appended at the tail and taken from the head.
  @q    = []
end

Public Instance Methods

<<(item)
Alias for: push
clear() click to toggle source
# File lib/bunny/concurrent/continuation_queue.rb, line 46
# Removes every queued item while holding the queue lock.
#
# @return [Array] the (now empty) backing array
def clear
  @lock.synchronize { @q.clear }
end
empty?() click to toggle source
# File lib/bunny/concurrent/continuation_queue.rb, line 52
# Reports whether the queue currently holds no items.
# The backing array is read directly, without taking the lock.
#
# @return [Boolean] true when nothing is queued
def empty?
  @q.length.zero?
end
length()
Alias for: size
poll(timeout_in_ms = nil) click to toggle source
# File lib/bunny/concurrent/continuation_queue.rb, line 27
# Removes and returns the item at the head of the queue, blocking the
# calling thread until one is available.
#
# The deadline is tracked on the monotonic clock so that wall-clock
# adjustments cannot shorten or lengthen the wait.
#
# @param timeout_in_ms [Numeric, nil] maximum time to wait, in milliseconds;
#   nil waits indefinitely
# @return [Object] the oldest queued item
# @raise [::Timeout::Error] if the queue is still empty once the timeout
#   has elapsed (immediately for a zero or negative timeout)
def poll(timeout_in_ms = nil)
  timeout = timeout_in_ms ? timeout_in_ms / 1000.0 : nil

  @lock.synchronize do
    deadline = timeout && (Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout)

    # Re-check emptiness after every wake-up: the wait may end because of
    # a signal, a timeout, or a spurious wake-up.
    while @q.empty?
      if deadline
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        # Checked before waiting, and only while the queue is still empty,
        # so a negative interval is never handed to ConditionVariable#wait
        # and an item that arrived right at the deadline is still returned.
        raise ::Timeout::Error if remaining <= 0

        @cond.wait(@lock, remaining)
      else
        @cond.wait(@lock)
      end
    end

    @q.shift
  end
end
pop() click to toggle source
# File lib/bunny/concurrent/continuation_queue.rb, line 23
# Takes the next item off the queue by delegating to #poll with no
# timeout, i.e. waits until an item is available.
#
# @return [Object] whatever #poll returns
def pop
  poll(nil)
end
push(item) click to toggle source
# File lib/bunny/concurrent/continuation_queue.rb, line 15
# Appends an item to the tail of the queue and signals the condition
# variable so that a thread waiting on it can wake up. Both steps run
# while holding the queue lock.
#
# @param item [Object] the value to enqueue
# @return [ConditionVariable] the result of signalling the condition variable
def push(item)
  @lock.synchronize { @q << item; @cond.signal }
end
Also aliased as: <<
size() click to toggle source
# File lib/bunny/concurrent/continuation_queue.rb, line 56
# Number of items currently queued.
# The backing array is read directly, without taking the lock.
#
# @return [Integer] count of queued items
def size
  @q.length
end
Also aliased as: length