class LogStash::Filters::Elasticsearch

Attributes

shared_client[R]

Public Class Methods

validate_value(value, validator) click to toggle source

@override to handle proxy => ” as if none was set @param value [Array<Object>] @param validator [nil,Array,Symbol] @return [Array(true,Object)]: if validation is a success, a tuple containing ‘true` and the coerced value @return [Array(false,String)]: if validation is a failure, a tuple containing `false` and the failure reason.

Calls superclass method
# File lib/logstash/filters/elasticsearch.rb, line 152
def self.validate_value(value, validator)
  return super unless validator == :uri_or_empty

  value = deep_replace(value)
  value = hash_or_array(value)

  return true, value.first if value.size == 1 && value.first.empty?

  return super(value, :uri)
end

Public Instance Methods

filter(event) click to toggle source
# File lib/logstash/filters/elasticsearch.rb, line 185
def filter(event)
  matched = false
  begin
    params = { :index => event.sprintf(@index) }

    if @query_dsl
      query = LogStash::Json.load(event.sprintf(@query_dsl))
      params[:body] = query
    else
      query = event.sprintf(@query)
      params[:q] = query
      params[:size] = result_size
      params[:sort] =  @sort if @enable_sort
    end

    @logger.debug("Querying elasticsearch for lookup", :params => params)

    results = get_client.search(params)
    raise "Elasticsearch query error: #{results["_shards"]["failures"]}" if results["_shards"].include? "failures"

    event.set("[@metadata][total_hits]", extract_total_from_hits(results['hits']))

    resultsHits = results["hits"]["hits"]
    if !resultsHits.nil? && !resultsHits.empty?
      matched = true
      @fields.each do |old_key, new_key|
        old_key_path = extract_path(old_key)
        set = resultsHits.map do |doc|
          extract_value(doc["_source"], old_key_path)
        end
        event.set(new_key, set.count > 1 ? set : set.first)
      end
      @docinfo_fields.each do |old_key, new_key|
        old_key_path = extract_path(old_key)
        set = resultsHits.map do |doc|
          extract_value(doc, old_key_path)
        end
        event.set(new_key, set.count > 1 ? set : set.first)
      end
    end

    resultsAggs = results["aggregations"]
    if !resultsAggs.nil? && !resultsAggs.empty?
      matched = true
      @aggregation_fields.each do |agg_name, ls_field|
        event.set(ls_field, resultsAggs[agg_name])
      end
    end

  rescue => e
    if @logger.trace?
      @logger.warn("Failed to query elasticsearch for previous event", :index => @index, :query => query, :event => event.to_hash, :error => e.message, :backtrace => e.backtrace)
    elsif @logger.debug?
      @logger.warn("Failed to query elasticsearch for previous event", :index => @index, :error => e.message, :backtrace => e.backtrace)
    else
      @logger.warn("Failed to query elasticsearch for previous event", :index => @index, :error => e.message)
    end
    @tag_on_failure.each{|tag| event.tag(tag)}
  else
    filter_matched(event) if matched
  end
end
prepare_user_agent() click to toggle source

public only to be reuse in testing

# File lib/logstash/filters/elasticsearch.rb, line 249
def prepare_user_agent
  os_name = java.lang.System.getProperty('os.name')
  os_version = java.lang.System.getProperty('os.version')
  os_arch = java.lang.System.getProperty('os.arch')
  jvm_vendor = java.lang.System.getProperty('java.vendor')
  jvm_version = java.lang.System.getProperty('java.version')

  plugin_version = Gem.loaded_specs['logstash-filter-elasticsearch'].version
  # example: logstash/7.14.1 (OS=Linux-5.4.0-84-generic-amd64; JVM=AdoptOpenJDK-11.0.11) logstash-output-elasticsearch/11.0.1
  "logstash/#{LOGSTASH_VERSION} (OS=#{os_name}-#{os_version}-#{os_arch}; JVM=#{jvm_vendor}-#{jvm_version}) logstash-#{@plugin_type}-#{config_name}/#{plugin_version}"
end
register() click to toggle source
# File lib/logstash/filters/elasticsearch.rb, line 163
def register
  #Load query if it exists
  if @query_template
    if File.zero?(@query_template)
      raise "template is empty"
    end
    file = File.open(@query_template, 'r')
    @query_dsl = file.read
  end

  validate_query_settings
  fill_hosts_from_cloud_id
  setup_ssl_params!
  validate_authentication
  fill_user_password_from_cloud_auth

  @hosts = Array(@hosts).map { |host| host.to_s } # potential SafeURI#to_s

  test_connection!
  setup_serverless
end

Private Instance Methods

client_options() click to toggle source
# File lib/logstash/filters/elasticsearch.rb, line 263
def client_options
  @client_options ||= {
    :user => @user,
    :password => @password,
    :api_key => @api_key,
    :proxy => @proxy,
    :ssl => client_ssl_options,
    :retry_on_failure => @retry_on_failure,
    :retry_on_status => @retry_on_status,
    :user_agent => prepare_user_agent
  }
end
client_ssl_options() click to toggle source
# File lib/logstash/filters/elasticsearch.rb, line 276
def client_ssl_options
  ssl_options = {}
  ssl_options[:enabled] = @ssl_enabled

  # If the deprecated `ssl` option was explicitly provided, it keeps the same behavior
  # setting up all the client SSL configs even if ssl => false. Otherwise, it should use
  # the @ssl_enabled value as it was either explicitly set by the `ssl_enabled` option or
  # inferred from the hosts scheme.
  return ssl_options unless @ssl_enabled || original_params.include?('ssl')

  ssl_options[:enabled] = true
  ssl_certificate_authorities, ssl_truststore_path, ssl_certificate, ssl_keystore_path = params.values_at('ssl_certificate_authorities', 'ssl_truststore_path', 'ssl_certificate', 'ssl_keystore_path')

  if ssl_certificate_authorities && ssl_truststore_path
    raise LogStash::ConfigurationError, 'Use either "ssl_certificate_authorities/ca_file" or "ssl_truststore_path" when configuring the CA certificate'
  end

  if ssl_certificate && ssl_keystore_path
    raise LogStash::ConfigurationError, 'Use either "ssl_certificate" or "ssl_keystore_path/keystore" when configuring client certificates'
  end

  if ssl_certificate_authorities&.any?
    raise LogStash::ConfigurationError, 'Multiple values on "ssl_certificate_authorities" are not supported by this plugin' if ssl_certificate_authorities.size > 1
    ssl_options[:ca_file] = ssl_certificate_authorities.first
  end

  setup_client_ssl_store(ssl_options, 'truststore', ssl_truststore_path)
  setup_client_ssl_store(ssl_options, 'keystore', ssl_keystore_path)
  logger.debug("Keystore for client certificate", :keystore => ssl_keystore_path) if ssl_keystore_path

  ssl_key = params["ssl_key"]
  if ssl_certificate
    raise LogStash::ConfigurationError, 'Using an "ssl_certificate" requires an "ssl_key"' unless ssl_key
    ssl_options[:client_cert] = ssl_certificate
    ssl_options[:client_key] = ssl_key
  elsif !ssl_key.nil?
    raise LogStash::ConfigurationError, 'An "ssl_certificate" is required when using an "ssl_key"'
  end

  ssl_verification_mode = params["ssl_verification_mode"]
  unless ssl_verification_mode.nil?
    case ssl_verification_mode
      when 'none'
        logger.warn "You have enabled encryption but DISABLED certificate verification, " +
                      "to make sure your data is secure set `ssl_verification_mode => full`"
        ssl_options[:verify] = :disable
      else
        # Manticore's :default maps to Apache HTTP Client's DefaultHostnameVerifier,
        # which is the modern STRICT verifier that replaces the deprecated StrictHostnameVerifier
        ssl_options[:verify] = :default
    end
  end

  ssl_options[:cipher_suites] = params["ssl_cipher_suites"] if params.include?("ssl_cipher_suites")
  protocols = params['ssl_supported_protocols']
  ssl_options[:protocols] = protocols if protocols&.any?
  ssl_options[:trust_strategy] = trust_strategy_for_ca_trusted_fingerprint

  ssl_options
end
effectively_ssl?() click to toggle source
# File lib/logstash/filters/elasticsearch.rb, line 524
def effectively_ssl?
  return true if @ssl_enabled

  hosts = Array(@hosts)
  return false if hosts.nil? || hosts.empty?

  hosts.all? { |host| host && host.to_s.start_with?("https") }
end
extract_path(path_reference) click to toggle source

get an array of path elements from a path reference

# File lib/logstash/filters/elasticsearch.rb, line 359
def extract_path(path_reference)
  return [path_reference] unless path_reference.start_with?('[') && path_reference.end_with?(']')

  path_reference[1...-1].split('][')
end
extract_total_from_hits(hits) click to toggle source

Given a “hits” object from an Elasticsearch response, return the total number of hits in the result set. @param hits [Hash{String=>Object}] @return [Integer]

# File lib/logstash/filters/elasticsearch.rb, line 380
def extract_total_from_hits(hits)
  total = hits['total']

  # Elasticsearch 7.x produces an object containing `value` and `relation` in order
  # to enable unambiguous reporting when the total is only a lower bound; if we get
  # an object back, return its `value`.
  return total['value'] if total.kind_of?(Hash)

  total
end
extract_value(source, path) click to toggle source

given a Hash and an array of path fragments, returns the value at the path @param source [Hash{String=>Object}] @param path [Array{String}] @return [Object]

# File lib/logstash/filters/elasticsearch.rb, line 369
def extract_value(source, path)
  path.reduce(source) do |memo, old_key_fragment|
    break unless memo.include?(old_key_fragment)
    memo[old_key_fragment]
  end
end
fill_hosts_from_cloud_id() click to toggle source
# File lib/logstash/filters/elasticsearch.rb, line 427
def fill_hosts_from_cloud_id
  return unless @cloud_id

  if @hosts && !hosts_default?(@hosts)
    raise LogStash::ConfigurationError, 'Both cloud_id and hosts specified, please only use one of those.'
  end
  @hosts = parse_host_uri_from_cloud_id(@cloud_id)
end
fill_user_password_from_cloud_auth() click to toggle source
# File lib/logstash/filters/elasticsearch.rb, line 420
def fill_user_password_from_cloud_auth
  return unless @cloud_auth

  @user, @password = parse_user_password_from_cloud_auth(@cloud_auth)
  params['user'], params['password'] = @user, @password
end
get_client() click to toggle source
# File lib/logstash/filters/elasticsearch.rb, line 352
def get_client
  @shared_client || synchronize do
    @shared_client ||= new_client
  end
end
hosts_default?(hosts) click to toggle source
# File lib/logstash/filters/elasticsearch.rb, line 391
def hosts_default?(hosts)
  hosts.is_a?(Array) && hosts.size == 1 && !original_params.key?('hosts')
end
infer_ssl_enabled_from_hosts() click to toggle source
# File lib/logstash/filters/elasticsearch.rb, line 518
def infer_ssl_enabled_from_hosts
  return if original_params.include?('ssl') || original_params.include?('ssl_enabled')

  @ssl_enabled = params['ssl_enabled'] = effectively_ssl?
end
new_client() click to toggle source
# File lib/logstash/filters/elasticsearch.rb, line 346
def new_client
  # NOTE: could pass cloud-id/cloud-auth to client but than we would need to be stricter on ES version requirement
  # and also LS parsing might differ from ES client's parsing so for consistency we do not pass cloud options ...
  LogStash::Filters::ElasticsearchClient.new(@logger, @hosts, client_options)
end
parse_host_uri_from_cloud_id(cloud_id) click to toggle source
# File lib/logstash/filters/elasticsearch.rb, line 436
def parse_host_uri_from_cloud_id(cloud_id)
  require 'logstash/util/safe_uri'
  begin # might not be available on older LS
    require 'logstash/util/cloud_setting_id'
  rescue LoadError
    raise LogStash::ConfigurationError, 'The cloud_id setting is not supported by your version of Logstash, ' +
        'please upgrade your installation (or set hosts instead).'
  end

  begin
    cloud_id = LogStash::Util::CloudSettingId.new(cloud_id) # already does append ':{port}' to host
  rescue ArgumentError => e
    raise LogStash::ConfigurationError, e.message.to_s.sub(/Cloud Id/i, 'cloud_id')
  end
  cloud_uri = "#{cloud_id.elasticsearch_scheme}://#{cloud_id.elasticsearch_host}"
  LogStash::Util::SafeURI.new(cloud_uri)
end
parse_user_password_from_cloud_auth(cloud_auth) click to toggle source
# File lib/logstash/filters/elasticsearch.rb, line 454
def parse_user_password_from_cloud_auth(cloud_auth)
  begin # might not be available on older LS
    require 'logstash/util/cloud_setting_auth'
  rescue LoadError
    raise LogStash::ConfigurationError, 'The cloud_auth setting is not supported by your version of Logstash, ' +
        'please upgrade your installation (or set user/password instead).'
  end

  cloud_auth = cloud_auth.value if cloud_auth.is_a?(LogStash::Util::Password)
  begin
    cloud_auth = LogStash::Util::CloudSettingAuth.new(cloud_auth)
  rescue ArgumentError => e
    raise LogStash::ConfigurationError, e.message.to_s.sub(/Cloud Auth/i, 'cloud_auth')
  end
  [ cloud_auth.username, cloud_auth.password ]
end
setup_client_ssl_store(ssl_options, kind, store_path) click to toggle source

@param kind is a string [truststore|keystore]

# File lib/logstash/filters/elasticsearch.rb, line 338
def setup_client_ssl_store(ssl_options, kind, store_path)
  if store_path
    ssl_options[kind.to_sym] = store_path
    ssl_options["#{kind}_type".to_sym] = params["ssl_#{kind}_type"] if params.include?("ssl_#{kind}_type")
    ssl_options["#{kind}_password".to_sym] = params["ssl_#{kind}_password"].value if params.include?("ssl_#{kind}_password")
  end
end
setup_serverless() click to toggle source
# File lib/logstash/filters/elasticsearch.rb, line 479
def setup_serverless
  if get_client.serverless?
    @client_options[:serverless] = true
    @shared_client = new_client
    get_client.info
  end
rescue => e
  @logger.error("Failed to retrieve Elasticsearch info", message: e.message, exception: e.class, backtrace: e.backtrace)
  raise LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch"
end
setup_ssl_params!() click to toggle source
# File lib/logstash/filters/elasticsearch.rb, line 490
def setup_ssl_params!
  @ssl_enabled = normalize_config(:ssl_enabled) do |normalize|
    normalize.with_deprecated_alias(:ssl)
  end

  # Infer the value if neither the deprecate `ssl` and `ssl_enabled` were set
  infer_ssl_enabled_from_hosts

  @ssl_keystore_path = normalize_config(:ssl_keystore_path) do |normalize|
    normalize.with_deprecated_alias(:keystore)
  end

  @ssl_keystore_password = normalize_config(:ssl_keystore_password) do |normalize|
    normalize.with_deprecated_alias(:keystore_password)
  end

  @ssl_certificate_authorities = normalize_config(:ssl_certificate_authorities) do |normalize|
    normalize.with_deprecated_mapping(:ca_file) do |ca_file|
      [ca_file]
    end
  end

  params['ssl_enabled'] = @ssl_enabled
  params['ssl_keystore_path'] = @ssl_keystore_path unless @ssl_keystore_path.nil?
  params['ssl_keystore_password'] = @ssl_keystore_password unless @ssl_keystore_password.nil?
  params['ssl_certificate_authorities'] = @ssl_certificate_authorities unless @ssl_certificate_authorities.nil?
end
test_connection!() click to toggle source
# File lib/logstash/filters/elasticsearch.rb, line 471
def test_connection!
  begin
    get_client.client.ping
  rescue Elasticsearch::UnsupportedProductError
    raise LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch"
  end
end
validate_authentication() click to toggle source
# File lib/logstash/filters/elasticsearch.rb, line 405
def validate_authentication
  authn_options = 0
  authn_options += 1 if @cloud_auth
  authn_options += 1 if (@api_key && @api_key.value)
  authn_options += 1 if (@user || (@password && @password.value))

  if authn_options > 1
    raise LogStash::ConfigurationError, 'Multiple authentication options are specified, please only use one of user/password, cloud_auth or api_key'
  end

  if @api_key && @api_key.value && @ssl_enabled != true
    raise(LogStash::ConfigurationError, "Using api_key authentication requires SSL/TLS secured communication using the `ssl => true` option")
  end
end
validate_query_settings() click to toggle source
# File lib/logstash/filters/elasticsearch.rb, line 395
def validate_query_settings
  unless @query || @query_template
    raise LogStash::ConfigurationError, "Both `query` and `query_template` are empty. Require either `query` or `query_template`."
  end

  if @query && @query_template
    raise LogStash::ConfigurationError, "Both `query` and `query_template` are set. Use either `query` or `query_template`."
  end
end