class Hutch::Broker

Attributes

api_client[RW]
channel[RW]
connection[RW]
exchange[RW]

Public Class Methods

new(config = nil) click to toggle source
# File lib/hutch/broker.rb, line 13
# Stores the configuration this broker reads its settings from.
#
# config - configuration object to use; when it is nil (or false) the
#          Hutch::Config constant is used instead.
def initialize(config = nil)
  @config = config
  @config ||= Hutch::Config
end

Public Instance Methods

ack(delivery_tag) click to toggle source
# File lib/hutch/broker.rb, line 187
# Forwards an acknowledgement for delivery_tag to the current channel and
# returns whatever the channel returns.
def ack(delivery_tag)
  # Presumably Bunny's `multiple` flag (false => only this delivery) --
  # confirm against Bunny::Channel#ack.
  multiple = false
  @channel.ack(delivery_tag, multiple)
end
bind_queue(queue, routing_keys) click to toggle source

Bind a queue to the broker’s exchange on the routing keys provided. Any existing bindings on the queue that aren’t present in the array of routing keys will be unbound.

# File lib/hutch/broker.rb, line 142
# Binds `queue` to @exchange once for every entry in `routing_keys`.
#
# When the HTTP API is enabled, the keys that #bindings reports for this
# queue's name are checked first and any key not listed in `routing_keys`
# is unbound. Returns `routing_keys` (the result of the final #each).
def bind_queue(queue, routing_keys)
  if http_api_use_enabled?
    # Drop bindings that exist on the queue but are no longer wanted.
    bindings.each do |destination, bound_keys|
      next unless destination == queue.name

      bound_keys.each do |bound_key|
        next if routing_keys.include?(bound_key)

        logger.debug "removing redundant binding #{queue.name} <--> #{bound_key}"
        queue.unbind(@exchange, routing_key: bound_key)
      end
    end
  end

  # Make sure every requested binding is in place.
  routing_keys.each do |wanted_key|
    logger.debug "creating binding #{queue.name} <--> #{wanted_key}"
    queue.bind(@exchange, routing_key: wanted_key)
  end
end
bindings() click to toggle source

Return a mapping of queue names to the routing keys they’re bound to.

# File lib/hutch/broker.rb, line 128
# Builds a hash of destination name => array of routing keys from the
# entries returned by @api_client.bindings.
#
# An entry is skipped when its destination equals its routing key, when its
# source is not the configured :mq_exchange, or when its vhost is not the
# configured :mq_vhost. The returned hash creates (and stores) an empty
# array for any key that is looked up but missing.
def bindings
  by_destination = Hash.new { |hash, key| hash[key] = [] }

  @api_client.bindings.each_with_object(by_destination) do |entry, mapping|
    destination = entry['destination']
    routing_key = entry['routing_key']

    relevant = destination != routing_key &&
               entry['source'] == @config[:mq_exchange] &&
               entry['vhost'] == @config[:mq_vhost]

    mapping[destination] << routing_key if relevant
  end
end
confirm_select(*args) click to toggle source
# File lib/hutch/broker.rb, line 216
# Delegates to the current channel's #confirm_select, passing every argument
# through unchanged, and returns the channel's result.
def confirm_select(*args)
  target = @channel
  target.confirm_select(*args)
end
connect(options = {}) { || ... } click to toggle source
# File lib/hutch/broker.rb, line 17
# Records `options`, sets up the AMQP connection and, when
# #http_api_use_enabled? says so, the HTTP API connection as well. Logs
# whether the HTTP API and tracing are in use.
#
# If a block is given it is yielded to and #disconnect is always called
# afterwards (even when the block raises); the block's value is returned.
# Without a block the method returns nil and leaves the connection open.
def connect(options = {})
  @options = options
  set_up_amqp_connection

  api_enabled = http_api_use_enabled?
  logger.info(api_enabled ? "HTTP API use is enabled" : "HTTP API use is disabled")
  set_up_api_connection if api_enabled

  tracing_message = if tracing_enabled?
                      "tracing is enabled using #{@config[:tracer]}"
                    else
                      "tracing is disabled"
                    end
  logger.info tracing_message

  return unless block_given?

  begin
    yield
  ensure
    disconnect
  end
end
disconnect() click to toggle source
# File lib/hutch/broker.rb, line 42
# Closes the channel and then the connection (each only if currently set)
# and resets @channel, @connection, @exchange and @api_client to nil.
def disconnect
  [@channel, @connection].each { |resource| resource.close if resource }
  @channel, @connection, @exchange, @api_client = Array.new(4)
