class EventMachine::Queue

A cross-thread, reactor-scheduled, linear queue.

This class provides a simple queue abstraction on top of the reactor scheduler. It serves two primary purposes:

* API sugar for stateful protocols
* Pushing processing onto the reactor thread

@example

q = EM::Queue.new
q.push('one', 'two', 'three')
3.times do
  q.pop { |msg| puts(msg) }
end

Public Class Methods

new()
# File lib/em/queue.rb, line 19
def initialize
  @sink  = []
  @drain = []
  @popq  = []
end

Public Instance Methods

<<(*items)
Alias for: push
empty?()

@return [Boolean] true if the queue currently holds no items
@note This is a peek: it is not thread safe and may only tend toward accuracy.

# File lib/em/queue.rb, line 63
def empty?
  @drain.empty? && @sink.empty?
end
num_waiting()

@return [Integer] Number of pop callbacks waiting for an item
@note This is a peek at the number of jobs that are currently waiting on the Queue.

# File lib/em/queue.rb, line 75
def num_waiting
  @popq.size
end
pop(*a, &b)

Pop an item off the queue and pass it to the block, which runs on the reactor thread. The pop does not happen immediately: it is scheduled on the reactor, and the block runs in the next tick if the queue already has data, or otherwise once the queue is populated. Each call to pop delivers at most one item.

@return [NilClass] nil

# File lib/em/queue.rb, line 30
def pop(*a, &b)
  cb = EM::Callback(*a, &b)
  EM.schedule do
    if @drain.empty?
      @drain = @sink
      @sink = []
    end
    if @drain.empty?
      @popq << cb
    else
      cb.call @drain.shift
    end
  end
  nil # Always returns nil
end
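
The two paths through pop (item available versus callback parked until a later push) can be sketched without a running reactor. This is a minimal model, not the library itself: `ToyReactor` and `ToyQueue` are hypothetical names, and `ToyReactor#schedule` is only a stand-in for `EM.schedule` that defers every block until `tick` is called.

```ruby
# Hypothetical stand-in for the reactor: collects scheduled blocks and runs
# them later, mimicking "at some point in the future".
class ToyReactor
  def initialize
    @pending = []
  end

  def schedule(&blk)
    @pending << blk
  end

  # Run everything scheduled so far (one simulated tick).
  def tick
    jobs, @pending = @pending, []
    jobs.each(&:call)
  end
end

# Simplified model that mirrors the pop and push bodies from the listing.
class ToyQueue
  def initialize(reactor)
    @reactor = reactor
    @sink  = []
    @drain = []
    @popq  = []
  end

  def push(*items)
    @reactor.schedule do
      @sink.push(*items)
      unless @popq.empty?
        @drain = @sink
        @sink = []
        @popq.shift.call(@drain.shift) until @drain.empty? || @popq.empty?
      end
    end
  end

  def pop(&blk)
    @reactor.schedule do
      if @drain.empty?
        @drain = @sink
        @sink = []
      end
      if @drain.empty?
        @popq << blk              # nothing to deliver yet: park the callback
      else
        blk.call(@drain.shift)    # deliver the oldest item
      end
    end
    nil
  end

  def num_waiting
    @popq.size
  end
end

reactor = ToyReactor.new
q = ToyQueue.new(reactor)
seen = []

q.pop { |msg| seen << msg }   # queue is empty at this point
reactor.tick                  # the callback is parked in @popq
waiting_before = q.num_waiting

q.push('one')
reactor.tick                  # push hands 'one' to the parked callback
```

In this model the callback only fires during the tick that follows the push, which is the "when the queue is populated" case described above.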
push(*items)

Push items onto the queue on the reactor thread. The items do not appear in the queue immediately; they are scheduled for addition during the next reactor tick. Any callbacks already waiting from earlier pop calls are then invoked with the new items, oldest first.

# File lib/em/queue.rb, line 49
def push(*items)
  EM.schedule do
    @sink.push(*items)
    unless @popq.empty?
      @drain = @sink
      @sink = []
      @popq.shift.call @drain.shift until @drain.empty? || @popq.empty?
    end
  end
end
Also aliased as: <<
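
The deferred behaviour of push, and why size and empty? are only peeks, can be illustrated with a small stdlib-only model. `DeferredPushModel` and its `tick` method are hypothetical; `tick` stands in for the reactor running the scheduled work, and the model assumes the push is always deferred rather than run at once.

```ruby
# Hypothetical model of the queue's buffers with a deferred push.
class DeferredPushModel
  def initialize
    @sink = []
    @drain = []
    @pending = [] # blocks waiting for the simulated reactor tick
  end

  # The append is not performed here; it is only scheduled.
  def push(*items)
    @pending << -> { @sink.push(*items) }
    nil
  end
  alias_method :<<, :push

  # Same peeks as in the listing: they read the buffers as they are now.
  def size
    @drain.size + @sink.size
  end

  def empty?
    @drain.empty? && @sink.empty?
  end

  # Simulate the reactor running all scheduled work.
  def tick
    @pending.shift.call until @pending.empty?
  end
end

q = DeferredPushModel.new
q.push('one', 'two')
q << 'three'

size_before_tick  = q.size     # pushes are scheduled but not applied yet
empty_before_tick = q.empty?

q.tick

size_after_tick  = q.size
empty_after_tick = q.empty?
```

Reading size or empty? right after push can therefore report a stale value, which matches the "may only tend toward accuracy" notes on those methods.
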
size()

@return [Integer] Queue size
@note This is a peek: it is not thread safe and may only tend toward accuracy.

# File lib/em/queue.rb, line 69
def size
  @drain.size + @sink.size
end