class Vx::Lib::Consumer::Session
Attributes
conn[R]
Public Instance Methods
allocate_pub_channel() { || ... }
click to toggle source
# File lib/vx/lib/consumer/session.rb, line 109 def allocate_pub_channel assert_connection_is_open key = :vx_consumer_session_pub_channel if Thread.current[key] yield else ch = conn.create_channel assign_error_handlers_to_channel(ch) Thread.current[key] = ch begin yield ensure ch = Thread.current[key] ch.close if ch.open? Thread.current[key] = nil end end end
assign_error_handlers_to_channel(ch)
click to toggle source
# File lib/vx/lib/consumer/session.rb, line 146 def assign_error_handlers_to_channel(ch) ch.on_uncaught_exception {|e, c| ::Vx::Lib::Consumer.exception_handler(e, consumer: c) } ch.on_error {|e, c| ::Vx::Lib::Consumer.exception_handler(e, consumer: c) } end
close()
click to toggle source
# File lib/vx/lib/consumer/session.rb, line 39 def close if open? @@session_lock.synchronize do begin conn.close while conn.status != :closed sleep 0.01 end rescue Bunny::ChannelError, Bunny::ClientTimeout => e Consumer.exception_handler(e, {}) end @conn = nil end end end
conn_info()
click to toggle source
# File lib/vx/lib/consumer/session.rb, line 90 def conn_info if conn "amqp://#{conn.user}@#{conn.host}:#{conn.port}/#{conn.vhost}" else "not connected" end end
declare_exchange(ch, name, options = nil)
click to toggle source
# File lib/vx/lib/consumer/session.rb, line 130 def declare_exchange(ch, name, options = nil) assert_connection_is_open options ||= {} options = {} if name == ''.freeze ch.exchange name, options end
declare_queue(ch, name, options = nil)
click to toggle source
# File lib/vx/lib/consumer/session.rb, line 139 def declare_queue(ch, name, options = nil) assert_connection_is_open options ||= {} ch.queue name, options end
live?()
click to toggle source
# File lib/vx/lib/consumer/session.rb, line 24 def live? @@live end
open(options = {})
click to toggle source
# File lib/vx/lib/consumer/session.rb, line 55 def open(options = {}) return self if open? @@session_lock.synchronize do unless open? resume bunny_options = { heartbeat: Consumer.configuration.heartbeat, automatically_recover: false, } if Consumer.configuration.logger bunny_options.merge!(logger: Consumer.configuration.logger) end @conn ||= Bunny.new( nil, # from ENV['RABBITMQ_URL'] bunny_options ) conn.start while conn.connecting? sleep 0.01 end end end self end
open?()
click to toggle source
# File lib/vx/lib/consumer/session.rb, line 86 def open? conn && conn.open? && conn.status == :open end
resume()
click to toggle source
# File lib/vx/lib/consumer/session.rb, line 28 def resume @@live = true end
shutdown()
click to toggle source
# File lib/vx/lib/consumer/session.rb, line 17 def shutdown @@shutdown_lock.synchronize do @@live = false @@shutdown.broadcast end end
wait_shutdown(timeout = nil)
click to toggle source
# File lib/vx/lib/consumer/session.rb, line 32 def wait_shutdown(timeout = nil) @@shutdown_lock.synchronize do @@shutdown.wait(@@shutdown_lock, timeout) not live? end end
with_pub_channel() { |ch| ... }
click to toggle source
# File lib/vx/lib/consumer/session.rb, line 98 def with_pub_channel key = :vx_consumer_session_pub_channel if ch = Thread.current[key] yield ch else conn.with_channel do |c| yield c end end end
Private Instance Methods
assert_connection_is_open()
click to toggle source
# File lib/vx/lib/consumer/session.rb, line 153 def assert_connection_is_open open? || raise(ConnectionDoesNotExistError) end
config()
click to toggle source
# File lib/vx/lib/consumer/session.rb, line 157 def config Consumer.configuration end