class LogCourier::EventQueue

EventQueue

Attributes

max[R]

Returns the maximum size of the queue.

Public Class Methods

new(max) click to toggle source

Creates a fixed-length queue with a maximum size of max.

# File lib/log-courier/event_queue.rb, line 34
def initialize(max)
  raise ArgumentError, 'queue size must be positive' unless max.positive?

  @max = max
  @enque_cond = ConditionVariable.new
  @num_enqueue_waiting = 0

  @que = []
  @que.taint # enable tainted communication
  @num_waiting = 0
  taint
  @mutex = Mutex.new
  @cond = ConditionVariable.new
end

Public Instance Methods

<<(obj, timeout = nil)

Alias of push

Alias for: push
clear() click to toggle source

Removes all objects from the queue.

# File lib/log-courier/event_queue.rb, line 142
def clear
  @mutex.synchronize do
    @que.clear
  end
  self
end
deq(*args)

Alias of pop

Alias for: pop
empty?() click to toggle source

Returns true if the queue is empty.

# File lib/log-courier/event_queue.rb, line 133
def empty?
  @mutex.synchronize do
    return @que.empty?
  end
end
enq(obj, timeout = nil)

Alias of push

Alias for: push
length() click to toggle source

Returns the length of the queue.

# File lib/log-courier/event_queue.rb, line 152
def length
  @mutex.synchronize do
    return @que.length
  end
end
Also aliased as: size
max=(max) click to toggle source

Sets the maximum size of the queue.

# File lib/log-courier/event_queue.rb, line 57
def max=(max)
  raise ArgumentError, 'queue size must be positive' unless max.positive?

  @mutex.synchronize do
    if max <= @max
      @max = max
    else
      diff = max - @max
      @max = max
      diff.times do
        @enque_cond.signal
      end
    end
  end
end
num_waiting() click to toggle source

Returns the number of threads waiting on the queue.

# File lib/log-courier/event_queue.rb, line 166
def num_waiting
  @mutex.synchronize do
    return @num_waiting + @num_enqueue_waiting
  end
end
pop(*args) click to toggle source

Retrieves data from the queue and runs a waiting thread, if any.

# File lib/log-courier/event_queue.rb, line 112
def pop(*args)
  retval = pop_timeout(*args)
  @mutex.synchronize do
    @enque_cond.signal if @que.length < @max
  end
  retval
end
Also aliased as: shift, deq
push(obj, timeout = nil) click to toggle source

Pushes obj to the queue. If there is no space left in the queue, waits until space becomes available, up to a maximum of timeout seconds.

# File lib/log-courier/event_queue.rb, line 77
def push(obj, timeout = nil)
  start = Time.now unless timeout.nil?
  @mutex.synchronize do
    loop do
      break if @que.length < @max

      @num_enqueue_waiting += 1
      begin
        @enque_cond.wait @mutex, timeout
      ensure
        @num_enqueue_waiting -= 1
      end

      raise TimeoutError if !timeout.nil? && Time.now - start >= timeout
    end

    @que.push obj
    @cond.signal
  end
  self
end
Also aliased as: <<, enq
shift(*args)

Alias of pop

Alias for: pop
size()

Alias of length.

Alias for: length

Private Instance Methods

pop_timeout(timeout = nil) click to toggle source

Retrieves data from the queue. If the queue is empty, the calling thread is suspended until data is pushed onto the queue or, if set, timeout seconds passes. If timeout is 0, the thread isn't suspended, and an exception is raised.

# File lib/log-courier/event_queue.rb, line 180
def pop_timeout(timeout = nil)
  start = Time.now unless timeout.nil?
  @mutex.synchronize do
    loop do
      return @que.shift unless @que.empty?
      raise TimeoutError if !timeout.nil? && timeout.zero?

      begin
        @num_waiting += 1
        @cond.wait @mutex, timeout
      ensure
        @num_waiting -= 1
      end
      raise TimeoutError if !timeout.nil? && Time.now - start >= timeout
    end
  end
  nil
end