module ActionSubscriber::RabbitConnection
Constants
- SUBSCRIBER_CONNECTION_MUTEX
Public Class Methods
subscriber_connected?()
click to toggle source
# File lib/action_subscriber/rabbit_connection.rb, line 7
# Reports whether the shared subscriber connection is connected.
#
# The check goes through with_connection, so it runs while holding the
# connection mutex and will create the connection first if none is cached.
#
# @return the result of calling +connected?+ on the connection
def self.subscriber_connected?
  with_connection(&:connected?)
end
subscriber_disconnect!()
click to toggle source
# File lib/action_subscriber/rabbit_connection.rb, line 11
# Closes the cached subscriber connection (when there is one) and forgets it,
# so that the next with_connection call has to build a new connection.
#
# Runs under SUBSCRIBER_CONNECTION_MUTEX.
def self.subscriber_disconnect!
  SUBSCRIBER_CONNECTION_MUTEX.synchronize do
    current = @subscriber_connection
    current.close if current
    @subscriber_connection = nil
  end
end
with_connection() { |subscriber_connection| ... }
click to toggle source
# File lib/action_subscriber/rabbit_connection.rb, line 18
# Yields the shared subscriber connection to the given block while holding
# SUBSCRIBER_CONNECTION_MUTEX. The connection is built with create_connection
# the first time it is needed and cached afterwards.
#
# @yield [subscriber_connection] the cached connection
# @return whatever the block returns
def self.with_connection
  SUBSCRIBER_CONNECTION_MUTEX.synchronize do
    @subscriber_connection = create_connection unless @subscriber_connection
    yield @subscriber_connection
  end
end
Private Class Methods
connection_options()
click to toggle source
# File lib/action_subscriber/rabbit_connection.rb, line 54
# Builds the options hash that create_connection hands to the RabbitMQ client.
#
# Most values are read from ::ActionSubscriber.configuration; automatic
# recovery and recovery from connection close are always switched on.
#
# @return [Hash] connection options keyed by symbol
def self.connection_options
  config = ::ActionSubscriber.configuration

  {
    automatically_recover: true,
    continuation_timeout: config.timeout * 1_000.0, # convert sec to ms
    heartbeat: config.heartbeat,
    hosts: config.hosts,
    network_recovery_interval: config.network_recovery_interval,
    pass: config.password,
    port: config.port,
    recover_from_connection_close: true,
    threadpool_size: config.threadpool_size,
    tls: config.tls,
    tls_ca_certificates: config.tls_ca_certificates,
    tls_cert: config.tls_cert,
    tls_key: config.tls_key,
    user: config.username,
    verify_peer: config.verify_peer,
    vhost: config.virtual_host
  }
end
create_connection()
click to toggle source
Private API
# File lib/action_subscriber/rabbit_connection.rb, line 26
# Opens a new broker connection using connection_options and registers the
# blocked/unblocked callbacks on it.
#
# When RUBY_PLATFORM is "java" the connection is made with MarchHare and an
# :executor_factory is added to the options; otherwise a Bunny session is
# created and started.
#
# @return the newly opened connection
def self.create_connection
  settings = connection_options

  if ::RUBY_PLATFORM != "java"
    session = ::Bunny.new(settings)
    session.start
    # Bunny passes a message object; only its reason is forwarded.
    session.on_blocked { |blocked_message| on_blocked(blocked_message.reason) }
    session.on_unblocked { on_unblocked }
    return session
  end

  # The factory reads the pool size from the same options hash each time it
  # is invoked.
  settings[:executor_factory] = ::Proc.new do
    ::MarchHare::ThreadPools.fixed_of_size(settings[:threadpool_size])
  end

  session = ::MarchHare.connect(settings)
  session.on_blocked { |reason| on_blocked(reason) }
  session.on_unblocked { on_unblocked }
  session
end
on_blocked(reason)
click to toggle source
# File lib/action_subscriber/rabbit_connection.rb, line 76
# Publishes a "connection_blocked.action_subscriber" notification through
# ActiveSupport::Notifications, with the given reason in the payload.
#
# @param reason the reason supplied by the connection's blocked callback
def self.on_blocked(reason)
  payload = { :reason => reason }
  ::ActiveSupport::Notifications.instrument("connection_blocked.action_subscriber", payload)
end
on_unblocked()
click to toggle source
# File lib/action_subscriber/rabbit_connection.rb, line 81
# Publishes a "connection_unblocked.action_subscriber" notification through
# ActiveSupport::Notifications.
def self.on_unblocked
  event_name = "connection_unblocked.action_subscriber"
  ::ActiveSupport::Notifications.instrument(event_name)
end