class Embulk::Input::Elasticsearch

Constants

ADD_QUERY_TO_RECORD_KEY

Public Class Methods

resume(task, columns, count) { |task, columns, count| ... } click to toggle source
# File lib/embulk/input/elasticsearch.rb, line 45
def self.resume(task, columns, count, &control)
  task_reports = yield(task, columns, count)

  next_config_diff = {}
  return next_config_diff
end
transaction(config, &control) click to toggle source
# File lib/embulk/input/elasticsearch.rb, line 13
def self.transaction(config, &control)
  task = {
    "nodes" => config.param("nodes", :array),
    "request_timeout" => config.param("request_timeout", :integer, default: 60),
    "index" => config.param("index", :string),
    "reload_connections" => config.param("reload_connections", :bool, default: true),
    "reload_on_failure" => config.param("reload_on_failure", :bool, default: false),
    "index_type" => config.param("index_type", :string, default: nil),
    "retry_on_failure" => config.param("retry_on_failure", :integer, default: 5),
    "per_size" => config.param("per_size", :integer, default: 1000),
    "limit_size" => config.param("limit_size", :integer, default: nil),
    "fields" => config.param("fields", :array, default: nil),
    "queries" => config.param("queries", :array),
    "sort" => config.param("sort", :hash, default: nil),
    "add_query_to_record" => config.param("add_query_to_record", :bool, default: false),
    "scroll" => config.param("scroll", :string, default: '1m')
  }
  # TODO: want max_threads
  define_num_threads = config.param("num_threads", :integer, default: 1)
  task['slice_queries'] = InputThread.get_slice_from_num_threads(task['queries'], define_num_threads)

  columns = []
  task['fields'].each_with_index{ |field, i|
    columns << Column.new(i, field['name'], field['type'].to_sym)
  }
  if task['add_query_to_record']
    columns << Column.new(task['fields'].size, ADD_QUERY_TO_RECORD_KEY, :string)
  end

  resume(task, columns, task['slice_queries'].size, &control)
end

Public Instance Methods

init() click to toggle source
# File lib/embulk/input/elasticsearch.rb, line 52
def init
  @queries = task['slice_queries'][@index]
  Embulk.logger.info("this thread queries => #{@queries}")
  @add_query_to_record = task['add_query_to_record']
  @connection = Connection.new(task)
end
run() click to toggle source
# File lib/embulk/input/elasticsearch.rb, line 59
def run
  @queries.each do |query|
    @connection.search_with_query(query) { |result|
      if @add_query_to_record
        result << query
      end
      page_builder.add(result)
    }
  end
  page_builder.finish

  task_report = {}
  return task_report
end