class Fluent::Plugin::RabbitMQInput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rabbitmq.rb, line 75
#
# Constructs the input plugin.
#
# Runs the superclass constructor first, then loads the Bunny client
# library. The require happens here, at instantiation time, rather than
# when this file is loaded.
def initialize
  super
  require "bunny"
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rabbitmq.rb, line 80
#
# Applies the plugin configuration and prepares (but does not open) the
# Bunny connection.
#
# Steps, in order:
# 1. Pass +conf+ through +compat_parameters_convert+ for the +:parser+
#    section, then hand it to the superclass implementation.
# 2. Build the option hash for +Bunny.new+ from the instance variables.
#    Most options are forwarded only when set; +:recovery_attempts+,
#    +:tls+ and +:verify_peer+ are always forwarded as-is.
# 3. Create the parser, default the routing key to the tag, and build the
#    Bunny session object in +@bunny+.
#
# @param conf the configuration element handed over by the caller
def configure(conf)
  compat_parameters_convert(conf, :parser)
  super

  bunny_options = {}
  bunny_options[:host] = @host if @host
  bunny_options[:hosts] = @hosts if @hosts
  bunny_options[:port] = @port if @port
  bunny_options[:user] = @user if @user
  bunny_options[:pass] = @pass if @pass
  bunny_options[:vhost] = @vhost if @vhost
  bunny_options[:connection_timeout] = @connection_timeout if @connection_timeout
  bunny_options[:continuation_timeout] = @continuation_timeout if @continuation_timeout
  # Test against nil rather than truthiness: an explicit `false` must be
  # forwarded, otherwise the client library falls back to its own default
  # and automatic recovery can never be switched off from the configuration.
  bunny_options[:automatically_recover] = @automatically_recover unless @automatically_recover.nil?
  bunny_options[:network_recovery_interval] = @network_recovery_interval if @network_recovery_interval
  # Forwarded unconditionally, even when nil.
  bunny_options[:recovery_attempts] = @recovery_attempts
  bunny_options[:auth_mechanism] = @auth_mechanism if @auth_mechanism
  bunny_options[:heartbeat] = @heartbeat if @heartbeat

  # TLS on/off and peer verification are booleans, so they are always
  # forwarded; the certificate material only when configured.
  bunny_options[:tls] = @tls
  bunny_options[:tls_cert] = @tls_cert if @tls_cert
  bunny_options[:tls_key] = @tls_key if @tls_key
  bunny_options[:tls_ca_certificates] = @tls_ca_certificates if @tls_ca_certificates
  bunny_options[:verify_peer] = @verify_peer

  @parser = parser_create

  # Without an explicit routing key the queue is bound using the tag.
  @routing_key ||= @tag

  @bunny = Bunny.new(bunny_options)
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/in_rabbitmq.rb, line 156
#
# Predicate that unconditionally answers +true+.
# NOTE(review): presumably this is how the plugin declares itself safe to
# run under multiple workers -- confirm against the plugin base class.
#
# @return [Boolean] always +true+
def multi_workers_ready?
  true
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rabbitmq.rb, line 160
#
# Closes the Bunny connection and then runs the superclass shutdown.
#
# The superclass shutdown is invoked from an +ensure+ clause so that it
# still runs when closing the connection raises; the error from
# <code>@bunny.close</code> is propagated to the caller afterwards.
#
# @return whatever the superclass +shutdown+ returns
def shutdown
  begin
    @bunny.close
  ensure
    # Must run even if the close above failed, otherwise the superclass
    # never gets the chance to perform its own shutdown work.
    super_result = super
  end
  super_result
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rabbitmq.rb, line 112
#
# Opens the RabbitMQ connection and starts consuming messages.
#
# Steps, in order:
# 1. Run the superclass +start+, open the Bunny session and create a
#    channel (applying the prefetch count when one is configured).
# 2. When <code>@create_exchange</code> is set, declare the exchange and
#    optionally bind it to <code>@exchange_to_bind</code>.
# 3. Declare the queue (with an "x-message-ttl" argument when
#    <code>@ttl</code> is set) and bind it to <code>@exchange</code> if one
#    is configured.
# 4. Subscribe to the queue: every payload is run through the parser and
#    each resulting record is emitted under <code>@tag</code>, optionally
#    enriched with the message headers and delivery info.
#
# Payloads the parser cannot handle are logged and skipped.
def start
  super
  @bunny.start
  channel = @bunny.create_channel(nil, @consumer_pool_size)
  channel.prefetch(@prefetch_count) if @prefetch_count

  if @create_exchange
    exchange_options = {
      durable: @exchange_durable,
      auto_delete: @auto_delete
    }
    @bunny_exchange = Bunny::Exchange.new(channel, @exchange_type, @exchange, exchange_options)
    if @exchange_to_bind
      @bunny_exchange.bind(@exchange_to_bind, routing_key: @exchange_routing_key)
    end
  end

  # Stays nil when no TTL is configured; nil is then passed as :arguments.
  queue_arguments = {"x-message-ttl" => @ttl} if @ttl
  queue = channel.queue(
    @queue,
    durable: @durable,
    exclusive: @exclusive,
    auto_delete: @auto_delete,
    arguments: queue_arguments
  )
  queue.bind(@exchange, routing_key: @routing_key) if @exchange

  queue.subscribe do |delivery_info, properties, payload|
    @parser.parse(payload) do |time, record|
      # Fluentd parsers signal an unparsable payload by yielding (nil, nil).
      # Skip those instead of failing on `record[...] =` below or emitting
      # a nil record downstream.
      if record.nil?
        log.warn "skipping RabbitMQ message that could not be parsed: #{payload.inspect}"
        next
      end

      # A timestamp carried in the message properties wins over the time
      # produced by the parser.
      time = Fluent::EventTime.from_time(properties[:timestamp]) if properties[:timestamp]

      record[@headers_key] = properties.headers if @include_headers
      record[@delivery_info_key] = delivery_info if @include_delivery_info
      router.emit(@tag, time, record)
    end
  end
end