end
http_api_use_enabled?() click to toggle source
# File lib/hutch/broker.rb, line 103
# Combines the :enable_http_api_use setting from the options passed to
# #connect with the one from the configuration. Either value defaults to
# true when absent (options) or nil (config); the result is
# `option_value && config_value`.
def http_api_use_enabled?
  from_options = @options.fetch(:enable_http_api_use, true)

  from_config = @config[:enable_http_api_use]
  from_config = true if from_config.nil?

  from_options && from_config
end
nack(delivery_tag) click to toggle source
# File lib/hutch/broker.rb, line 191
# Forwards a negative acknowledgement for delivery_tag to the current
# channel and returns whatever the channel returns.
def nack(delivery_tag)
  # Presumably Bunny's `multiple` and `requeue` flags, in that order --
  # confirm against Bunny::Channel#nack.
  multiple = false
  requeue = false
  @channel.nack(delivery_tag, multiple, requeue)
end
open_channel!() click to toggle source
# File lib/hutch/broker.rb, line 76
# Creates a channel on the current connection using #consumer_pool_size,
# applies the configured :channel_prefetch (if any) and enables publisher
# confirms when :publisher_confirms or :force_publisher_confirms is set.
#
# @channel is assigned only after the new channel has been configured.
# Returns the new channel.
def open_channel!
  pool_size = consumer_pool_size
  logger.info "opening rabbitmq channel with pool size #{pool_size}"

  new_channel = connection.create_channel(nil, pool_size)

  prefetch = @config[:channel_prefetch]
  new_channel.prefetch(prefetch) if prefetch

  if @config[:publisher_confirms] || @config[:force_publisher_confirms]
    logger.info 'enabling publisher confirms'
    new_channel.confirm_select
  end

  @channel = new_channel
end
open_connection!() click to toggle source
# File lib/hutch/broker.rb, line 63
# Builds a Bunny connection from #connection_params, starts it inside
# #with_bunny_connection_handler and logs before and after.
# Stores the connection in @connection and returns it.
def open_connection!
  uri = sanitized_uri
  logger.info "connecting to rabbitmq (#{uri})"

  @connection = Bunny.new(connection_params)
  with_bunny_connection_handler(uri) { @connection.start }

  params = connection_params
  logger.info "connected to RabbitMQ at #{params[:host]} as #{params[:username]}"
  @connection
end
publish(routing_key, message, properties = {}) click to toggle source
# File lib/hutch/broker.rb, line 195
# Serialises `message` as JSON and publishes it to @exchange.
#
# routing_key - routing key to publish under.
# message     - object handed to JSON.dump.
# properties  - optional publish properties. This hash is only read, never
#               modified, so callers can safely reuse (or freeze) it; each
#               call still gets its own generated :message_id unless the
#               caller supplied one.
#
# Property precedence, lowest to highest: `persistent: true`, `properties`
# (plus the :message_id), #global_properties, and finally :routing_key,
# :timestamp and :content_type, which cannot be overridden.
#
# Raises via #ensure_connection! when there is no open connection.
# Waits for confirms on the channel when :force_publisher_confirms is set.
# Returns the result of @exchange.publish.
def publish(routing_key, message, properties = {})
  ensure_connection!(routing_key, message)

  non_overridable_properties = {
    routing_key: routing_key,
    timestamp: Time.now.to_i,
    content_type: 'application/json'
  }
  # Resolve the id locally instead of writing it back into the caller's
  # hash; otherwise a reused hash would pin every later message to the
  # first generated id.
  message_id = properties[:message_id] || generate_id

  json = JSON.dump(message)
  logger.info("publishing message '#{json}' to #{routing_key}")
  response = @exchange.publish(json, {persistent: true}.
    merge(properties).
    merge(message_id: message_id).
    merge(global_properties).
    merge(non_overridable_properties))

  channel.wait_for_confirms if @config[:force_publisher_confirms]
  response
end
queue(name, arguments = {}) click to toggle source

Create / get a durable queue and apply namespace if it exists.

