class LogStash::Inputs::ElasticJdbc
This plugin is a simple extension of the Elasticsearch input plugin. It adds a `tracking_column` option: each search only returns hits whose tracking column (for example `last_update`) is greater than the value held by the value tracker. The value tracker stores the tracking-column value of the last document read from the index in a last-run file (`last_run_metadata_path`), and the query is built from that value as described above. This is a sample `elastic_jdbc` plugin configuration:

    input {
      # Read all documents from Elasticsearch matching the given query
      elastic_jdbc {
        hosts => "localhost"
        tracking_column => "last_update"
        last_run_metadata_path => "/opt/logstash/last_run/index_name"
      }
    }
Public Instance Methods
build_query()
click to toggle source
# File lib/logstash/inputs/elastic_jdbc.rb, line 60
#
# Rebuilds @base_query so that it only matches documents whose tracking
# column is strictly greater than the last value recorded by the value
# tracker, ordered ascending by that column.
#
# The "query" clause of the user-supplied base query (if any) is kept as an
# additional `must` condition. Any user "sort" is discarded, because results
# must be ordered by the tracking column.
#
# When no value tracker is set, the current UTC time is used as the lower
# bound.
#
# NOTE(review): a tracker whose value is nil or unparseable will make
# Time.parse raise here; confirm ValueTracking never yields such a value.
#
# @return [Hash] the new base query (string keys), also stored in @base_query
def build_query
  # Work on a shallow copy; a missing base query is treated as empty
  # instead of crashing on #delete.
  input_query = (@base_query || {}).dup
  # Only the tracking column may drive ordering.
  input_query.delete("sort")

  last_value =
    if @value_tracker
      Time.parse(@value_tracker.value.to_s).iso8601
    else
      # Time.now already is a Time; format it directly (Time.parse needs a String).
      Time.now.utc.iso8601
    end

  column = @tracking_column.to_s
  must = [{ range: { column => { gt: last_value } } }]
  query_conditions = input_query["query"]
  must << query_conditions if query_conditions

  query_default = {
    query: { bool: { must: must } },
    sort: [{ column => { order: "asc" } }]
  }

  # Round-trip through JSON so every key becomes a String, matching the shape
  # produced by LogStash::Json.load for the user-supplied query.
  @base_query = LogStash::Json.load(query_default.to_json)
end
do_run_slice(output_queue, slice_id=nil)
click to toggle source
# File lib/logstash/inputs/elastic_jdbc.rb, line 85
#
# Executes the prepared base query against Elasticsearch and pushes every
# hit to the output queue. It then keeps following the scroll cursor until
# a page comes back empty, no scroll id is returned, or stop? is true.
#
# @param output_queue [#<<] destination for the generated events
# @param slice_id [Integer, nil] index of the slice to read when sliced
#   scrolling is used; with nil the query is sent unsliced and the
#   per-slice log lines are skipped
def do_run_slice(output_queue, slice_id=nil)
  sliced = !slice_id.nil?

  request_body =
    if sliced
      @base_query.merge('slice' => { 'id' => slice_id, 'max' => @slices })
    else
      @base_query
    end
  request_options = @options.merge(:body => LogStash::Json.dump(request_body))

  logger.info("Slice starting", slice_id: slice_id, slices: @slices) if sliced

  response = search_request(request_options)
  first_page = response['hits']['hits']
  first_page.each { |hit| push_hit(hit, output_queue) }
  logger.debug("Slice progress", slice_id: slice_id, slices: @slices) if sliced

  keep_scrolling = first_page.any?
  while keep_scrolling && response['_scroll_id'] && !stop?
    response = process_next_scroll(output_queue, response['_scroll_id'])
    logger.debug("Slice progress", slice_id: slice_id, slices: @slices) if sliced
    keep_scrolling = response['has_hits']
  end

  logger.info("Slice complete", slice_id: slice_id, slices: @slices) if sliced
end
push_hit(hit, output_queue)
click to toggle source
# File lib/logstash/inputs/elastic_jdbc.rb, line 106
#
# Converts one search hit into a Logstash event, optionally copies the
# configured document metadata fields into it, and pushes it onto the output
# queue. It then records and writes the event's tracking-column value through
# the value tracker, so the next query resumes after it.
#
# @param hit [Hash] one entry of the response's hits.hits array
# @param output_queue [#<<] queue receiving the decorated event
# @raise [Exception] when docinfo is enabled and the docinfo target field in
#   `_source` already holds a non-Hash value
def push_hit(hit, output_queue)
  event = LogStash::Event.new(hit['_source'])

  if @docinfo
    # do not assume event[@docinfo_target] to be in-place updatable. first get it, update it, then at the end set it in the event.
    docinfo_target = event.get(@docinfo_target) || {}

    unless docinfo_target.is_a?(Hash)
      @logger.error("Elasticsearch Input: Incompatible Event, incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got:", :docinfo_target_type => docinfo_target.class, :event => event)

      # TODO: (colin) I am not sure raising is a good strategy here?
      raise Exception.new("Elasticsearch input: incompatible event")
    end

    @docinfo_fields.each do |field|
      docinfo_target[field] = hit[field]
    end

    event.set(@docinfo_target, docinfo_target)
  end

  decorate(event)

  output_queue << event

  # Persist the tracking-column value so the next run starts after it.
  # A missing (nil) value is skipped: persisting it would presumably make the
  # next build_query fail, since that parses value.to_s with Time.parse.
  tracked_value = event.get(@tracking_column)
  return if tracked_value.nil?

  @value_tracker.set_value(tracked_value)
  @value_tracker.write
end
register()
click to toggle source
endregion
Calls superclass method
# File lib/logstash/inputs/elastic_jdbc.rb, line 47
#
# Validates the configuration, then performs three steps in order:
#   1. builds the value tracker that persists the last tracked value;
#   2. lets the superclass (the Elasticsearch input) register itself;
#   3. rewrites the base query with the tracking-column range (build_query).
#
# @raise [LogStash::ConfigurationError] when tracking_column is missing or blank
def register
  # tracking_column is always required: every generated query filters and
  # sorts on it, so an unset or blank value can never work.
  if @tracking_column.nil? || @tracking_column.to_s.strip.empty?
    raise LogStash::ConfigurationError, "Must set :tracking_column."
  end

  @value_tracker = ValueTracking.build_last_value_tracker(self)
  super
  build_query
end
run(output_queue)
click to toggle source
Calls superclass method
# File lib/logstash/inputs/elastic_jdbc.rb, line 81
#
# Starts reading by handing the output queue straight to the superclass
# (Elasticsearch input) run implementation; nothing else is added here.
#
# @param output_queue [#<<] queue that receives the generated events
def run(output_queue)
  super(output_queue)
end
set_value_tracker(instance)
click to toggle source
# File lib/logstash/inputs/elastic_jdbc.rb, line 56
#
# Replaces the value tracker used to read and persist the last tracked value
# (presumably intended for injecting a tracker from tests — verify callers).
#
# @param instance [Object] tracker responding to #value, #set_value and #write
# @return [Object] the tracker that was assigned
def set_value_tracker(instance)
  @value_tracker = instance
end
stop()
click to toggle source
Calls superclass method
# File lib/logstash/inputs/elastic_jdbc.rb, line 134
#
# Stop hook; defers entirely to the superclass implementation.
def stop
  super()
end