class Backports::FilteredQueue
Constants
- CONSUME_ON_ESCAPE
Attributes
num_waiting[R]
Like ::Queue, but with
-
filtering
-
timeout
-
raises on closed queues
Public Class Methods
new()
click to toggle source
Timeout processing based on spin.atomicobject.com/2017/06/28/queue-pop-with-timeout-fixed/
# File lib/backports/tools/filtered_queue.rb, line 31 def initialize @mutex = ::Mutex.new @queue = [] @closed = false @received = ::ConditionVariable.new @num_waiting = 0 end
Public Instance Methods
<<(x)
click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 51 def <<(x) @mutex.synchronize do ensure_open @queue << Message.new(x) @received.signal end self end
Also aliased as: push
clear()
click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 61 def clear @mutex.synchronize do @queue.clear end self end
close()
click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 39 def close @mutex.synchronize do @closed = true @received.broadcast end self end
closed?()
click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 47 def closed? @closed end
empty?()
click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 82 def empty? avail = @mutex.synchronize do available! end !avail end
pop(timeout: nil, &block)
click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 68 def pop(timeout: nil, &block) msg = nil exclude = [] if block # exclusion list of messages rejected by this call timeout_time = timeout + Time.now.to_f if timeout while true do @mutex.synchronize do reenter if reentrant? msg = acquire!(timeout_time, exclude) return consume!(msg).value unless block end return msg.value if filter?(msg, &block) end end
Protected Instance Methods
closed_queue_value()
click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 94 def closed_queue_value ensure_open end
reenter()
click to toggle source
@return if outer message should be consumed or not
# File lib/backports/tools/filtered_queue.rb, line 99 def reenter true end
timeout_value()
click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 90 def timeout_value raise self.class::TimeoutError, "timeout elapsed" end
Private Instance Methods
acquire!(timeout_time, exclude = nil)
click to toggle source
private methods assume @mutex synchonized adds to exclude list
# File lib/backports/tools/filtered_queue.rb, line 169 def acquire!(timeout_time, exclude = nil) while true do if (msg = available!(exclude)) msg.reserved = true exclude << msg if exclude return msg end return closed_queue_value if @closed # wait for element or timeout if timeout_time remaining_time = timeout_time - ::Time.now.to_f return timeout_value if remaining_time <= 0 end begin @num_waiting += 1 @received.wait(@mutex, remaining_time) ensure @num_waiting -= 1 end end end
available!(exclude = nil)
click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 191 def available!(exclude = nil) @queue.find do |msg| next if exclude && exclude.include?(msg) !msg.reserved end end
commit(msg, consume)
click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 132 def commit(msg, consume) @mutex.synchronize do if consume consume!(msg) else reject!(msg) end end end
consume!(msg)
click to toggle source
@returns msg
# File lib/backports/tools/filtered_queue.rb, line 123 def consume!(msg) @queue.delete(msg) end
consume_on_reentry(msg) { || ... }
click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 142 def consume_on_reentry(msg) q_map = current_filtered_queues if (outer_msg = q_map[self]) commit(outer_msg, reenter) end q_map[self] = msg begin yield ensure reentered = !q_map.delete(self) end reentered end
current_filtered_queues()
click to toggle source
@returns Hash { FilteredQueue => Message }
# File lib/backports/tools/filtered_queue.rb, line 161 def current_filtered_queues t = Thread.current t.thread_variable_get(:backports_currently_filtered_queues) or t.thread_variable_set(:backports_currently_filtered_queues, {}.compare_by_identity) end
ensure_open()
click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 198 def ensure_open raise self.class::ClosedQueueError, 'queue closed' if @closed end
filter?(msg) { |value)| ... }
click to toggle source
@returns:
-
true if message consumed (block result truthy or due to reentrant call)
-
false if rejected
# File lib/backports/tools/filtered_queue.rb, line 110 def filter?(msg) consume = self.class::CONSUME_ON_ESCAPE begin reentered = consume_on_reentry(msg) do consume = !!(yield msg.value) end reentered ? reenter : consume ensure commit(msg, consume) unless reentered end end
reentrant?()
click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 156 def reentrant? !!current_filtered_queues[self] end
reject!(msg)
click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 127 def reject!(msg) msg.reserved = false @received.broadcast end