class Fasten::TimeoutQueue
Public Class Methods
new()
click to toggle source
# File lib/fasten/timeout_queue.rb, line 3 def initialize @mutex = Mutex.new @queue = [] @received = ConditionVariable.new end
Public Instance Methods
push(object)
click to toggle source
# File lib/fasten/timeout_queue.rb, line 9 def push(object) @mutex.synchronize do @queue << object @received.signal end end
receive_with_timeout(timeout = nil)
click to toggle source
# File lib/fasten/timeout_queue.rb, line 16 def receive_with_timeout(timeout = nil) # rubocop:disable Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity @mutex.synchronize do if timeout.nil? # wait indefinitely until there is an element in the queue @received.wait(@mutex) while @queue.empty? elsif @queue.empty? && timeout != 0 # wait for element or timeout timeout_time = timeout + Time.now.to_f while @queue.empty? && (remaining_time = timeout_time - Time.now.to_f).positive? @received.wait(@mutex, remaining_time) end end items = [] items << @queue.shift until @queue.empty? items end end