class Fluent::ElasticsearchClusterOutput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_elasticsearch_cluster.rb, line 25
def initialize
  super
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_elasticsearch_cluster.rb, line 29
def configure(conf)
  super
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_elasticsearch_cluster.rb, line 47
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
get_hosts() click to toggle source
# File lib/fluent/plugin/out_elasticsearch_cluster.rb, line 39
def get_hosts
  if @hosts
      @hosts.split(',').map {|x| x.strip}.compact
   else
     ["#{@host}:#{@port}"]
   end
end
send(data) click to toggle source
# File lib/fluent/plugin/out_elasticsearch_cluster.rb, line 96
def send(data)
  @es.bulk body: data
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_elasticsearch_cluster.rb, line 51
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_elasticsearch_cluster.rb, line 33
def start
  super
  @es = Elasticsearch::Client.new :hosts => get_hosts, :reload_connections => true, :adapter => :patron, :retry_on_failure => 5
  raise "Can not reach Elasticsearch cluster (#{@host}:#{@port})!" unless @es.ping
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_elasticsearch_cluster.rb, line 55
def write(chunk)
  bulk_message = []

  chunk.msgpack_each do |tag, time, record|
    if @logstash_format
      record.merge!({"@timestamp" => Time.at(time).to_datetime.to_s})
      if @utc_index
        target_index = "#{@logstash_prefix}-#{Time.at(time).getutc.strftime("#{@logstash_dateformat}")}"
      else
        target_index = "#{@logstash_prefix}-#{Time.at(time).strftime("#{@logstash_dateformat}")}"
      end
    else
      target_index = @index_name
    end

    if @include_tag_key
      record.merge!(@tag_key => tag)
    end

    meta = { "index" => {"_index" => target_index, "_type" => type_name} }
    if @id_key && record[@id_key]
      meta['index']['_id'] = record[@id_key]
    end

    if @parent_key && record[@parent_key]
      meta['index']['_parent'] = record[@parent_key]
    end

    bulk_message << meta
    bulk_message << record

    if bulk_message.size >= @flush_size
      send(bulk_message)
      bulk_message.clear
    end
  end

  send(bulk_message) unless bulk_message.empty?
  bulk_message.clear
end