class Franz::Output::RabbitMQ
RabbitMQ
output for Franz
. You must declare an x-consistent-hash type exchange, as we generate random Integers for routing keys.
Public Class Methods
new(opts={})
click to toggle source
Start a new output in the background. We’ll consume from the input queue and ship events to the configured RabbitMQ
cluster.
@param [Hash] opts options for the output @option opts [Queue] :input ([]) “input” queue @option opts [Hash] :output ({}) “output” configuration
# File lib/franz/output/rabbitmq.rb, line 20 def initialize opts={} opts = { logger: Logger.new(STDOUT), tags: [], input: [], output: { exchange: { name: 'test', durable: true }, connection: { host: 'localhost', port: 5672 } } }.deep_merge!(opts) @statz = opts[:statz] || Franz::Stats.new @statz.create :num_output, 0 @logger = opts[:logger] rabbit = Bunny.new opts[:output][:connection].merge({ network_recovery_interval: 10.0, continuation_timeout: 10_000, threaded: false, logger: @logger }) rabbit.start channel = rabbit.create_channel exchange = opts[:output][:exchange].delete(:name) exchange = channel.exchange exchange, \ { type: 'x-consistent-hash' }.merge(opts[:output][:exchange]) @stop = false @foreground = opts[:foreground] @thread = Thread.new do rand = Random.new until @stop event = opts[:input].shift unless opts[:tags].empty? event['tags'] ||= [] event['tags'] += opts[:tags] end log.debug \ event: 'publish', raw: event exchange.publish \ JSON::generate(event), routing_key: rand.rand(10_000), persistent: false @statz.inc :num_output end end log.info event: 'output started' @thread.join if @foreground end
Public Instance Methods
join()
click to toggle source
Join the Output
thread. Effectively only once.
# File lib/franz/output/rabbitmq.rb, line 88 def join return if @foreground @foreground = true @thread.join end
stop()
click to toggle source
Stop the Output
thread. Effectively only once.
# File lib/franz/output/rabbitmq.rb, line 95 def stop return if @foreground @foreground = true @thread.kill log.info event: 'output stopped' end
Private Instance Methods
log()
click to toggle source
# File lib/franz/output/rabbitmq.rb, line 103 def log ; @logger end