class Fluent::Plugin::RabbitMQInput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rabbitmq.rb, line 75
def initialize
  super
  require "bunny"
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rabbitmq.rb, line 80
def configure(conf)
  compat_parameters_convert(conf, :parser)

  super

  bunny_options = {}
  bunny_options[:host] = @host if @host
  bunny_options[:hosts] = @hosts if @hosts
  bunny_options[:port] = @port if @port
  bunny_options[:user] = @user if @user
  bunny_options[:pass] = @pass if @pass
  bunny_options[:vhost] = @vhost if @vhost
  bunny_options[:connection_timeout] = @connection_timeout if @connection_timeout
  bunny_options[:continuation_timeout] = @continuation_timeout if @continuation_timeout
  bunny_options[:automatically_recover] = @automatically_recover if @automatically_recover
  bunny_options[:network_recovery_interval] = @network_recovery_interval if @network_recovery_interval
  bunny_options[:recovery_attempts] = @recovery_attempts
  bunny_options[:auth_mechanism] = @auth_mechanism if @auth_mechanism
  bunny_options[:heartbeat] = @heartbeat if @heartbeat

  bunny_options[:tls] = @tls
  bunny_options[:tls_cert] = @tls_cert if @tls_cert
  bunny_options[:tls_key] = @tls_key if @tls_key
  bunny_options[:tls_ca_certificates] = @tls_ca_certificates if @tls_ca_certificates
  bunny_options[:verify_peer] = @verify_peer

  @parser = parser_create

  @routing_key ||= @tag
  @bunny = Bunny.new(bunny_options)
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/in_rabbitmq.rb, line 156
def multi_workers_ready?
  true
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rabbitmq.rb, line 160
def shutdown
  @bunny.close
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rabbitmq.rb, line 112
def start
  super
  @bunny.start
  channel = @bunny.create_channel(nil, @consumer_pool_size)
  channel.prefetch(@prefetch_count) if @prefetch_count
  if @create_exchange
    exchange_options = {
        durable: @exchange_durable,
        auto_delete: @auto_delete
    }
    @bunny_exchange = Bunny::Exchange.new(channel, @exchange_type, @exchange, exchange_options)
    if @exchange_to_bind
      @bunny_exchange.bind(@exchange_to_bind, routing_key: @exchange_routing_key)
    end
  end
  queue_arguments = {"x-message-ttl" => @ttl} if @ttl
  queue = channel.queue(
    @queue,
    durable: @durable,
    exclusive: @exclusive,
    auto_delete: @auto_delete,
    arguments: queue_arguments
  )
  if @exchange
    queue.bind(@exchange, routing_key: @routing_key)
  end
  queue.subscribe do |delivery_info, properties, payload|
    @parser.parse(payload) do |time, record|
      time = if properties[:timestamp]
                Fluent::EventTime.from_time(properties[:timestamp])
             else
                time
             end
      if @include_headers
        record[@headers_key] = properties.headers
      end
      if @include_delivery_info
        record[@delivery_info_key] = delivery_info
      end
      router.emit(@tag, time, record)
    end
  end
end