class ConcurrentQueue

Public Class Methods

new() click to toggle source
# File lib/concurrent_queue.rb, line 17
def initialize
  @listeners = []
  @mutex = Mutex.new
  @queue = Array.new
end
pop(arg) click to toggle source
# File lib/concurrent_queue.rb, line 7
def self.pop(arg)
  arg = [arg] if arg.is_a?(ConcurrentQueue)
  unless arg.is_a?(Array) && arg.all? { |queue| queue.is_a?(ConcurrentQueue) }
    raise ArgumentError, 'must pass array of ConcurrentQueues'
  end
  queues = arg
  listener = Listener.new
  listener.pop(queues)
end

Public Instance Methods

add_listener(listener) click to toggle source
# File lib/concurrent_queue.rb, line 39
def add_listener(listener)
  @mutex.synchronize do
    @listeners << listener
    notify if @queue.any?
  end
  nil
end
length() click to toggle source
# File lib/concurrent_queue.rb, line 35
def length
  @mutex.synchronize { @queue.length }
end
pop() click to toggle source
# File lib/concurrent_queue.rb, line 23
def pop
  self.class.pop(self)
end
push(item) click to toggle source
# File lib/concurrent_queue.rb, line 27
def push(item)
  @mutex.synchronize do
    @queue.push(item)
    notify
  end
  nil
end
remove_listener(listener) click to toggle source
# File lib/concurrent_queue.rb, line 47
def remove_listener(listener)
  @mutex.synchronize do
    @listeners.delete(listener)
  end
  nil
end

Private Instance Methods

notify() click to toggle source
# File lib/concurrent_queue.rb, line 56
def notify
  @listeners.each do |listener|
    item = @queue.first
    was_accepted = listener.send(item)
    if was_accepted
      @queue.shift
      break
    end
  end
  nil
end