class LogCourier::EventQueue
Attributes
max[R]
Returns the maximum size of the queue.
Public Class Methods
new(max)
click to toggle source
Creates a fixed-length queue with a maximum size of max.
# File lib/log-courier/event_queue.rb, line 34
# Creates a fixed-length queue that holds at most +max+ objects.
#
# Raises ArgumentError if +max+ is not positive.
def initialize(max)
  raise ArgumentError, 'queue size must be positive' unless max.positive?

  @max = max
  @que = []
  # Lock used by the other queue operations around their access to the state below.
  @mutex = Mutex.new
  # Signalled by push when an object has been queued; pop_timeout waits on it.
  @cond = ConditionVariable.new
  @num_waiting = 0
  # Signalled when space may have become available; push waits on it.
  @enque_cond = ConditionVariable.new
  @num_enqueue_waiting = 0
  # The former `@que.taint` / `taint` calls were removed: Object#taint has been
  # a deprecated no-op since Ruby 2.7 and no longer exists in Ruby 3.2+, where
  # calling it raises NoMethodError.
end
Public Instance Methods
clear()
click to toggle source
Removes all objects from the queue.
# File lib/log-courier/event_queue.rb, line 142
# Removes all objects from the queue and wakes every thread that is blocked
# in push waiting for space.
#
# Returns self.
def clear
  @mutex.synchronize do
    @que.clear
    # Emptying the queue frees space, so pushers waiting on @enque_cond must be
    # woken; otherwise they stay blocked until some later pop happens to
    # signal them. Each woken pusher re-checks the size before enqueuing.
    @enque_cond.broadcast
  end
  self
end
empty?()
click to toggle source
Returns true if the queue is empty.
# File lib/log-courier/event_queue.rb, line 133
# Reports, while holding the queue lock, whether no objects are queued.
def empty?
  @mutex.synchronize { @que.empty? }
end
length()
click to toggle source
Returns the length of the queue.
# File lib/log-courier/event_queue.rb, line 152
# Counts the objects currently queued, read while holding the queue lock.
def length
  @mutex.synchronize { @que.size }
end
Also aliased as: size
max=(max)
click to toggle source
Sets the maximum size of the queue.
# File lib/log-courier/event_queue.rb, line 57
# Changes the maximum size of the queue.
#
# When the limit grows, one waiting pusher is signalled per newly available
# slot. Shrinking only records the new limit.
#
# Raises ArgumentError if +max+ is not positive.
def max=(max)
  raise ArgumentError, 'queue size must be positive' unless max.positive?

  @mutex.synchronize do
    previous = @max
    @max = max
    # Nothing to wake unless capacity actually increased.
    next max unless max > previous

    (max - previous).times { @enque_cond.signal }
  end
end
num_waiting()
click to toggle source
Returns the number of threads waiting on the queue.
# File lib/log-courier/event_queue.rb, line 166
# Totals the threads blocked on the queue: those counted in @num_waiting plus
# those counted in @num_enqueue_waiting.
def num_waiting
  @mutex.synchronize { @num_waiting + @num_enqueue_waiting }
end
pop(*args)
click to toggle source
Retrieves data from the queue and runs a waiting thread, if any.
# File lib/log-courier/event_queue.rb, line 112
# Takes the next object off the queue via pop_timeout (all arguments are
# forwarded to it), then signals one waiting pusher if the queue is below
# its maximum size.
#
# Returns whatever pop_timeout returned.
def pop(*args)
  pop_timeout(*args).tap do
    @mutex.synchronize do
      @enque_cond.signal if @que.length < @max
    end
  end
end
push(obj, timeout = nil)
click to toggle source
Pushes obj to the queue. If there is no space left in the queue, waits until space becomes available, up to a maximum of timeout seconds; if no space becomes available within that time, TimeoutError is raised.
# File lib/log-courier/event_queue.rb, line 77
# Pushes +obj+ to the queue, blocking while the queue is full.
#
# +timeout+ is the maximum total number of seconds to wait for space; nil
# (the default) waits indefinitely.
#
# Returns self. Raises TimeoutError if the queue is still full once +timeout+
# seconds have elapsed.
def push(obj, timeout = nil)
  # Use a monotonic deadline so wall-clock adjustments cannot stretch or cut
  # short the wait.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout unless timeout.nil?

  @mutex.synchronize do
    # Re-check after every wake-up: another pusher may have taken the slot.
    while @que.length >= @max
      remaining = nil
      unless deadline.nil?
        # Wait only for what is left of the budget, so repeated wake-ups
        # cannot extend the total wait beyond +timeout+.
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise TimeoutError if remaining <= 0
      end

      @num_enqueue_waiting += 1
      begin
        @enque_cond.wait @mutex, remaining
      ensure
        @num_enqueue_waiting -= 1
      end
    end

    @que.push obj
    # Wake one thread waiting for data.
    @cond.signal
  end
  self
end
Private Instance Methods
pop_timeout(timeout = nil)
click to toggle source
Retrieves data from the queue. If the queue is empty, the calling thread is suspended until data is pushed onto the queue or, if set, timeout seconds pass, in which case TimeoutError is raised. If timeout is 0 and the queue is empty, the thread isn't suspended and TimeoutError is raised immediately.
# File lib/log-courier/event_queue.rb, line 180
# Removes and returns the object at the head of the queue, blocking while the
# queue is empty.
#
# +timeout+ is the maximum total number of seconds to wait for data; nil (the
# default) waits indefinitely, and 0 never suspends the calling thread.
#
# Raises TimeoutError if the queue is still empty once +timeout+ seconds have
# elapsed.
def pop_timeout(timeout = nil)
  # Use a monotonic deadline so wall-clock adjustments cannot stretch or cut
  # short the wait.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout unless timeout.nil?

  @mutex.synchronize do
    # Re-check after every wake-up: another popper may have taken the object.
    loop do
      return @que.shift unless @que.empty?

      remaining = nil
      unless deadline.nil?
        # Wait only for what is left of the budget, so repeated wake-ups
        # cannot extend the total wait beyond +timeout+. A zero timeout
        # lands here immediately and raises without suspending.
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise TimeoutError if remaining <= 0
      end

      @num_waiting += 1
      begin
        @cond.wait @mutex, remaining
      ensure
        @num_waiting -= 1
      end
    end
  end
end