class Vx::Lib::Consumer::Subscriber
Attributes
queue_name[RW]
vx_consumer_name[RW]
Public Class Methods
new(*args)
click to toggle source
Calls superclass method
# File lib/vx/lib/consumer/subscriber.rb, line 11
# Forwards every argument unchanged to the superclass initializer, then
# creates the Mutex stored in @lock.
#
# @param args [Array] positional arguments handed straight to super
def initialize(*args)
  super(*args)
  @lock = Mutex.new
end
Public Instance Methods
call(*args)
click to toggle source
# File lib/vx/lib/consumer/subscriber.rb, line 45
# Invokes the handler stored in @on_delivery (when one is set) inside an
# in_progress block, then performs `sleep 0` before the block ends.
#
# @param args [Array] arguments forwarded unchanged to the handler
# @return whatever in_progress returns for the block, whose last
#   expression is `sleep 0`
def call(*args)
  in_progress do
    handler = @on_delivery
    handler.call(*args) if handler
    # NOTE(review): presumably a zero-length sleep to give other threads a
    # chance to run before the block finishes — confirm intent.
    sleep 0
  end
end
cancel()
click to toggle source
Calls superclass method
# File lib/vx/lib/consumer/subscriber.rb, line 52
# Runs the superclass cancel and then closes the channel. Does nothing
# (and returns nil) when closed? already reports true.
#
# @return [nil, Object] nil when already closed or closed after super;
#   otherwise the result of channel.close
def cancel
  return if closed?

  super
  # closed? is checked a second time because super runs in between.
  channel.close unless closed?
end
closed?()
click to toggle source
# File lib/vx/lib/consumer/subscriber.rb, line 59
# Reports the closed state of the channel by delegating to channel.closed?.
#
# @return the value returned by channel.closed?
def closed?
  channel.closed?
end
graceful_shutdown()
click to toggle source
# File lib/vx/lib/consumer/subscriber.rb, line 16
# Calls cancel from inside an in_progress block.
#
# @return whatever in_progress returns for the block (the result of cancel
#   when in_progress passes the block value through)
def graceful_shutdown
  in_progress { cancel }
end
in_progress() { || ... }
click to toggle source
# File lib/vx/lib/consumer/subscriber.rb, line 35
# Yields to the caller's block while holding @lock.
#
# @yield runs with @lock held
# @return the value of the block, as returned by Mutex#synchronize
def in_progress
  @lock.synchronize { yield }
end
join()
click to toggle source
# File lib/vx/lib/consumer/subscriber.rb, line 63
# Calls join on the channel's work pool.
#
# @return the value returned by the work pool's join
def join
  pool = channel.work_pool
  pool.join
end
running?()
click to toggle source
# File lib/vx/lib/consumer/subscriber.rb, line 41
# Tells whether @lock is currently held.
#
# @return [Boolean] result of Mutex#locked? on @lock
def running?
  @lock.locked?
end
try_graceful_shutdown()
click to toggle source
# File lib/vx/lib/consumer/subscriber.rb, line 22
# Attempts to take @lock without blocking; if that succeeds, runs cancel
# and always releases the lock again afterwards.
#
# @return [Boolean] true when the lock was obtained and cancel ran,
#   false when the lock could not be obtained
def try_graceful_shutdown
  return false unless @lock.try_lock

  begin
    cancel
  ensure
    # Release even if cancel raises, so the lock is never left held.
    @lock.unlock
  end
  true
end
wait_shutdown()
click to toggle source
# File lib/vx/lib/consumer/subscriber.rb, line 67
# Starts a thread that calls Consumer.wait_shutdown and, once that call
# returns, invokes cancel. The thread has abort_on_exception enabled.
#
# @return [Thread] the newly started thread
def wait_shutdown
  Thread.new do
    watcher = Thread.current
    watcher.abort_on_exception = true

    Consumer.wait_shutdown
    cancel
  end
end