# File lib/hutch/broker.rb, line 119
# Declares (or fetches) a durable queue on the channel, prefixing the name
# with the configured namespace when one is set.
#
# name      - queue name. It is never modified: the namespaced name is built
#             as a new string, so frozen strings work and calling this twice
#             with the same object does not stack prefixes.
# arguments - passed to the channel as the queue's `arguments:` option.
#
# The namespace is the :namespace setting, downcased, with every character
# other than word characters, '-', '_', ':' and '.' removed.
# Runs inside #with_bunny_precondition_handler('queue').
def queue(name, arguments = {})
  with_bunny_precondition_handler('queue') do
    namespace = @config[:namespace].to_s.downcase.gsub(/[^-_:\.\w]/, "")
    queue_name = namespace.empty? ? name : "#{namespace}:#{name}"
    channel.queue(queue_name, durable: true, arguments: arguments)
  end
end
reject(delivery_tag, requeue=false) click to toggle source
# File lib/hutch/broker.rb, line 183
# Rejects the delivery identified by delivery_tag on the current channel.
#
# requeue - forwarded as the channel's second argument (default false).
def reject(delivery_tag, requeue=false)
  target = @channel
  target.reject(delivery_tag, requeue)
end
requeue(delivery_tag) click to toggle source
# File lib/hutch/broker.rb, line 179
# Rejects the delivery identified by delivery_tag on the current channel,
# passing true as the second argument.
def requeue(delivery_tag)
  # Presumably Bunny's `requeue` flag -- confirm against Bunny::Channel#reject.
  put_back = true
  @channel.reject(delivery_tag, put_back)
end
set_up_amqp_connection() click to toggle source

Connect to RabbitMQ via AMQP. This sets up the main connection and channel we use for talking to RabbitMQ. It also ensures the existence of the exchange we’ll be using.

# File lib/hutch/broker.rb, line 51
# Opens the connection and the channel, then declares the durable topic
# exchange named by the :mq_exchange setting and stores it in @exchange.
# The declaration runs inside #with_bunny_precondition_handler('exchange').
def set_up_amqp_connection
  open_connection!
  open_channel!

  name = @config[:mq_exchange]
  logger.info "using topic exchange '#{name}'"

  with_bunny_precondition_handler('exchange') { @exchange = @channel.topic(name, durable: true) }
end
set_up_api_connection() click to toggle source

Set up the connection to the RabbitMQ management API. Unfortunately, this is necessary to do a few things that are impossible over AMQP. E.g. listing queues and bindings.

# File lib/hutch/broker.rb, line 90
# Creates the CarrotTop client from #api_config, stores it in @api_client
# and immediately calls #exchanges on it. Both steps run inside the
# authentication and connection error handlers. Returns the result of
# the #exchanges call.
def set_up_api_connection
  api = api_config
  logger.info "connecting to rabbitmq HTTP API (#{api.sanitized_uri})"

  with_authentication_error_handler do
    with_connection_error_handler do
      client_options = {
        host: api.host,
        port: api.port,
        user: api.username,
        password: api.password,
        ssl: api.ssl
      }
      @api_client = CarrotTop.new(**client_options)
      @api_client.exchanges
    end
  end
end
stop() click to toggle source
# File lib/hutch/broker.rb, line 170
# Shuts the channel's work pool down in three steps: #shutdown, then #join
# with the configured :graceful_exit_timeout, then #kill.
# Returns the result of #kill.
def stop
  pool = channel_work_pool

  # Enqueue a failing job that kills the consumer loop
  pool.shutdown
  # Give `timeout` seconds to jobs that are still being processed
  pool.join(@config[:graceful_exit_timeout])
  # If after `timeout` they are still running, they are killed
  pool.kill
end
tracing_enabled?() click to toggle source
# File lib/hutch/broker.rb, line 114
# Truthy when a :tracer is configured and it is not
# Hutch::Tracers::NullTracer. Returns the falsy tracer value itself
# (nil/false) when none is configured.
def tracing_enabled?
  tracer = @config[:tracer]
  tracer && tracer != Hutch::Tracers::NullTracer
end
using_publisher_confirmations?() click to toggle source
# File lib/hutch/broker.rb, line 224
# Asks the current channel whether it is using publisher confirmations and
# returns its answer.
def using_publisher_confirmations?
  target = @channel
  target.using_publisher_confirmations?
end
wait_for_confirms() click to toggle source
# File lib/hutch/broker.rb, line 220
# Delegates to the current channel's #wait_for_confirms and returns its
# result.
def wait_for_confirms
  target = @channel
  target.wait_for_confirms
