class LogStash::Inputs::Elasticsearch::SearchAfter

Constants

PIT_JOB
SEARCH_AFTER_JOB

Public Instance Methods

clear(pit_id) click to toggle source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 221
def clear(pit_id)
  logger.info("Closing point in time (PIT)")
  @client.close_point_in_time(:body => {:id => pit_id} ) if pit?(pit_id)
rescue => e
  logger.debug("Ignoring close_point_in_time exception", message: e.message, exception: e.class)
end
create_pit() click to toggle source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 129
def create_pit
  logger.info("Create point in time (PIT)")
  r = @client.open_point_in_time(index: @index, keep_alive: @scroll)
  r['id']
end
next_page(pit_id: , search_after: nil, slice_id: nil) click to toggle source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 157
def next_page(pit_id: , search_after: nil, slice_id: nil)
  options = search_options(pit_id: pit_id, search_after: search_after, slice_id: slice_id)
  logger.trace("search options", options)
  @client.search(options)
end
pit?(id) click to toggle source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 125
def pit?(id)
  !!id&.is_a?(String)
end
process_page(output_queue) { || ... } click to toggle source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 163
def process_page(output_queue)
  r = yield
  r['hits']['hits'].each { |hit| @plugin.push_hit(hit, output_queue) }

  has_hits = r['hits']['hits'].any?
  search_after = r['hits']['hits'][-1]['sort'] rescue nil
  logger.warn("Query got data but the sort value is empty") if has_hits && search_after.nil?
  [ has_hits, search_after ]
end
search_options(pit_id: , search_after: nil, slice_id: nil) click to toggle source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 135
def search_options(pit_id: , search_after: nil, slice_id: nil)
  body = @query.merge({
                        :pit => {
                          :id => pit_id,
                          :keep_alive => @scroll
                        }
                      })

  # search_after requires at least a sort field explicitly
  # we add default sort "_shard_doc": "asc" if the query doesn't have any sort field
  # by default, ES adds the same implicitly on top of the provided "sort"
  # https://www.elastic.co/guide/en/elasticsearch/reference/8.10/paginate-search-results.html#CO201-2
  body = body.merge(:sort => {"_shard_doc": "asc"}) if @query&.dig("sort").nil?

  body = body.merge(:search_after => search_after) unless search_after.nil?
  body = body.merge(:slice => {:id => slice_id, :max => @slices}) unless slice_id.nil?
  {
    :size => @size,
    :body => body
  }
end
with_pit() { |pit_id| ... } click to toggle source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 173
def with_pit
  pit_id = retryable(PIT_JOB) { create_pit }
  yield pit_id if pit?(pit_id)
ensure
  clear(pit_id)
end