class MultiOpQueue::Queue

Based on the original Queue implementation from Ruby 2.0.0 (github.com/ruby/ruby/blob/ruby_2_0_0/lib/thread.rb).

This class provides a way to synchronize communication between threads.

Example:

require 'thread'

# Shared queue used to hand values from the producer to the consumer.
queue = Queue.new

# Producer thread: pushes the integers 0 through 4 onto the queue,
# pausing for a random interval before each push.
producer = Thread.new do
  5.times do |i|
    sleep rand(i) # simulate the cost of producing a value
    queue << i
    puts "#{i} produced"
  end
end

# Consumer thread: pops five values off the queue and prints each one,
# pausing for a random interval after each pop.
consumer = Thread.new do
  5.times do |i|
    value = queue.pop
    sleep rand(i/2) # simulate the cost of consuming a value
    puts "consumed #{value}"
  end
end

# Wait for the consumer to finish; the producer thread is not joined here.
consumer.join

Public Class Methods

new() click to toggle source

Creates a new queue.

# File lib/multi_op_queue.rb, line 37
# Creates a new, empty queue.
#
# Sets up the backing array, the waiting-thread counter, and the
# mutex/condition-variable pair used by the push and pop methods.
def initialize
  @que = []
  @num_waiting = 0
  # Object#taint was deprecated in Ruby 2.7 and removed in Ruby 3.2, so the
  # queue is only marked as tainted on interpreters that still provide it.
  if respond_to?(:taint)
    @que.taint # enable tainted communication
    taint
  end
  @mutex = Mutex.new
  @cond = ConditionVariable.new
end

Public Instance Methods

<<(obj)

Alias of push

Alias for: push
clear() click to toggle source

Removes all objects from the queue.

# File lib/multi_op_queue.rb, line 167
# Removes all objects from the queue by emptying the backing array.
#
# Note: this method does not acquire @mutex and does not signal @cond.
def clear
  @que.clear
end
concat(ary) click to toggle source

Concatenates ary onto the queue.

# File lib/multi_op_queue.rb, line 49
# Appends every element of +ary+ to the queue while holding the queue's
# mutex, then wakes the threads waiting on the queue.
#
# ary:: the collection of objects to append, in order.
def concat(ary)
  handle_interrupt do
    @mutex.synchronize do
      @que.concat ary
      # Several elements may have been added at once, so wake every waiting
      # thread. A single signal would wake only one waiter and leave the
      # others asleep even though more items are available.
      @cond.broadcast
    end
  end
end
deq(non_block=false)

Alias of pop

Alias for: pop
empty?() click to toggle source

Returns true if the queue is empty.

# File lib/multi_op_queue.rb, line 160
# Returns true if the queue currently holds no objects.
#
# Note: the backing array is read without acquiring @mutex.
def empty?
  @que.empty?
end
enq(obj)

Alias of push

Alias for: push
length() click to toggle source

Returns the length of the queue.

# File lib/multi_op_queue.rb, line 174
# Reports how many objects are currently stored in the queue.
#
# The backing array is read directly, without acquiring @mutex.
def length
  @que.size
end
Also aliased as: size
num_waiting() click to toggle source

Returns the number of threads waiting on the queue.

# File lib/multi_op_queue.rb, line 186
# Returns the number of threads waiting on the queue.
#
# The counter is incremented just before a thread starts waiting on the
# condition variable and decremented again when that wait ends.
def num_waiting
  @num_waiting
end
pop(non_block=false) click to toggle source

Retrieves data from the queue. If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn't suspended, and an exception is raised.

# File lib/multi_op_queue.rb, line 85
# Removes and returns the object at the head of the queue.
#
# non_block:: when false (the default) the calling thread waits on the
#             condition variable until the queue is non-empty; when true
#             an empty queue raises ThreadError ("queue empty") instead.
def pop(non_block=false)
  handle_interrupt do
    @mutex.synchronize do
      # Re-check after every wake-up: the queue may be empty again by the
      # time this thread re-acquires the mutex.
      while @que.empty?
        raise ThreadError, "queue empty" if non_block

        begin
          @num_waiting += 1
          @cond.wait @mutex
        ensure
          # Keep the waiter count accurate even if the wait is interrupted.
          @num_waiting -= 1
        end
      end

      @que.shift
    end
  end
end
Also aliased as: shift, deq
pop_up_to(num_to_pop = 1, opts = {}) click to toggle source

Retrieves data from the queue and returns it as an array. If the queue is not empty, up to num_to_pop elements are removed and returned. If the queue is empty, the calling thread is suspended until data is pushed onto the queue; if a :timeout option is given and the queue is still empty when the wait ends, nil is returned. If non_block is true, the thread isn't suspended, and an exception is raised.

# File lib/multi_op_queue.rb, line 125
# Removes up to +num_to_pop+ objects from the head of the queue and returns
# them as an array.
#
# num_to_pop:: maximum number of objects to remove (default 1).
# opts::       either a boolean, interpreted as +non_block+, or a Hash with
#              the optional keys :timeout (passed to the condition-variable
#              wait) and :non_block (default false).
#
# When the queue is empty and +non_block+ is true, raises ThreadError
# ("queue empty"). Otherwise the calling thread waits on the condition
# variable; if the queue is still empty when the wait ends, nil is returned.
def pop_up_to(num_to_pop = 1, opts = {})
  # Defaults apply when opts is neither a boolean nor a Hash.
  timeout = nil
  non_block = false

  case opts
  when TrueClass, FalseClass
    non_block = opts
  when Hash
    timeout = opts.fetch(:timeout, nil)
    non_block = opts.fetch(:non_block, false)
  end

  handle_interrupt do
    @mutex.synchronize do
      while true
        return @que.shift(num_to_pop) unless @que.empty?
        raise ThreadError, "queue empty" if non_block

        begin
          @num_waiting += 1
          @cond.wait(@mutex, timeout)
          # Still nothing to hand out once the wait is over: give up.
          return nil if @que.empty?
        ensure
          # Keep the waiter count accurate however the wait ends.
          @num_waiting -= 1
        end
      end
    end
  end
end
push(obj) click to toggle source

Pushes obj to the queue.

# File lib/multi_op_queue.rb, line 61
# Appends +obj+ to the tail of the queue while holding the queue's mutex,
# then signals the condition variable so one waiting thread can wake up.
#
# obj:: the object to enqueue.
def push(obj)
  handle_interrupt do
    @mutex.synchronize do
      @que << obj
      @cond.signal
    end
  end
end
Also aliased as: <<, enq
shift(non_block=false)

Alias of pop

Alias for: pop
size()

Alias of length.

Alias for: length

Private Instance Methods

handle_interrupt() { || ... } click to toggle source
# File lib/multi_op_queue.rb, line 192
# Runs the given block, wrapping it in Thread.handle_interrupt with
# StandardError => :on_blocking when the running Ruby provides that API.
# On interpreters without Thread.handle_interrupt the block is simply
# yielded to. Returns the block's value.
def handle_interrupt
  # Probe for the API only once; nil means "not probed yet", because false
  # is itself a valid cached answer.
  if @handle_interrupt.nil?
    @handle_interrupt = Thread.respond_to?(:handle_interrupt)
  end

  return yield unless @handle_interrupt

  Thread.handle_interrupt(StandardError => :on_blocking) { yield }
end