module ActionSubscriber::RabbitConnection

Constants

SUBSCRIBER_CONNECTION_MUTEX

Public Class Methods

subscriber_connected?()
# File lib/action_subscriber/rabbit_connection.rb, line 7
# Reports whether the shared subscriber connection answers +connected?+
# truthfully.
#
# The check is delegated through +with_connection+, so the return value is
# whatever the yielded connection's +connected?+ returns.
def self.subscriber_connected?
  with_connection(&:connected?)
end
subscriber_disconnect!()
# File lib/action_subscriber/rabbit_connection.rb, line 11
# Closes the memoized subscriber connection (if one exists) and forgets it.
#
# Runs under SUBSCRIBER_CONNECTION_MUTEX. The stored reference is cleared
# after +close+ returns; calling this with no stored connection is a no-op.
#
# @return [nil]
def self.subscriber_disconnect!
  SUBSCRIBER_CONNECTION_MUTEX.synchronize do
    current = @subscriber_connection
    current.close if current
    @subscriber_connection = nil
  end
end
with_connection() { |subscriber_connection| ... }
# File lib/action_subscriber/rabbit_connection.rb, line 18
# Yields the shared subscriber connection, creating it on first use.
#
# The whole operation, including the caller's block, runs while holding
# SUBSCRIBER_CONNECTION_MUTEX.
#
# @yieldparam subscriber_connection the memoized connection object
# @return whatever the block returns
def self.with_connection
  SUBSCRIBER_CONNECTION_MUTEX.synchronize do
    # Lazily build the connection the first time anyone asks for it.
    @subscriber_connection = create_connection unless @subscriber_connection
    yield @subscriber_connection
  end
end

Private Class Methods

connection_options()
# File lib/action_subscriber/rabbit_connection.rb, line 54
# Assembles the option hash passed to the RabbitMQ client on connect.
#
# Every value is read from ::ActionSubscriber.configuration except the two
# recovery flags (+automatically_recover+, +recover_from_connection_close+),
# which are hard-coded to +true+.
#
# @return [Hash] symbol-keyed connection options
def self.connection_options
  config = ::ActionSubscriber.configuration

  {
    automatically_recover: true,
    continuation_timeout: config.timeout * 1_000.0, # convert sec to ms
    heartbeat: config.heartbeat,
    hosts: config.hosts,
    network_recovery_interval: config.network_recovery_interval,
    pass: config.password,
    port: config.port,
    recover_from_connection_close: true,
    threadpool_size: config.threadpool_size,
    tls: config.tls,
    tls_ca_certificates: config.tls_ca_certificates,
    tls_cert: config.tls_cert,
    tls_key: config.tls_key,
    user: config.username,
    verify_peer: config.verify_peer,
    vhost: config.virtual_host
  }
end
create_connection()

Private API

# File lib/action_subscriber/rabbit_connection.rb, line 26
# Opens a new broker connection and wires up the blocked/unblocked callbacks.
#
# When RUBY_PLATFORM is "java" the connection comes from ::MarchHare.connect,
# with an +:executor_factory+ that builds a fixed-size thread pool from the
# +:threadpool_size+ option. Otherwise a ::Bunny session is built and started.
#
# The two clients hand different arguments to the +on_blocked+ callback:
# March Hare's argument is forwarded as the reason directly, while for Bunny
# the reason is read from the yielded object's +reason+.
#
# @return the connected client object
def self.create_connection
  settings = connection_options

  if ::RUBY_PLATFORM == "java"
    settings[:executor_factory] = ::Proc.new do
      ::MarchHare::ThreadPools.fixed_of_size(settings[:threadpool_size])
    end
    link = ::MarchHare.connect(settings)
    link.on_blocked { |reason| on_blocked(reason) }
  else
    link = ::Bunny.new(settings)
    link.start
    link.on_blocked { |blocked_message| on_blocked(blocked_message.reason) }
  end

  # Registration of the unblocked hook is the same for both clients.
  link.on_unblocked { on_unblocked }
  link
end
on_blocked(reason)
# File lib/action_subscriber/rabbit_connection.rb, line 76
# Publishes a "connection_blocked.action_subscriber" notification through
# ActiveSupport::Notifications, carrying the given reason in the payload.
#
# @param reason the reason value supplied by the connection's blocked callback
# @return whatever +instrument+ returns
def self.on_blocked(reason)
  event_name = "connection_blocked.action_subscriber"
  ::ActiveSupport::Notifications.instrument(event_name, reason: reason)
end
on_unblocked()
# File lib/action_subscriber/rabbit_connection.rb, line 81
# Publishes a "connection_unblocked.action_subscriber" notification through
# ActiveSupport::Notifications with no explicit payload.
#
# @return whatever +instrument+ returns
def self.on_unblocked
  event_name = "connection_unblocked.action_subscriber"
  ::ActiveSupport::Notifications.instrument(event_name)
end