class ConcurrentQueue
Public Class Methods
new()
click to toggle source
# File lib/concurrent_queue.rb, line 17 def initialize @listeners = [] @mutex = Mutex.new @queue = Array.new end
pop(arg)
click to toggle source
# File lib/concurrent_queue.rb, line 7 def self.pop(arg) arg = [arg] if arg.is_a?(ConcurrentQueue) unless arg.is_a?(Array) && arg.all? { |queue| queue.is_a?(ConcurrentQueue) } raise ArgumentError, 'must pass array of ConcurrentQueues' end queues = arg listener = Listener.new listener.pop(queues) end
Public Instance Methods
add_listener(listener)
click to toggle source
# File lib/concurrent_queue.rb, line 39 def add_listener(listener) @mutex.synchronize do @listeners << listener notify if @queue.any? end nil end
length()
click to toggle source
# File lib/concurrent_queue.rb, line 35 def length @mutex.synchronize { @queue.length } end
pop()
click to toggle source
# File lib/concurrent_queue.rb, line 23 def pop self.class.pop(self) end
push(item)
click to toggle source
# File lib/concurrent_queue.rb, line 27 def push(item) @mutex.synchronize do @queue.push(item) notify end nil end
remove_listener(listener)
click to toggle source
# File lib/concurrent_queue.rb, line 47 def remove_listener(listener) @mutex.synchronize do @listeners.delete(listener) end nil end
Private Instance Methods
notify()
click to toggle source
# File lib/concurrent_queue.rb, line 56 def notify @listeners.each do |listener| item = @queue.first was_accepted = listener.send(item) if was_accepted @queue.shift break end end nil end