end
wait_on_threads(timeout) click to toggle source

Each subscriber is run in a thread. This calls Thread#join on each of the subscriber threads.

# File lib/hutch/broker.rb, line 163
# Joins the work pool threads one after another, giving each an equal share
# of `timeout`, and stops at the first thread that fails to join.
#
# Returns true when every thread joined (or there are no threads), false as
# soon as one join returns nil.
def wait_on_threads(timeout)
  threads = work_pool_threads
  per_thread_timeout = timeout.to_f / threads.length

  # Thread#join returns nil when the timeout is hit, so a nil result means
  # that thread did not finish in time.
  threads.all? { |thread| !thread.join(per_thread_timeout).nil? }
end

Private Instance Methods

api_config() click to toggle source
# File lib/hutch/broker.rb, line 241
# Memoised OpenStruct describing the HTTP API endpoint, built from the
# :mq_api_host, :mq_api_port, :mq_username, :mq_password and :mq_api_ssl
# settings. Also carries `protocol` ("https://" when ssl is truthy,
# otherwise "http://") and `sanitized_uri`, which contains the username but
# not the password.
def api_config
  return @api_config if @api_config

  host = @config[:mq_api_host]
  port = @config[:mq_api_port]
  username = @config[:mq_username]
  password = @config[:mq_password]
  ssl = @config[:mq_api_ssl]
  protocol = ssl ? "https://" : "http://"

  @api_config = OpenStruct.new(
    host: host,
    port: port,
    username: username,
    password: password,
    ssl: ssl,
    protocol: protocol,
    sanitized_uri: "#{protocol}#{username}@#{host}:#{port}/"
  )
end
channel_work_pool() click to toggle source
# File lib/hutch/broker.rb, line 339
# Returns the work pool of the current channel.
def channel_work_pool
  current_channel = @channel
  current_channel.work_pool
end
connection_params() click to toggle source
# File lib/hutch/broker.rb, line 253
# Builds the options hash handed to Bunny from the current configuration.
#
# #parse_uri runs first so values derived from :uri are reflected. The
# vhost falls back to Bunny::Session::DEFAULT_VHOST when :mq_vhost is
# unset or an empty string. Automatic recovery is always switched on with
# a recovery interval of 1, and :client_logger is only included when
# configured.
def connection_params
  parse_uri

  configured_vhost = @config[:mq_vhost]
  vhost = if configured_vhost && "" != configured_vhost
            configured_vhost
          else
            Bunny::Session::DEFAULT_VHOST
          end

  params = {
    host: @config[:mq_host],
    port: @config[:mq_port],
    vhost: vhost,
    username: @config[:mq_username],
    password: @config[:mq_password],
    tls: @config[:mq_tls],
    tls_key: @config[:mq_tls_key],
    tls_cert: @config[:mq_tls_cert],
    heartbeat: @config[:heartbeat],
    connection_timeout: @config[:connection_timeout],
    read_timeout: @config[:read_timeout],
    write_timeout: @config[:write_timeout],
    automatically_recover: true,
    network_recovery_interval: 1
  }

  client_logger = @config[:client_logger]
  params[:client_logger] = client_logger if client_logger
  params
end
consumer_pool_size() click to toggle source
# File lib/hutch/broker.rb, line 343
# Returns the configured :consumer_pool_size setting.
def consumer_pool_size
  pool_size = @config[:consumer_pool_size]
  pool_size
end
ensure_connection!(routing_key, message) click to toggle source
# File lib/hutch/broker.rb, line 236
# Guards publishing: reports a publish error through #raise_publish_error
# when there is no connection, or when the connection is not open.
# Returns nil when the connection is usable.
def ensure_connection!(routing_key, message)
  if !@connection
    raise_publish_error('no connection to broker', routing_key, message)
  elsif !@connection.open?
    raise_publish_error('connection is closed', routing_key, message)
  end
end
generate_id() click to toggle source
# File lib/hutch/broker.rb, line 347
# Returns a fresh identifier produced by SecureRandom.uuid.
def generate_id
  uuid = SecureRandom.uuid
  uuid
end
global_properties() click to toggle source
# File lib/hutch/broker.rb, line 351
# Returns Hutch.global_properties, calling it first when it responds to
# #call (e.g. a proc or lambda).
def global_properties
  configured = Hutch.global_properties
  return configured.call if configured.respond_to?(:call)

  configured
