class Fluent::Plugin::RabbitMQOutput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rabbitmq.rb, line 68
def initialize
  super
  require "bunny"
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rabbitmq.rb, line 73
def configure(conf)
  compat_parameters_convert(conf, :inject, :formatter, default_chunk_key: "time")

  super

  bunny_options = {}
  bunny_options[:host] = @host if @host
  bunny_options[:hosts] = @hosts if @hosts
  bunny_options[:port] = @port if @port
  bunny_options[:user] = @user if @user
  bunny_options[:pass] = @pass if @pass
  bunny_options[:vhost] = @vhost if @vhost
  bunny_options[:connection_timeout] = @connection_timeout if @connection_timeout
  bunny_options[:continuation_timeout] = @continuation_timeout if @continuation_timeout
  bunny_options[:automatically_recover] = @automatically_recover if @automatically_recover
  bunny_options[:network_recovery_interval] = @network_recovery_interval if @network_recovery_interval
  bunny_options[:recovery_attempts] = @recovery_attempts
  bunny_options[:auth_mechanism] = @auth_mechanism if @auth_mechanism
  bunny_options[:heartbeat] = @heartbeat if @heartbeat
  bunny_options[:frame_max] = @frame_max if @frame_max

  bunny_options[:tls] = @tls
  bunny_options[:tls_cert] = @tls_cert if @tls_cert
  bunny_options[:tls_key] = @tls_key if @tls_key
  bunny_options[:tls_ca_certificates] = @tls_ca_certificates if @tls_ca_certificates
  bunny_options[:verify_peer] = @verify_peer

  @bunny = Bunny.new(bunny_options)

  @publish_options = {}
  @publish_options[:content_type] = @content_type if @content_type
  @publish_options[:content_encoding] = @content_encoding if @content_encoding
  @publish_options[:persistent] = @persistent if @persistent
  @publish_options[:mandatory] = @mandatory if @mandatory
  @publish_options[:expiration] = @expiration if @expiration
  @publish_options[:message_type] = @message_type if @message_type
  @publish_options[:priority] = @priority if @priority
  @publish_options[:app_id] = @app_id if @app_id

  @formatter = formatter_create(default_type: @type)
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_rabbitmq.rb, line 115
def multi_workers_ready?
  true
end
prefer_buffered_processing() click to toggle source
# File lib/fluent/plugin/out_rabbitmq.rb, line 119
def prefer_buffered_processing
  false
end
process(tag, es) click to toggle source
# File lib/fluent/plugin/out_rabbitmq.rb, line 152
def process(tag, es)
  es.each do |time, record|
    set_publish_options(tag, time, record)
    record = inject_values_to_record(tag, time, record)
    buf = @formatter.format(tag, time, record)
    @bunny_exchange.publish(buf, @publish_options)
  end
end
set_publish_options(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_rabbitmq.rb, line 139
def set_publish_options(tag, time, record)
  @publish_options[:timestamp] = time.to_i if @timestamp

  if @exchange_type != "fanout"
    @publish_options[:routing_key] = @routing_key ? @routing_key : tag
  end

  if @id_key
    id = record[@id_key]
    @publish_options[:message_id] = id if id
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rabbitmq.rb, line 134
def shutdown
  @bunny.close
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rabbitmq.rb, line 123
def start
  super
  @bunny.start
  @channel = @bunny.create_channel
  exchange_options = {
    durable: @exchange_durable,
    auto_delete: @exchange_auto_delete
  }
  @bunny_exchange = Bunny::Exchange.new(@channel, @exchange_type, @exchange, exchange_options)
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_rabbitmq.rb, line 161
def write(chunk)
  tag = chunk.metadata.tag

  chunk.each do |time, record|
    set_publish_options(tag, time, record)
    record = inject_values_to_record(tag, time, record)
    buf = @formatter.format(tag, time, record)
    @bunny_exchange.publish(buf, @publish_options)
  end
end