class LogStash::Filters::Elasticsearch
Attributes
Public Class Methods
@override to handle proxy => ” as if none was set @param value [Array<Object>] @param validator [nil,Array,Symbol] @return [Array(true,Object)]: if validation is a success, a tuple containing ‘true` and the coerced value @return [Array(false,String)]: if validation is a failure, a tuple containing `false` and the failure reason.
# File lib/logstash/filters/elasticsearch.rb, line 152 def self.validate_value(value, validator) return super unless validator == :uri_or_empty value = deep_replace(value) value = hash_or_array(value) return true, value.first if value.size == 1 && value.first.empty? return super(value, :uri) end
Public Instance Methods
# File lib/logstash/filters/elasticsearch.rb, line 185 def filter(event) matched = false begin params = { :index => event.sprintf(@index) } if @query_dsl query = LogStash::Json.load(event.sprintf(@query_dsl)) params[:body] = query else query = event.sprintf(@query) params[:q] = query params[:size] = result_size params[:sort] = @sort if @enable_sort end @logger.debug("Querying elasticsearch for lookup", :params => params) results = get_client.search(params) raise "Elasticsearch query error: #{results["_shards"]["failures"]}" if results["_shards"].include? "failures" event.set("[@metadata][total_hits]", extract_total_from_hits(results['hits'])) resultsHits = results["hits"]["hits"] if !resultsHits.nil? && !resultsHits.empty? matched = true @fields.each do |old_key, new_key| old_key_path = extract_path(old_key) set = resultsHits.map do |doc| extract_value(doc["_source"], old_key_path) end event.set(new_key, set.count > 1 ? set : set.first) end @docinfo_fields.each do |old_key, new_key| old_key_path = extract_path(old_key) set = resultsHits.map do |doc| extract_value(doc, old_key_path) end event.set(new_key, set.count > 1 ? set : set.first) end end resultsAggs = results["aggregations"] if !resultsAggs.nil? && !resultsAggs.empty? matched = true @aggregation_fields.each do |agg_name, ls_field| event.set(ls_field, resultsAggs[agg_name]) end end rescue => e if @logger.trace? @logger.warn("Failed to query elasticsearch for previous event", :index => @index, :query => query, :event => event.to_hash, :error => e.message, :backtrace => e.backtrace) elsif @logger.debug? @logger.warn("Failed to query elasticsearch for previous event", :index => @index, :error => e.message, :backtrace => e.backtrace) else @logger.warn("Failed to query elasticsearch for previous event", :index => @index, :error => e.message) end @tag_on_failure.each{|tag| event.tag(tag)} else filter_matched(event) if matched end end
public only to be reuse in testing
# File lib/logstash/filters/elasticsearch.rb, line 249 def prepare_user_agent os_name = java.lang.System.getProperty('os.name') os_version = java.lang.System.getProperty('os.version') os_arch = java.lang.System.getProperty('os.arch') jvm_vendor = java.lang.System.getProperty('java.vendor') jvm_version = java.lang.System.getProperty('java.version') plugin_version = Gem.loaded_specs['logstash-filter-elasticsearch'].version # example: logstash/7.14.1 (OS=Linux-5.4.0-84-generic-amd64; JVM=AdoptOpenJDK-11.0.11) logstash-output-elasticsearch/11.0.1 "logstash/#{LOGSTASH_VERSION} (OS=#{os_name}-#{os_version}-#{os_arch}; JVM=#{jvm_vendor}-#{jvm_version}) logstash-#{@plugin_type}-#{config_name}/#{plugin_version}" end
# File lib/logstash/filters/elasticsearch.rb, line 163 def register #Load query if it exists if @query_template if File.zero?(@query_template) raise "template is empty" end file = File.open(@query_template, 'r') @query_dsl = file.read end validate_query_settings fill_hosts_from_cloud_id setup_ssl_params! validate_authentication fill_user_password_from_cloud_auth @hosts = Array(@hosts).map { |host| host.to_s } # potential SafeURI#to_s test_connection! setup_serverless end
Private Instance Methods
# File lib/logstash/filters/elasticsearch.rb, line 263 def client_options @client_options ||= { :user => @user, :password => @password, :api_key => @api_key, :proxy => @proxy, :ssl => client_ssl_options, :retry_on_failure => @retry_on_failure, :retry_on_status => @retry_on_status, :user_agent => prepare_user_agent } end
# File lib/logstash/filters/elasticsearch.rb, line 276 def client_ssl_options ssl_options = {} ssl_options[:enabled] = @ssl_enabled # If the deprecated `ssl` option was explicitly provided, it keeps the same behavior # setting up all the client SSL configs even if ssl => false. Otherwise, it should use # the @ssl_enabled value as it was either explicitly set by the `ssl_enabled` option or # inferred from the hosts scheme. return ssl_options unless @ssl_enabled || original_params.include?('ssl') ssl_options[:enabled] = true ssl_certificate_authorities, ssl_truststore_path, ssl_certificate, ssl_keystore_path = params.values_at('ssl_certificate_authorities', 'ssl_truststore_path', 'ssl_certificate', 'ssl_keystore_path') if ssl_certificate_authorities && ssl_truststore_path raise LogStash::ConfigurationError, 'Use either "ssl_certificate_authorities/ca_file" or "ssl_truststore_path" when configuring the CA certificate' end if ssl_certificate && ssl_keystore_path raise LogStash::ConfigurationError, 'Use either "ssl_certificate" or "ssl_keystore_path/keystore" when configuring client certificates' end if ssl_certificate_authorities&.any? raise LogStash::ConfigurationError, 'Multiple values on "ssl_certificate_authorities" are not supported by this plugin' if ssl_certificate_authorities.size > 1 ssl_options[:ca_file] = ssl_certificate_authorities.first end setup_client_ssl_store(ssl_options, 'truststore', ssl_truststore_path) setup_client_ssl_store(ssl_options, 'keystore', ssl_keystore_path) logger.debug("Keystore for client certificate", :keystore => ssl_keystore_path) if ssl_keystore_path ssl_key = params["ssl_key"] if ssl_certificate raise LogStash::ConfigurationError, 'Using an "ssl_certificate" requires an "ssl_key"' unless ssl_key ssl_options[:client_cert] = ssl_certificate ssl_options[:client_key] = ssl_key elsif !ssl_key.nil? raise LogStash::ConfigurationError, 'An "ssl_certificate" is required when using an "ssl_key"' end ssl_verification_mode = params["ssl_verification_mode"] unless ssl_verification_mode.nil? case ssl_verification_mode when 'none' logger.warn "You have enabled encryption but DISABLED certificate verification, " + "to make sure your data is secure set `ssl_verification_mode => full`" ssl_options[:verify] = :disable else # Manticore's :default maps to Apache HTTP Client's DefaultHostnameVerifier, # which is the modern STRICT verifier that replaces the deprecated StrictHostnameVerifier ssl_options[:verify] = :default end end ssl_options[:cipher_suites] = params["ssl_cipher_suites"] if params.include?("ssl_cipher_suites") protocols = params['ssl_supported_protocols'] ssl_options[:protocols] = protocols if protocols&.any? ssl_options[:trust_strategy] = trust_strategy_for_ca_trusted_fingerprint ssl_options end
# File lib/logstash/filters/elasticsearch.rb, line 524 def effectively_ssl? return true if @ssl_enabled hosts = Array(@hosts) return false if hosts.nil? || hosts.empty? hosts.all? { |host| host && host.to_s.start_with?("https") } end
get an array of path elements from a path reference
# File lib/logstash/filters/elasticsearch.rb, line 359 def extract_path(path_reference) return [path_reference] unless path_reference.start_with?('[') && path_reference.end_with?(']') path_reference[1...-1].split('][') end
Given a “hits” object from an Elasticsearch
response, return the total number of hits in the result set. @param hits [Hash{String=>Object}] @return [Integer]
# File lib/logstash/filters/elasticsearch.rb, line 380 def extract_total_from_hits(hits) total = hits['total'] # Elasticsearch 7.x produces an object containing `value` and `relation` in order # to enable unambiguous reporting when the total is only a lower bound; if we get # an object back, return its `value`. return total['value'] if total.kind_of?(Hash) total end
given a Hash and an array of path fragments, returns the value at the path @param source [Hash{String=>Object}] @param path [Array{String}] @return [Object]
# File lib/logstash/filters/elasticsearch.rb, line 369 def extract_value(source, path) path.reduce(source) do |memo, old_key_fragment| break unless memo.include?(old_key_fragment) memo[old_key_fragment] end end
# File lib/logstash/filters/elasticsearch.rb, line 427 def fill_hosts_from_cloud_id return unless @cloud_id if @hosts && !hosts_default?(@hosts) raise LogStash::ConfigurationError, 'Both cloud_id and hosts specified, please only use one of those.' end @hosts = parse_host_uri_from_cloud_id(@cloud_id) end
# File lib/logstash/filters/elasticsearch.rb, line 420 def fill_user_password_from_cloud_auth return unless @cloud_auth @user, @password = parse_user_password_from_cloud_auth(@cloud_auth) params['user'], params['password'] = @user, @password end
# File lib/logstash/filters/elasticsearch.rb, line 352 def get_client @shared_client || synchronize do @shared_client ||= new_client end end
# File lib/logstash/filters/elasticsearch.rb, line 391 def hosts_default?(hosts) hosts.is_a?(Array) && hosts.size == 1 && !original_params.key?('hosts') end
# File lib/logstash/filters/elasticsearch.rb, line 518 def infer_ssl_enabled_from_hosts return if original_params.include?('ssl') || original_params.include?('ssl_enabled') @ssl_enabled = params['ssl_enabled'] = effectively_ssl? end
# File lib/logstash/filters/elasticsearch.rb, line 346 def new_client # NOTE: could pass cloud-id/cloud-auth to client but than we would need to be stricter on ES version requirement # and also LS parsing might differ from ES client's parsing so for consistency we do not pass cloud options ... LogStash::Filters::ElasticsearchClient.new(@logger, @hosts, client_options) end
# File lib/logstash/filters/elasticsearch.rb, line 436 def parse_host_uri_from_cloud_id(cloud_id) require 'logstash/util/safe_uri' begin # might not be available on older LS require 'logstash/util/cloud_setting_id' rescue LoadError raise LogStash::ConfigurationError, 'The cloud_id setting is not supported by your version of Logstash, ' + 'please upgrade your installation (or set hosts instead).' end begin cloud_id = LogStash::Util::CloudSettingId.new(cloud_id) # already does append ':{port}' to host rescue ArgumentError => e raise LogStash::ConfigurationError, e.message.to_s.sub(/Cloud Id/i, 'cloud_id') end cloud_uri = "#{cloud_id.elasticsearch_scheme}://#{cloud_id.elasticsearch_host}" LogStash::Util::SafeURI.new(cloud_uri) end
# File lib/logstash/filters/elasticsearch.rb, line 454 def parse_user_password_from_cloud_auth(cloud_auth) begin # might not be available on older LS require 'logstash/util/cloud_setting_auth' rescue LoadError raise LogStash::ConfigurationError, 'The cloud_auth setting is not supported by your version of Logstash, ' + 'please upgrade your installation (or set user/password instead).' end cloud_auth = cloud_auth.value if cloud_auth.is_a?(LogStash::Util::Password) begin cloud_auth = LogStash::Util::CloudSettingAuth.new(cloud_auth) rescue ArgumentError => e raise LogStash::ConfigurationError, e.message.to_s.sub(/Cloud Auth/i, 'cloud_auth') end [ cloud_auth.username, cloud_auth.password ] end
@param kind is a string [truststore|keystore]
# File lib/logstash/filters/elasticsearch.rb, line 338 def setup_client_ssl_store(ssl_options, kind, store_path) if store_path ssl_options[kind.to_sym] = store_path ssl_options["#{kind}_type".to_sym] = params["ssl_#{kind}_type"] if params.include?("ssl_#{kind}_type") ssl_options["#{kind}_password".to_sym] = params["ssl_#{kind}_password"].value if params.include?("ssl_#{kind}_password") end end
# File lib/logstash/filters/elasticsearch.rb, line 479 def setup_serverless if get_client.serverless? @client_options[:serverless] = true @shared_client = new_client get_client.info end rescue => e @logger.error("Failed to retrieve Elasticsearch info", message: e.message, exception: e.class, backtrace: e.backtrace) raise LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch" end
# File lib/logstash/filters/elasticsearch.rb, line 490 def setup_ssl_params! @ssl_enabled = normalize_config(:ssl_enabled) do |normalize| normalize.with_deprecated_alias(:ssl) end # Infer the value if neither the deprecate `ssl` and `ssl_enabled` were set infer_ssl_enabled_from_hosts @ssl_keystore_path = normalize_config(:ssl_keystore_path) do |normalize| normalize.with_deprecated_alias(:keystore) end @ssl_keystore_password = normalize_config(:ssl_keystore_password) do |normalize| normalize.with_deprecated_alias(:keystore_password) end @ssl_certificate_authorities = normalize_config(:ssl_certificate_authorities) do |normalize| normalize.with_deprecated_mapping(:ca_file) do |ca_file| [ca_file] end end params['ssl_enabled'] = @ssl_enabled params['ssl_keystore_path'] = @ssl_keystore_path unless @ssl_keystore_path.nil? params['ssl_keystore_password'] = @ssl_keystore_password unless @ssl_keystore_password.nil? params['ssl_certificate_authorities'] = @ssl_certificate_authorities unless @ssl_certificate_authorities.nil? end
# File lib/logstash/filters/elasticsearch.rb, line 471 def test_connection! begin get_client.client.ping rescue Elasticsearch::UnsupportedProductError raise LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch" end end
# File lib/logstash/filters/elasticsearch.rb, line 405 def validate_authentication authn_options = 0 authn_options += 1 if @cloud_auth authn_options += 1 if (@api_key && @api_key.value) authn_options += 1 if (@user || (@password && @password.value)) if authn_options > 1 raise LogStash::ConfigurationError, 'Multiple authentication options are specified, please only use one of user/password, cloud_auth or api_key' end if @api_key && @api_key.value && @ssl_enabled != true raise(LogStash::ConfigurationError, "Using api_key authentication requires SSL/TLS secured communication using the `ssl => true` option") end end
# File lib/logstash/filters/elasticsearch.rb, line 395 def validate_query_settings unless @query || @query_template raise LogStash::ConfigurationError, "Both `query` and `query_template` are empty. Require either `query` or `query_template`." end if @query && @query_template raise LogStash::ConfigurationError, "Both `query` and `query_template` are set. Use either `query` or `query_template`." end end