class Hutch::Broker
Attributes
api_client[RW]
channel[RW]
connection[RW]
exchange[RW]
Public Class Methods
new(config = nil)
click to toggle source
# File lib/hutch/broker.rb, line 13 def initialize(config = nil) @config = config || Hutch::Config end
Public Instance Methods
ack(delivery_tag)
click to toggle source
# File lib/hutch/broker.rb, line 187 def ack(delivery_tag) @channel.ack(delivery_tag, false) end
bind_queue(queue, routing_keys)
click to toggle source
Bind a queue to the broker’s exchange on the routing keys provided. Any existing bindings on the queue that aren’t present in the array of routing keys will be unbound.
# File lib/hutch/broker.rb, line 142 def bind_queue(queue, routing_keys) if http_api_use_enabled? # Find the existing bindings, and unbind any redundant bindings queue_bindings = bindings.select { |dest, keys| dest == queue.name } queue_bindings.each do |dest, keys| keys.reject { |key| routing_keys.include?(key) }.each do |key| logger.debug "removing redundant binding #{queue.name} <--> #{key}" queue.unbind(@exchange, routing_key: key) end end end # Ensure all the desired bindings are present routing_keys.each do |routing_key| logger.debug "creating binding #{queue.name} <--> #{routing_key}" queue.bind(@exchange, routing_key: routing_key) end end
bindings()
click to toggle source
Return a mapping of queue names to the routing keys they’re bound to.
# File lib/hutch/broker.rb, line 128 def bindings results = Hash.new { |hash, key| hash[key] = [] } @api_client.bindings.each do |binding| next if binding['destination'] == binding['routing_key'] next unless binding['source'] == @config[:mq_exchange] next unless binding['vhost'] == @config[:mq_vhost] results[binding['destination']] << binding['routing_key'] end results end
confirm_select(*args)
click to toggle source
# File lib/hutch/broker.rb, line 216 def confirm_select(*args) @channel.confirm_select(*args) end
connect(options = {}) { || ... }
click to toggle source
# File lib/hutch/broker.rb, line 17 def connect(options = {}) @options = options set_up_amqp_connection if http_api_use_enabled? logger.info "HTTP API use is enabled" set_up_api_connection else logger.info "HTTP API use is disabled" end if tracing_enabled? logger.info "tracing is enabled using #{@config[:tracer]}" else logger.info "tracing is disabled" end if block_given? begin yield ensure disconnect end end end
disconnect()
click to toggle source
# File lib/hutch/broker.rb, line 42 def disconnect @channel.close if @channel @connection.close if @connection @channel, @connection, @exchange, @api_client = nil, nil, nil, nil end
http_api_use_enabled?()
click to toggle source
# File lib/hutch/broker.rb, line 103 def http_api_use_enabled? op = @options.fetch(:enable_http_api_use, true) cf = if @config[:enable_http_api_use].nil? true else @config[:enable_http_api_use] end op && cf end
nack(delivery_tag)
click to toggle source
# File lib/hutch/broker.rb, line 191 def nack(delivery_tag) @channel.nack(delivery_tag, false, false) end
open_channel!()
click to toggle source
# File lib/hutch/broker.rb, line 76 def open_channel! logger.info "opening rabbitmq channel with pool size #{consumer_pool_size}" @channel = connection.create_channel(nil, consumer_pool_size).tap do |ch| ch.prefetch(@config[:channel_prefetch]) if @config[:channel_prefetch] if @config[:publisher_confirms] || @config[:force_publisher_confirms] logger.info 'enabling publisher confirms' ch.confirm_select end end end
open_connection!()
click to toggle source
# File lib/hutch/broker.rb, line 63 def open_connection! logger.info "connecting to rabbitmq (#{sanitized_uri})" @connection = Bunny.new(connection_params) with_bunny_connection_handler(sanitized_uri) do @connection.start end logger.info "connected to RabbitMQ at #{connection_params[:host]} as #{connection_params[:username]}" @connection end
publish(routing_key, message, properties = {})
click to toggle source
# File lib/hutch/broker.rb, line 195 def publish(routing_key, message, properties = {}) ensure_connection!(routing_key, message) non_overridable_properties = { routing_key: routing_key, timestamp: Time.now.to_i, content_type: 'application/json' } properties[:message_id] ||= generate_id json = JSON.dump(message) logger.info("publishing message '#{json}' to #{routing_key}") response = @exchange.publish(json, {persistent: true}. merge(properties). merge(global_properties). merge(non_overridable_properties)) channel.wait_for_confirms if @config[:force_publisher_confirms] response end
queue(name, arguments = {})
click to toggle source
Create / get a durable queue and apply namespace if it exists.
# File lib/hutch/broker.rb, line 119 def queue(name, arguments = {}) with_bunny_precondition_handler('queue') do namespace = @config[:namespace].to_s.downcase.gsub(/[^-_:\.\w]/, "") name = name.prepend(namespace + ":") unless namespace.empty? channel.queue(name, durable: true, arguments: arguments) end end
reject(delivery_tag, requeue=false)
click to toggle source
# File lib/hutch/broker.rb, line 183 def reject(delivery_tag, requeue=false) @channel.reject(delivery_tag, requeue) end
requeue(delivery_tag)
click to toggle source
# File lib/hutch/broker.rb, line 179 def requeue(delivery_tag) @channel.reject(delivery_tag, true) end
set_up_amqp_connection()
click to toggle source
Connect to RabbitMQ via AMQP. This sets up the main connection and channel we use for talking to RabbitMQ. It also ensures the existance of the exchange we’ll be using.
# File lib/hutch/broker.rb, line 51 def set_up_amqp_connection open_connection! open_channel! exchange_name = @config[:mq_exchange] logger.info "using topic exchange '#{exchange_name}'" with_bunny_precondition_handler('exchange') do @exchange = @channel.topic(exchange_name, durable: true) end end
set_up_api_connection()
click to toggle source
Set up the connection to the RabbitMQ management API. Unfortunately, this is necessary to do a few things that are impossible over AMQP. E.g. listing queues and bindings.
# File lib/hutch/broker.rb, line 90 def set_up_api_connection logger.info "connecting to rabbitmq HTTP API (#{api_config.sanitized_uri})" with_authentication_error_handler do with_connection_error_handler do @api_client = CarrotTop.new(host: api_config.host, port: api_config.port, user: api_config.username, password: api_config.password, ssl: api_config.ssl) @api_client.exchanges end end end
stop()
click to toggle source
# File lib/hutch/broker.rb, line 170 def stop # Enqueue a failing job that kills the consumer loop channel_work_pool.shutdown # Give `timeout` seconds to jobs that are still being processed channel_work_pool.join(@config[:graceful_exit_timeout]) # If after `timeout` they are still running, they are killed channel_work_pool.kill end
tracing_enabled?()
click to toggle source
# File lib/hutch/broker.rb, line 114 def tracing_enabled? @config[:tracer] && @config[:tracer] != Hutch::Tracers::NullTracer end
using_publisher_confirmations?()
click to toggle source
# File lib/hutch/broker.rb, line 224 def using_publisher_confirmations? @channel.using_publisher_confirmations? end
wait_for_confirms()
click to toggle source
# File lib/hutch/broker.rb, line 220 def wait_for_confirms @channel.wait_for_confirms end
wait_on_threads(timeout)
click to toggle source
Each subscriber is run in a thread. This calls Thread#join on each of the subscriber threads.
# File lib/hutch/broker.rb, line 163 def wait_on_threads(timeout) # Thread#join returns nil when the timeout is hit. If any return nil, # the threads didn't all join so we return false. per_thread_timeout = timeout.to_f / work_pool_threads.length work_pool_threads.none? { |thread| thread.join(per_thread_timeout).nil? } end
Private Instance Methods
api_config()
click to toggle source
# File lib/hutch/broker.rb, line 241 def api_config @api_config ||= OpenStruct.new.tap do |config| config.host = @config[:mq_api_host] config.port = @config[:mq_api_port] config.username = @config[:mq_username] config.password = @config[:mq_password] config.ssl = @config[:mq_api_ssl] config.protocol = config.ssl ? "https://" : "http://" config.sanitized_uri = "#{config.protocol}#{config.username}@#{config.host}:#{config.port}/" end end
channel_work_pool()
click to toggle source
# File lib/hutch/broker.rb, line 339 def channel_work_pool @channel.work_pool end
connection_params()
click to toggle source
# File lib/hutch/broker.rb, line 253 def connection_params parse_uri {}.tap do |params| params[:host] = @config[:mq_host] params[:port] = @config[:mq_port] params[:vhost] = if @config[:mq_vhost] && "" != @config[:mq_vhost] @config[:mq_vhost] else Bunny::Session::DEFAULT_VHOST end params[:username] = @config[:mq_username] params[:password] = @config[:mq_password] params[:tls] = @config[:mq_tls] params[:tls_key] = @config[:mq_tls_key] params[:tls_cert] = @config[:mq_tls_cert] params[:heartbeat] = @config[:heartbeat] params[:connection_timeout] = @config[:connection_timeout] params[:read_timeout] = @config[:read_timeout] params[:write_timeout] = @config[:write_timeout] params[:automatically_recover] = true params[:network_recovery_interval] = 1 params[:client_logger] = @config[:client_logger] if @config[:client_logger] end end
consumer_pool_size()
click to toggle source
# File lib/hutch/broker.rb, line 343 def consumer_pool_size @config[:consumer_pool_size] end
ensure_connection!(routing_key, message)
click to toggle source
# File lib/hutch/broker.rb, line 236 def ensure_connection!(routing_key, message) raise_publish_error('no connection to broker', routing_key, message) unless @connection raise_publish_error('connection is closed', routing_key, message) unless @connection.open? end
generate_id()
click to toggle source
# File lib/hutch/broker.rb, line 347 def generate_id SecureRandom.uuid end
global_properties()
click to toggle source
# File lib/hutch/broker.rb, line 351 def global_properties Hutch.global_properties.respond_to?(:call) ? Hutch.global_properties.call : Hutch.global_properties end
parse_uri()
click to toggle source
# File lib/hutch/broker.rb, line 282 def parse_uri return unless @config[:uri] && !@config[:uri].empty? u = URI.parse(@config[:uri]) @config[:mq_host] = u.host @config[:mq_port] = u.port @config[:mq_vhost] = u.path.sub(/^\//, "") @config[:mq_username] = u.user @config[:mq_password] = u.password end
raise_publish_error(reason, routing_key, message)
click to toggle source
# File lib/hutch/broker.rb, line 230 def raise_publish_error(reason, routing_key, message) msg = "unable to publish - #{reason}. Message: #{JSON.dump(message)}, Routing key: #{routing_key}." logger.error(msg) raise PublishError, msg end
sanitized_uri()
click to toggle source
# File lib/hutch/broker.rb, line 294 def sanitized_uri p = connection_params scheme = p[:tls] ? "amqps" : "amqp" "#{scheme}://#{p[:username]}@#{p[:host]}:#{p[:port]}/#{p[:vhost].sub(/^\//, '')}" end
with_authentication_error_handler() { || ... }
click to toggle source
# File lib/hutch/broker.rb, line 301 def with_authentication_error_handler yield rescue Net::HTTPServerException => ex logger.error "HTTP API connection error: #{ex.message.downcase}" if ex.response.code == '401' raise AuthenticationError.new('invalid HTTP API credentials') else raise end end
with_bunny_connection_handler(uri) { || ... }
click to toggle source
# File lib/hutch/broker.rb, line 328 def with_bunny_connection_handler(uri) yield rescue Bunny::TCPConnectionFailed => ex logger.error "amqp connection error: #{ex.message.downcase}" raise ConnectionError.new("couldn't connect to rabbitmq at #{uri}. Check your configuration, network connectivity and RabbitMQ logs.") end
with_bunny_precondition_handler(item) { || ... }
click to toggle source
# File lib/hutch/broker.rb, line 319 def with_bunny_precondition_handler(item) yield rescue Bunny::PreconditionFailed => ex logger.error ex.message s = "RabbitMQ responded with 406 Precondition Failed when creating this #{item}. " + "Perhaps it is being redeclared with non-matching attributes" raise WorkerSetupError.new(s) end
with_connection_error_handler() { || ... }
click to toggle source
# File lib/hutch/broker.rb, line 312 def with_connection_error_handler yield rescue Errno::ECONNREFUSED => ex logger.error "HTTP API connection error: #{ex.message.downcase}" raise ConnectionError.new("couldn't connect to HTTP API at #{api_config.sanitized_uri}") end
work_pool_threads()
click to toggle source
# File lib/hutch/broker.rb, line 335 def work_pool_threads channel_work_pool.threads || [] end