class LogStash::Inputs::Elasticsearch::SearchAfter
Constants
- PIT_JOB
- SEARCH_AFTER_JOB
Public Instance Methods
clear(pit_id)
click to toggle source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 221 def clear(pit_id) logger.info("Closing point in time (PIT)") @client.close_point_in_time(:body => {:id => pit_id} ) if pit?(pit_id) rescue => e logger.debug("Ignoring close_point_in_time exception", message: e.message, exception: e.class) end
create_pit()
click to toggle source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 129
# Opens a point in time (PIT) over +@index+, asking for it to be kept alive
# for +@scroll+.
#
# @return [Object] the 'id' entry of the open_point_in_time response
#   (nil if the response has no 'id')
def create_pit
  logger.info("Create point in time (PIT)")
  @client.open_point_in_time(index: @index, keep_alive: @scroll)['id']
end
next_page(pit_id: , search_after: nil, slice_id: nil)
click to toggle source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 157
# Requests a single page of the PIT-based search.
#
# @param pit_id [String] point-in-time id to search against
# @param search_after [Object, nil] sort cursor taken from the previous page;
#   nil for the first page
# @param slice_id [Integer, nil] slice to read, or nil for an unsliced search
# @return [Object] the raw response of +@client.search+
def next_page(pit_id: , search_after: nil, slice_id: nil)
  request = search_options(pit_id: pit_id, search_after: search_after, slice_id: slice_id)
  logger.trace("search options", request)
  @client.search(request)
end
pit?(id)
click to toggle source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 125
# Tells whether +id+ looks like a usable point-in-time id, i.e. is a String.
#
# nil (and any other non-String value) yields false. No safe-navigation is
# needed because nil.is_a?(String) is simply false.
#
# @param id [Object] candidate PIT id
# @return [Boolean]
def pit?(id)
  id.is_a?(String)
end
process_page(output_queue) { || ... }
click to toggle source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 163
# Fetches one page of results by calling the given block, then pushes every
# hit to +output_queue+ through +@plugin.push_hit+.
#
# @param output_queue [Object] forwarded to +@plugin.push_hit+ with each hit
# @yieldreturn [Hash] a search response; hits are read from
#   +response['hits']['hits']+
# @return [Array(Boolean, Object)] +[has_hits, search_after]+, where
#   +search_after+ is the 'sort' value of the last hit. It is nil when the
#   page is empty or when the last hit has no 'sort'.
def process_page(output_queue)
  response = yield
  hits = response['hits']['hits']
  hits.each { |hit| @plugin.push_hit(hit, output_queue) }

  has_hits = hits.any?
  # The last hit's sort values become the search_after cursor for the next
  # page. Read them explicitly rather than with a `rescue nil` modifier,
  # which would also silently swallow unrelated errors.
  last_hit = hits.last
  search_after = last_hit && last_hit['sort']
  logger.warn("Query got data but the sort value is empty") if has_hits && search_after.nil?
  [has_hits, search_after]
end
retryable_search(output_queue)
click to toggle source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 198
# Runs a single, unsliced search_after query over a point in time.
#
# +with_pit+ opens the PIT and always hands it to +clear+ afterwards. The
# paginated search itself is wrapped in +retryable(SEARCH_AFTER_JOB)+.
#
# @param output_queue [Object] forwarded to +search+
# @return [Object] whatever +with_pit+ returns
def retryable_search(output_queue)
  with_pit do |pit_id|
    retryable(SEARCH_AFTER_JOB) { search(output_queue: output_queue, pit_id: pit_id) }
  end
