class Logcli::Actions::Elasticsearch

Constants

BULK_SIZE
INDEX_NAME

Attributes

buffer[R]
elasticsearch_url[R]
filenames[R]
id[R]
mapping_file[R]
total_records[R]

Public Class Methods

new(opts) click to toggle source
# File lib/logcli/actions/elasticsearch.rb, line 11
# Sets up an uploader from an options hash.
#
# @param opts [Hash] configuration, read with symbol keys:
#   - :filenames         (required) collection of file paths iterated by #call
#   - :elasticsearch_url (required) URL handed to the Elasticsearch client
#   - :mapping_file      (optional) path of a JSON mapping file; when absent
#     or nil, #call skips index creation
# @raise [KeyError] if :filenames or :elasticsearch_url is missing
def initialize(opts)
  @filenames = opts.fetch(:filenames)
  # #call only creates the mapping when this is truthy, so a missing key is
  # treated the same as an explicit nil instead of raising KeyError.
  @mapping_file = opts.fetch(:mapping_file, nil)
  @elasticsearch_url = opts.fetch(:elasticsearch_url)
  @buffer = []
  # #flush increments this counter before using it as a document _id.
  @id = 1
  @total_records = 0
end

Public Instance Methods

call() click to toggle source
# File lib/logcli/actions/elasticsearch.rb, line 20
# Streams every line of every configured file into Elasticsearch in batches.
#
# Creates the index mapping first when a mapping file is configured. Lines
# are then appended to the buffer one at a time and handed to #flush each
# time BULK_SIZE lines have accumulated. The buffer is not reset between
# files, so a batch may span two files. Any lines left over after the last
# file are flushed at the end.
#
# @return [Object, nil] the result of the final #flush, or nil when nothing
#   was left to flush
def call
  create_mapping if mapping_file

  filenames.each do |filename|
    # File.foreach closes the file once iteration ends (or raises), unlike a
    # bare File.open(...).each which leaves the handle open.
    File.foreach(filename) do |line|
      @buffer << line
      # >= keeps each batch at exactly BULK_SIZE lines rather than one more.
      flush if @buffer.size >= BULK_SIZE
    end
  end

  flush unless @buffer.empty?
end

Private Instance Methods

client() click to toggle source
# File lib/logcli/actions/elasticsearch.rb, line 58
# Returns the Elasticsearch client, building it on first use.
#
# The client is constructed with the configured URL and request logging
# enabled, then cached in @client for subsequent calls.
def client
  return @client if @client

  @client = ::Elasticsearch::Client.new(url: elasticsearch_url, log: true)
end
create_mapping() click to toggle source
# File lib/logcli/actions/elasticsearch.rb, line 53
# Creates the INDEX_NAME index, using the parsed JSON content of the mapping
# file as the request body.
#
# The file is read and parsed before the client is touched, so an unreadable
# or malformed mapping file raises without issuing any request.
def create_mapping
  raw_mapping = File.read(mapping_file)
  index_body = JSON.parse(raw_mapping)
  client.indices.create(index: INDEX_NAME, body: index_body)
end
flush() click to toggle source
# File lib/logcli/actions/elasticsearch.rb, line 38
# Sends the buffered lines to Elasticsearch in one bulk request and resets
# the buffer.
#
# Each buffered line is parsed as JSON and paired with an index action
# targeting INDEX_NAME. The id counter is bumped before it is used, so the
# first document written gets the counter's starting value plus one. A
# running total is printed after the request.
#
# @return [Array] the new, empty buffer
def flush
  operations = []

  @buffer.each do |raw_line|
    document = JSON.parse(raw_line)
    @id += 1
    @total_records += 1
    operations << { index: { _index: INDEX_NAME, _id: @id } }
    operations << document
  end

  client.bulk(body: operations)
  puts "records uploaded #{@total_records}"
  @buffer = []
end