end
parse_uri() click to toggle source
# File lib/hutch/broker.rb, line 282
# When a non-empty :uri is configured, parses it and overwrites the
# :mq_host, :mq_port, :mq_vhost, :mq_username and :mq_password settings
# with its components. The vhost is the URI path with one leading "/"
# removed. Does nothing (returns nil) when :uri is unset or empty.
def parse_uri
  raw_uri = @config[:uri]
  return if !raw_uri || raw_uri.empty?

  parsed = URI.parse(raw_uri)
  vhost = parsed.path.sub(/^\//, "")

  @config[:mq_host] = parsed.host
  @config[:mq_port] = parsed.port
  @config[:mq_vhost] = vhost
  @config[:mq_username] = parsed.user
  @config[:mq_password] = parsed.password
end
raise_publish_error(reason, routing_key, message) click to toggle source
# File lib/hutch/broker.rb, line 230
# Logs and raises a PublishError describing why `message` could not be
# published. The error text includes the reason, the JSON-dumped message
# and the routing key.
def raise_publish_error(reason, routing_key, message)
  payload = JSON.dump(message)
  error_text = "unable to publish - #{reason}. Message: #{payload}, Routing key: #{routing_key}."

  logger.error(error_text)
  raise(PublishError, error_text)
end
sanitized_uri() click to toggle source
# File lib/hutch/broker.rb, line 294
# Builds a printable AMQP URI from #connection_params. The scheme is
# "amqps" when :tls is truthy, the password is left out, and one leading
# "/" is stripped from the vhost.
def sanitized_uri
  params = connection_params
  scheme = params[:tls] ? "amqps" : "amqp"
  vhost = params[:vhost].sub(/^\//, '')

  "#{scheme}://#{params[:username]}@#{params[:host]}:#{params[:port]}/#{vhost}"
end
with_authentication_error_handler() { || ... } click to toggle source
# File lib/hutch/broker.rb, line 301
# Yields, turning an HTTP 401 from the management API into an
# AuthenticationError. Any other Net::HTTPServerException is logged and
# re-raised unchanged.
#
# NOTE(review): newer Rubies appear to treat Net::HTTPServerException as a
# deprecated alias of Net::HTTPClientException -- confirm before changing.
def with_authentication_error_handler
  yield
rescue Net::HTTPServerException => error
  logger.error "HTTP API connection error: #{error.message.downcase}"
  raise unless error.response.code == '401'

  raise AuthenticationError, 'invalid HTTP API credentials'
end
with_bunny_connection_handler(uri) { || ... } click to toggle source
# File lib/hutch/broker.rb, line 328
# Yields, converting Bunny::TCPConnectionFailed into a ConnectionError that
# names `uri`. The original error message is logged (downcased) first.
def with_bunny_connection_handler(uri)
  yield
rescue Bunny::TCPConnectionFailed => error
  logger.error "amqp connection error: #{error.message.downcase}"
  raise ConnectionError,
        "couldn't connect to rabbitmq at #{uri}. Check your configuration, network connectivity and RabbitMQ logs."
end
with_bunny_precondition_handler(item) { || ... } click to toggle source
# File lib/hutch/broker.rb, line 319
# Yields, converting Bunny::PreconditionFailed into a WorkerSetupError whose
# message names `item` (the kind of thing being declared). The original
# error message is logged first.
def with_bunny_precondition_handler(item)
  yield
rescue Bunny::PreconditionFailed => error
  logger.error error.message
  raise WorkerSetupError,
        "RabbitMQ responded with 406 Precondition Failed when creating this #{item}. " \
        "Perhaps it is being redeclared with non-matching attributes"
end
with_connection_error_handler() { || ... } click to toggle source
# File lib/hutch/broker.rb, line 312
# Yields, converting Errno::ECONNREFUSED into a ConnectionError that names
# the sanitized HTTP API URI. The original error message is logged
# (downcased) first.
def with_connection_error_handler
  yield
rescue Errno::ECONNREFUSED => error
  logger.error "HTTP API connection error: #{error.message.downcase}"
  raise ConnectionError, "couldn't connect to HTTP API at #{api_config.sanitized_uri}"
end
work_pool_threads() click to toggle source
# File lib/hutch/broker.rb, line 335
# Returns the work pool's threads, or an empty array when the pool reports
# none (nil/false).
def work_pool_threads
  threads = channel_work_pool.threads
  threads || []
end