class Embulk::Output::Elasticsearch

Constants

ENABLE_MODE

Public Class Methods

cleanup(task, schema, count, task_reports) click to toggle source
# File lib/embulk/output/elasticsearch_ruby.rb, line 66
def self.cleanup(task, schema, count, task_reports)
  if task['mode'] == 'replace'
    connection = Connection.new(task)
    connection.create_aliases
    connection.delete_aliases
  end
end
transaction(config, schema, count) { |task| ... } click to toggle source
# File lib/embulk/output/elasticsearch_ruby.rb, line 10
def self.transaction(config, schema, count, &control)
  task = {
    "nodes" => config.param("nodes", :array, default: [{ 'host' => 'localhost', 'port' => 9200 }]),
    "request_timeout" => config.param("request_timeout", :integer, default: 60),
    "index_type" => config.param("index_type", :string),
    "mode" => config.param("mode", :string, default: 'normal'),
    "reload_connections" => config.param("reload_connections", :bool, default: true),
    "reload_on_failure" => config.param("reload_on_failure", :bool, default: false),
    "delete_old_index" => config.param("delete_old_index", :bool, default: false),
    "delete_old_alias" => config.param("delete_old_alias", :bool, default: true),
    "id_keys" => config.param("id_keys", :array, default: nil),
    "id_format" => config.param("id_format", :string, default: nil),
    "array_columns" => config.param("array_columns", :array, default: nil),
    "bulk_actions" => config.param("bulk_actions", :integer, default: 1000),
    "retry_on_failure" => config.param("retry_on_failure", :integer, default: 5),
  }

  unless ENABLE_MODE.include?(task['mode'])
    raise ConfigError.new "`mode` must be one of #{ENABLE_MODE.join(', ')}"
  end
  Embulk.logger.info("mode => #{task['mode']}")

  current_index_name = config.param("current_index_name", :string, default: nil)
  index = config.param("index", :string, default: 'logstash-%Y.%m.%d')
  if task['mode'] == 'replace'
    task['alias'] = index
    task['index'] = if current_index_name
      current_index_name
    else
      "#{index}-#{task['index_type']}-#{Time.now.strftime('%Y.%m.%d.%H.%M.%S')}"
    end
  else
    task['index'] = Time.now.strftime(index)
  end
  Embulk.logger.info("nodes => #{task['nodes']}")
  Embulk.logger.info("index => #{task['index']}")
  Embulk.logger.info("index_type => #{task['index_type']}")
  Embulk.logger.info("alias => #{task['alias']}")

  connection = Connection.new(task)
  before_delete_index = config.param("before_delete_index", :bool, default: false)
  if before_delete_index
    connection.delete_index(task['index'])
  end

  before_template_name = config.param("before_template_name", :string, default: nil)
  before_template = config.param("before_template", :hash, default: nil)
  if before_template_name && before_template
    connection.put_template(before_template_name, before_template)
  end

  task_reports = yield(task)
  next_config_diff = {}
  return next_config_diff
end

Public Instance Methods

abort() click to toggle source
# File lib/embulk/output/elasticsearch_ruby.rb, line 114
def abort
end
add(page) click to toggle source
# File lib/embulk/output/elasticsearch_ruby.rb, line 90
def add(page)
  page.each do |record|
    hash = Hash[schema.names.zip(record)]
    meta = @connection.generate_meta(hash)
    source = @connection.generate_source(hash)

    Embulk.logger.debug("meta => #{meta}")
    Embulk.logger.debug("source => #{source}")

    @bulk_message << meta
    @bulk_message << source
    if @bulk_actions * 2 <= @bulk_message.size
      @connection.send(@bulk_message)
      @bulk_message.clear
    end
  end
end
close() click to toggle source
# File lib/embulk/output/elasticsearch_ruby.rb, line 87
def close
end
commit() click to toggle source
# File lib/embulk/output/elasticsearch_ruby.rb, line 117
def commit
  task_report = {}
  return task_report
end
finish() click to toggle source
# File lib/embulk/output/elasticsearch_ruby.rb, line 108
def finish
  if @bulk_message.size > 0
    @connection.send(@bulk_message)
  end
end
init() click to toggle source

def self.resume(task, schema, count, &control)

task_reports = yield(task)

next_config_diff = {}
return next_config_diff

end

# File lib/embulk/output/elasticsearch_ruby.rb, line 81
def init
  @connection = Connection.new(task)
  @bulk_actions = task["bulk_actions"]
  @bulk_message = []
end