end
retryable_slice_search(output_queue)
click to toggle source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 206
# Runs a sliced search_after query: one thread per slice id in 0...@slices.
# All threads share the same point in time.
#
# Each thread names itself after the pipeline and its slice, then runs
# +search+ for its slice inside +retryable(SEARCH_AFTER_JOB)+. The PIT is
# handed to +clear+ only after every thread has been joined, because the
# joins happen inside the +with_pit+ block.
#
# @param output_queue [Object] forwarded to every slice's +search+
def retryable_slice_search(output_queue)
  with_pit do |pit_id|
    workers = @slices.times.map do |slice_id|
      Thread.new do
        LogStash::Util.set_thread_name("[#{@pipeline_id}]|input|elasticsearch|slice_#{slice_id}")
        retryable(SEARCH_AFTER_JOB) do
          search(output_queue: output_queue, slice_id: slice_id, pit_id: pit_id)
        end
      end
    end
    workers.each(&:join)
  end

  logger.trace("#{@slices} slices completed")
end
search(output_queue:, slice_id: nil, pit_id:)
click to toggle source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 180
# Pages through a point in time with search_after. It stops when a page
# comes back empty, when +@plugin.stop?+ is true, or when a page has hits
# but no sort cursor to continue from.
#
# @param output_queue [Object] forwarded to +process_page+
# @param slice_id [Integer, nil] slice to read; nil means an unsliced search
# @param pit_id [String] point-in-time id to search against
def search(output_queue:, slice_id: nil, pit_id:)
  log_details = slice_id.nil? ? {} : { slice_id: slice_id, slices: @slices }
  logger.info("Query start", log_details)

  has_hits = true
  search_after = nil
  while has_hits && !@plugin.stop?
    logger.debug("Query progress", log_details)
    has_hits, search_after = process_page(output_queue) do
      next_page(pit_id: pit_id, search_after: search_after, slice_id: slice_id)
    end
    # Hits without a sort cursor would make the next request go out with
    # search_after: nil. That request is identical to the first one, so the
    # same page would be re-read and re-pushed forever. Stop instead;
    # process_page has already warned about the missing sort value.
    break if has_hits && search_after.nil?
  end

  logger.info("Query completed", log_details)
end
search_options(pit_id: , search_after: nil, slice_id: nil)
click to toggle source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 135
# Builds the request options for one search_after page over a point in time.
#
# The body starts from a copy of +@query+; +@query+ itself is not mutated.
# It is extended with:
# - +:pit+: the PIT id and +@scroll+ as keep_alive;
# - +:sort+: a default +_shard_doc asc+, added only when +@query+ has no "sort"
#   key. search_after requires an explicit sort, and ES applies the same
#   tiebreaker implicitly on top of any provided sort, see
#   https://www.elastic.co/guide/en/elasticsearch/reference/8.10/paginate-search-results.html#CO201-2
# - +:search_after+: only when a cursor is given;
# - +:slice+: only when +slice_id+ is given, with +@slices+ as max.
#
# @return [Hash] +{ :size => @size, :body => body }+
def search_options(pit_id: , search_after: nil, slice_id: nil)
  body = @query.merge(:pit => { :id => pit_id, :keep_alive => @scroll })
  body[:sort] = { _shard_doc: "asc" } if @query["sort"].nil?
  body[:search_after] = search_after unless search_after.nil?
  body[:slice] = { :id => slice_id, :max => @slices } unless slice_id.nil?
  { :size => @size, :body => body }
end
with_pit() { |pit_id| ... }
click to toggle source
# File lib/logstash/inputs/elasticsearch/paginated_search.rb, line 173
# Opens a point in time and yields its id, always calling +clear+ afterwards.
#
# The PIT is created via +retryable(PIT_JOB) { create_pit }+. The block runs
# only when the result passes +pit?+; otherwise this returns nil without
# yielding. +clear(pit_id)+ runs in +ensure+, so it is reached on success, on
# a skipped yield, and when creation or the block raises. In the last case
# +pit_id+ may still be nil.
#
# @yieldparam pit_id [String] the open PIT id
# @return [Object, nil] the block's value, or nil when no PIT was obtained
def with_pit
  pit_id = retryable(PIT_JOB) { create_pit }
  return unless pit?(pit_id)

  yield pit_id
ensure
  clear(pit_id)
end