class MultiOpQueue::Queue
Original Queue implementation from Ruby 2.0.0: github.com/ruby/ruby/blob/ruby_2_0_0/lib/thread.rb
This class provides a way to synchronize communication between threads.
Example:
    require 'thread'

    queue = Queue.new

    producer = Thread.new do
      5.times do |i|
        sleep rand(i) # simulate expense
        queue << i
        puts "#{i} produced"
      end
    end

    consumer = Thread.new do
      5.times do |i|
        value = queue.pop
        sleep rand(i/2) # simulate expense
        puts "consumed #{value}"
      end
    end

    consumer.join
Public Class Methods
Creates a new queue.
    # File lib/multi_op_queue.rb, line 37
    def initialize
      @que = []
      @que.taint          # enable tainted communication
      @num_waiting = 0
      self.taint
      @mutex = Mutex.new
      @cond = ConditionVariable.new
    end
Public Instance Methods
Removes all objects from the queue.
    # File lib/multi_op_queue.rb, line 167
    def clear
      @que.clear
    end
Concatenates ary onto the queue.
    # File lib/multi_op_queue.rb, line 49
    def concat(ary)
      handle_interrupt do
        @mutex.synchronize do
          @que.concat ary
          @cond.signal
        end
      end
    end
Returns true if the queue is empty.
    # File lib/multi_op_queue.rb, line 160
    def empty?
      @que.empty?
    end
Returns the length of the queue.
    # File lib/multi_op_queue.rb, line 174
    def length
      @que.length
    end
Returns the number of threads waiting on the queue.
    # File lib/multi_op_queue.rb, line 186
    def num_waiting
      @num_waiting
    end
Retrieves data from the queue. If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread is not suspended and a ThreadError is raised instead.
    # File lib/multi_op_queue.rb, line 85
    def pop(non_block=false)
      handle_interrupt do
        @mutex.synchronize do
          while true
            if @que.empty?
              if non_block
                raise ThreadError, "queue empty"
              else
                begin
                  @num_waiting += 1
                  @cond.wait @mutex
                ensure
                  @num_waiting -= 1
                end
              end
            else
              return @que.shift
            end
          end
        end
      end
    end
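The non-blocking form of pop can be illustrated with a short sketch. It uses Ruby's standard library Queue rather than MultiOpQueue::Queue so that it runs without the gem installed; since the source above is derived from that class, the same calling pattern is assumed to apply here. The variable names first and error_class are only for illustration.

```ruby
require 'thread'

queue = Queue.new
queue << :job

# With non_block = true, an available item is returned immediately.
first = queue.pop(true)

# On an empty queue, the non-blocking form raises instead of suspending
# the calling thread.
error_class = nil
begin
  queue.pop(true)
rescue ThreadError => e
  error_class = e.class
end

puts "popped #{first.inspect}, then rescued #{error_class}"
```

Rescuing ThreadError lets a polling consumer do other work when the queue is empty instead of parking the thread.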
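Retrieves up to num_to_pop objects from the queue and returns them as an array. If fewer than num_to_pop objects are queued, the array contains only those that are available. If the queue is empty, the calling thread is suspended until data is pushed onto the queue or the optional :timeout expires; nil is returned if the queue is still empty after the wait. If non_block is true, the thread isn't suspended, and a ThreadError is raised. opts may be either a boolean, which is treated as non_block, or a Hash with the keys :timeout and :non_block.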
    # File lib/multi_op_queue.rb, line 125
    def pop_up_to(num_to_pop = 1, opts = {})
      # opts is either a boolean (non_block) or a Hash of options
      case opts
      when TrueClass, FalseClass
        non_block = opts
      when Hash
        timeout = opts.fetch(:timeout, nil)
        non_block = opts.fetch(:non_block, false)
      end

      handle_interrupt do
        @mutex.synchronize do
          while true
            if @que.empty?
              if non_block
                raise ThreadError, "queue empty"
              else
                begin
                  @num_waiting += 1
                  @cond.wait(@mutex, timeout)
                  # still empty after waking up: give up and return nil
                  return nil if @que.empty?
                ensure
                  @num_waiting -= 1
                end
              end
            else
              return @que.shift(num_to_pop)
            end
          end
        end
      end
    end
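The batch and timeout behaviour described above can be seen in a small stand-alone sketch. BatchQueueSketch is a hypothetical, stripped-down stand-in written for this example, not the library class: it omits non_block, num_waiting and interrupt handling, and takes timeout as a keyword argument for brevity. It relies only on Mutex, ConditionVariable#wait with a timeout, and Array#shift with a count.

```ruby
require 'thread'

# Hypothetical minimal stand-in for illustration only.
class BatchQueueSketch
  def initialize
    @items = []
    @mutex = Mutex.new
    @cond  = ConditionVariable.new
  end

  def push(obj)
    @mutex.synchronize do
      @items.push(obj)
      @cond.signal
    end
  end

  # Returns up to max items, or nil if the queue is still empty
  # after waiting for timeout seconds (nil waits indefinitely).
  def pop_up_to(max, timeout: nil)
    @mutex.synchronize do
      if @items.empty?
        @cond.wait(@mutex, timeout)
        return nil if @items.empty?
      end
      @items.shift(max)
    end
  end
end

q = BatchQueueSketch.new
5.times { |i| q.push(i) }

first_batch  = q.pop_up_to(3)                 # three of the five items
second_batch = q.pop_up_to(3)                 # only two items remain
timed_out    = q.pop_up_to(3, timeout: 0.05)  # queue is empty, wait expires

p first_batch, second_batch, timed_out
```

Returning nil on an expired wait, rather than an empty array, lets a caller tell "nothing arrived in time" apart from a successful batch.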
Pushes obj to the queue.
    # File lib/multi_op_queue.rb, line 61
    def push(obj)
      handle_interrupt do
        @mutex.synchronize do
          @que.push obj
          @cond.signal
        end
      end
    end
Private Instance Methods
    # File lib/multi_op_queue.rb, line 192
    def handle_interrupt
      @handle_interrupt = Thread.respond_to?(:handle_interrupt) if @handle_interrupt.nil?

      if @handle_interrupt
        Thread.handle_interrupt(StandardError => :on_blocking) do
          yield
        end
      else
        yield
      end
    end
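The private helper above wraps each queue operation so that asynchronous StandardError interrupts are delivered only at blocking points, and it falls back to a plain yield on Rubies that lack Thread.handle_interrupt. A minimal sketch of that feature-detection pattern follows; the method name with_deferred_interrupts is hypothetical and chosen only for this example.

```ruby
# Hypothetical helper mirroring the feature-detection pattern above.
def with_deferred_interrupts
  if Thread.respond_to?(:handle_interrupt)
    # Defer asynchronous StandardError interrupts until the thread
    # performs a blocking operation.
    Thread.handle_interrupt(StandardError => :on_blocking) { yield }
  else
    # Older Rubies: run the block without any deferral.
    yield
  end
end

# The helper is transparent to the caller: it returns the block's value.
result = with_deferred_interrupts { [1, 2, 3].reduce(:+) }
puts result
```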