class Fluent::Plugin::RabbitMQOutput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rabbitmq.rb, line 68
# Builds the plugin instance: runs the superclass initializer first and then
# loads the Bunny client library.
def initialize
  super

  # Loaded from inside the constructor, so the library is only required once
  # an instance of this plugin is actually created.
  require 'bunny'
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rabbitmq.rb, line 73
# Applies the plugin configuration and prepares what the later lifecycle
# steps need: the Bunny connection object (@bunny), the base publish options
# (@publish_options) and the formatter (@formatter).
#
# Optional settings are only forwarded when they were configured, so the
# client library's own defaults apply otherwise. :recovery_attempts, :tls and
# :verify_peer are always forwarded, whatever their value.
#
# @param conf the configuration element handed to the superclass
def configure(conf)
  compat_parameters_convert(conf, :inject, :formatter, default_chunk_key: "time")

  super

  bunny_options = {}
  bunny_options[:host] = @host if @host
  bunny_options[:hosts] = @hosts if @hosts
  bunny_options[:port] = @port if @port
  bunny_options[:user] = @user if @user
  bunny_options[:pass] = @pass if @pass
  bunny_options[:vhost] = @vhost if @vhost
  bunny_options[:connection_timeout] = @connection_timeout if @connection_timeout
  bunny_options[:continuation_timeout] = @continuation_timeout if @continuation_timeout
  # Test for nil rather than truthiness: an explicit `false` has to reach the
  # client, otherwise automatic recovery could never be switched off.
  # NOTE(review): assumes the parameter is nil when left unset — confirm
  # against its config_param declaration.
  bunny_options[:automatically_recover] = @automatically_recover unless @automatically_recover.nil?
  bunny_options[:network_recovery_interval] = @network_recovery_interval if @network_recovery_interval
  bunny_options[:recovery_attempts] = @recovery_attempts
  bunny_options[:auth_mechanism] = @auth_mechanism if @auth_mechanism
  bunny_options[:heartbeat] = @heartbeat if @heartbeat
  bunny_options[:frame_max] = @frame_max if @frame_max

  bunny_options[:tls] = @tls
  bunny_options[:tls_cert] = @tls_cert if @tls_cert
  bunny_options[:tls_key] = @tls_key if @tls_key
  bunny_options[:tls_ca_certificates] = @tls_ca_certificates if @tls_ca_certificates
  bunny_options[:verify_peer] = @verify_peer

  @bunny = Bunny.new(bunny_options)

  # Static part of the publish options; per-event values (timestamp, routing
  # key, message id) are filled in by #set_publish_options.
  @publish_options = {}
  @publish_options[:content_type] = @content_type if @content_type
  @publish_options[:content_encoding] = @content_encoding if @content_encoding
  @publish_options[:persistent] = @persistent if @persistent
  @publish_options[:mandatory] = @mandatory if @mandatory
  @publish_options[:expiration] = @expiration if @expiration
  @publish_options[:message_type] = @message_type if @message_type
  @publish_options[:priority] = @priority if @priority
  @publish_options[:app_id] = @app_id if @app_id

  @formatter = formatter_create(default_type: @type)
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_rabbitmq.rb, line 115
# Multi-worker capability flag for this plugin.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end
prefer_buffered_processing()
click to toggle source
# File lib/fluent/plugin/out_rabbitmq.rb, line 119
# Buffered-processing preference flag for this plugin.
#
# @return [Boolean] always false
def prefer_buffered_processing
  false
end
process(tag, es)
click to toggle source
# File lib/fluent/plugin/out_rabbitmq.rb, line 152
# Publishes every event of the stream to the exchange, one message per event.
#
# @param tag the tag shared by all events in +es+
# @param es  the event stream; yields (time, record) pairs
def process(tag, es)
  es.each do |time, record|
    # Per-event options are derived from the record as received, before any
    # values are injected into it.
    set_publish_options(tag, time, record)

    enriched = inject_values_to_record(tag, time, record)
    payload = @formatter.format(tag, time, enriched)
    @bunny_exchange.publish(payload, @publish_options)
  end
end
set_publish_options(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_rabbitmq.rb, line 139
# Refreshes the per-event entries of @publish_options for the event that is
# about to be published.
#
# * :timestamp   - set to +time.to_i+ when @timestamp is enabled
# * :routing_key - set for every exchange type except "fanout"; uses
#                  @routing_key when configured, the event tag otherwise
# * :message_id  - when @id_key is configured, taken from that field of the
#                  record; removed when the record has no such value
#
# @param tag    the event tag
# @param time   the event time (must respond to +to_i+)
# @param record the event record, looked up by @id_key
def set_publish_options(tag, time, record)
  @publish_options[:timestamp] = time.to_i if @timestamp

  if @exchange_type != "fanout"
    @publish_options[:routing_key] = @routing_key || tag
  end

  if @id_key
    id = record[@id_key]
    if id
      @publish_options[:message_id] = id
    else
      # @publish_options is reused for every event, so an id left over from
      # the previous event must not be attached to this one.
      @publish_options.delete(:message_id)
    end
  end
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rabbitmq.rb, line 134
# Closes the Bunny connection and then runs the superclass shutdown.
#
# The superclass call sits in +ensure+ so it still runs when closing the
# connection raises; the error from the close is propagated afterwards.
def shutdown
  @bunny.close
ensure
  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rabbitmq.rb, line 123
# Runs the superclass start, opens the Bunny connection, creates a channel
# on it and builds the exchange object that events are published to.
def start
  super

  @bunny.start
  @channel = @bunny.create_channel

  @bunny_exchange = Bunny::Exchange.new(
    @channel,
    @exchange_type,
    @exchange,
    { durable: @exchange_durable, auto_delete: @exchange_auto_delete }
  )
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_rabbitmq.rb, line 161
# Publishes every event stored in the chunk to the exchange, one message per
# event. The tag is read from the chunk's metadata.
#
# @param chunk the chunk to publish; yields (time, record) pairs
def write(chunk)
  chunk_tag = chunk.metadata.tag

  chunk.each do |time, record|
    # Per-event options are derived from the record as stored, before any
    # values are injected into it.
    set_publish_options(chunk_tag, time, record)

    enriched = inject_values_to_record(chunk_tag, time, record)
    payload = @formatter.format(chunk_tag, time, enriched)
    @bunny_exchange.publish(payload, @publish_options)
  end
end