class Anschel::Output::Elasticsearch

Public Class Methods

new(config, stats, log)
# File lib/anschel/output/elasticsearch.rb, line 13
# Set up the Elasticsearch output: build the client, create the bounded
# event queue (@queue) and start the background thread (@thread) that drains
# the queue into bulk index requests.
#
# config - Hash of options. The keys :default_index (default '.anschel'),
#          :queue_size (default 2000), :bulk_size (default 500) and
#          :bulk_timeout (seconds, default 2.0) are deleted from the Hash;
#          whatever remains is handed to ::Elasticsearch::Client.new.
# stats  - object responding to #inc(name, amount); incremented with the
#          number of items in each bulk request that was sent.
# log    - object responding to #error(hash) and #debug?.
#
# Events taken from the queue are Hashes. The :@id, :@type, :@index and
# :@routing keys are removed from each event and used as bulk metadata; the
# rest of the Hash becomes the document. Events without :@index are logged
# and sent to the default index.
def initialize config, stats, log
  default_index = config.delete(:default_index) || '.anschel'
  qsize   = config.delete(:queue_size) || 2000
  bsize   = config.delete(:bulk_size) || 500
  timeout = config.delete(:bulk_timeout) || 2.0
  # Float division: an Integer timeout (e.g. 2) would otherwise produce a
  # slice of 0 and turn the polling loop below into a busy spin.
  slice   = timeout.to_f / bsize
  client  = ::Elasticsearch::Client.new config
  client.transport.reload_connections!

  @queue = SizedQueue.new qsize

  # NOTE(review): an exception raised by client.bulk ends this thread and
  # nothing drains @queue afterwards — confirm whether that is intended.
  @thread = Thread.new do
    loop do
      # Collect a batch: stop once bsize events are gathered or the timeout
      # has elapsed, whichever happens first.
      events = []
      start  = Time.now.to_f
      until (Time.now.to_f - start > timeout) || (events.size >= bsize)
        begin
          events.push @queue.shift(true)
        rescue ThreadError # queue was empty; non-blocking shift raised
          sleep slice
        end
      end

      next if events.empty?

      body = events.map do |e|
        id = e.delete(:@id)
        type = e.delete(:@type) || e[:type]
        index = e.delete(:@index)
        routing = e.delete(:@routing)

        if index.nil?
          log_event = {
            event: 'elasticsearch-output-error',
            reason: 'event was not indexed',
            remediation: "sending to default index '#{default_index}'"
          }
          # Attach the offending event itself, but only at debug level.
          log_event[:raw_event] = e if log.debug?
          log.error log_event
          index = default_index
        end

        item = { _index: index, _type: type, data: e }
        item[:_routing] = routing if routing
        item[:_id] = id if id
        { index: item }
      end

      response = client.bulk body: body

      if response['errors']
        log.error \
          event: 'elasticsearch-output-error',
          reason: 'response contained errors',
          body_size: body.size,
          response_size: response['items'].size
      end

      stats.inc 'output', body.size
    end
  end
end