class Anschel::Output::Elasticsearch
Public Class Methods
new(config, stats, log)
click to toggle source
# File lib/anschel/output/elasticsearch.rb, line 13 def initialize config, stats, log default_index = config.delete(:default_index) || '.anschel' qsize = config.delete(:queue_size) || 2000 bsize = config.delete(:bulk_size) || 500 timeout = config.delete(:bulk_timeout) || 2.0 slice = timeout / bsize client = ::Elasticsearch::Client.new config client.transport.reload_connections! @queue = SizedQueue.new qsize @thread = Thread.new do loop do events = [] count = 0 start = Time.now.to_f until (Time.now.to_f - start > timeout) || (count > bsize) begin events.push @queue.shift(true) count += 1 rescue # shift returned immediately sleep slice end end next if events.empty? body = events.map do |e| id = e.delete(:@id) type = e.delete(:@type) || e[:type] index = e.delete(:@index) routing = e.delete(:@routing) if index.nil? log_event = { event: 'elasticsearch-output-error', reason: 'event was not indexed', remediation: "sending to default index '#{default_index}'" } log_event[:raw_event] = event if log.debug? log.error log_event index = default_index end item = { _index: index, _type: type, data: e } item[:_routing] = routing if routing item[:_id] = id if id { index: item } end response = client.bulk body: body if response['errors'] log.error \ event: 'elasticsearch-output-error', reason: 'response contained errors', body_size: body.size, response_size: response['items'].size end stats.inc 'output', body.size end end end