class LogStash::Inputs::Elasticsearch

.Compatibility Note

NOTE

Starting with Elasticsearch 5.3, there’s an {ref}modules-http.html[HTTP setting] called `http.content_type.required`. If this option is set to `true` and you are using Logstash 2.4 through 5.2, you need to update the Elasticsearch input plugin to version 4.0.2 or higher.

Reads from an Elasticsearch cluster based on search query results. This is useful for replaying test logs, reindexing, and similar tasks. It also supports periodically scheduling lookup enrichments using a cron syntax (see the `schedule` setting).

Example:

[source,ruby]

input {

# Read all documents from Elasticsearch matching the given query
elasticsearch {
  hosts => "localhost"
  query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
}

}

This would create an Elasticsearch query with the following format:

[source,json]

curl 'localhost:9200/logstash-*/_search?scroll=1m&size=1000' -d '{

"query": {
  "match": {
    "statuscode": 200
  }
},
"sort": [ "_doc" ]

}'

Scheduling

Input from this plugin can be scheduled to run periodically according to a specific schedule. This scheduling syntax is powered by https://github.com/jmettraux/rufus-scheduler[rufus-scheduler]. The syntax is cron-like with some extensions specific to Rufus (e.g. timezone support).

Examples:

|==========================================================
| `* 5 * 1-3 *`               | will execute every minute of 5am every day of January through March.
| `0 * * * *`                 | will execute on the 0th minute of every hour every day.
| `0 6 * * * America/Chicago` | will execute at 6:00am Chicago time (UTC-6, or UTC-5 during daylight saving time) every day.
|==========================================================

Further documentation describing this syntax can be found https://github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings[here].

Constants

BUILD_FLAVOR_SERVERLESS
DEFAULT_EAV_HEADER

Attributes

client[R]

@private used by unit specs

pipeline_id[R]

Public Class Methods

new(params={}) click to toggle source
Calls superclass method
# File lib/logstash/inputs/elasticsearch.rb, line 275
# Creates the input. When `docinfo_target` was not configured, picks a default
# based on the ECS compatibility mode: '@metadata' with ECS disabled,
# '[@metadata][input][elasticsearch]' under ECS v1.
def initialize(params={})
  super(params)
  return unless docinfo_target.nil?

  @docinfo_target = ecs_select[disabled: '@metadata', v1: '[@metadata][input][elasticsearch]']
end

Public Instance Methods

push_hit(hit, output_queue, root_field = '_source') click to toggle source

This can be called externally by the query executor.

# File lib/logstash/inputs/elasticsearch.rb, line 349
# Converts a single search hit into an event and appends it to +output_queue+.
#
# hit        - one hit from a search response (Hash-like)
# root_field - key of +hit+ whose value becomes the event body ('_source' by default)
#
# Document metadata is copied into the event only when `docinfo` is enabled.
# Public so the query executor can call back into it.
def push_hit(hit, output_queue, root_field = '_source')
  body = hit[root_field]
  new_event = targeted_event_factory.new_event(body)
  set_docinfo_fields(hit, new_event) if @docinfo
  decorate(new_event)
  output_queue << new_event
end
register() click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 283
# Validates the configuration, builds the Elasticsearch client, checks
# connectivity, resolves the search API and selects the query executor used by #run.
#
# Raises LogStash::ConfigurationError when `query` specifies a `slice` while
# `slices` is set, when `slices` < 1, or when `retries` < 0; the helpers called
# below (cloud_id/cloud_auth parsing, authentication and SSL validation,
# connection test) can raise it as well.
# Returns the constructed client.
def register
  # Loaded for the `schedule` option (rufus-scheduler backs the cron scheduling used by #run).
  require "rufus/scheduler"

  # Falls back to 'main' when no execution context (or pipeline id) is available.
  @pipeline_id = execution_context&.pipeline_id || 'main'

  # cloud_id must be resolved before SSL setup: SSL inference inspects @hosts.
  fill_hosts_from_cloud_id
  setup_ssl_params!

  @base_query = LogStash::Json.load(@query)
  if @slices
    # The plugin generates the `slice` clause itself when `slices` is set.
    @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
    @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
  end

  @retries < 0 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `retries` option must be equal or greater than zero, got `#{@retries}`")

  # Validate before cloud_auth is expanded into @user/@password; otherwise
  # cloud_auth and user/password would both be counted as configured.
  validate_authentication
  fill_user_password_from_cloud_auth

  # Both auth helpers return either {} or an 'Authorization' header;
  # validate_authentication above ensures at most one of them is non-empty.
  # NOTE(review): `user`/`password`/`api_key` are presumably the config accessors
  # backed by the ivars set above — confirm.
  transport_options = {:headers => {}}
  transport_options[:headers].merge!(setup_basic_auth(user, password))
  transport_options[:headers].merge!(setup_api_key(api_key))
  transport_options[:headers].merge!({'user-agent' => prepare_user_agent()})
  transport_options[:request_timeout] = @request_timeout_seconds unless @request_timeout_seconds.nil?
  transport_options[:connect_timeout] = @connect_timeout_seconds unless @connect_timeout_seconds.nil?
  transport_options[:socket_timeout]  = @socket_timeout_seconds  unless @socket_timeout_seconds.nil?

  # Both depend on @ssl_enabled, which setup_ssl_params! has finalized.
  hosts = setup_hosts
  ssl_options = setup_client_ssl

  # An empty proxy string is ignored (with a warning) rather than passed to the transport.
  @logger.warn "Supplied proxy setting (proxy => '') has no effect" if @proxy.eql?('')

  transport_options[:proxy] = @proxy.to_s if @proxy && !@proxy.eql?('')

  # Kept on the instance so setup_serverless can rebuild the client with extra headers.
  @client_options = {
    :hosts => hosts,
    :transport_options => transport_options,
    :transport_class => ::Elasticsearch::Transport::Transport::HTTP::Manticore,
    :ssl => ssl_options
  }

  @client = Elasticsearch::Client.new(@client_options)

  test_connection!

  # May replace @client with one carrying the serverless default header.
  setup_serverless

  # Must run before setup_query_executor, which reads @resolved_search_api.
  setup_search_api

  setup_query_executor

  @client
end
run(output_queue) click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 337
# Runs the query executor, pushing events onto +output_queue+.
# Without a `schedule`, the query runs exactly once. With one, every cron tick
# runs the query and this call blocks on the scheduler's join.
def run(output_queue)
  return @query_executor.do_run(output_queue) unless @schedule

  scheduler.cron(@schedule) { @query_executor.do_run(output_queue) }
  scheduler.join
end
set_docinfo_fields(hit, event) click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 356
# Copies each field named in `docinfo_fields` from +hit+ into the Hash stored at
# @docinfo_target on +event+, creating that Hash when the event has none.
#
# Logs and raises Exception when the existing value at @docinfo_target is not a Hash.
# TODO(review): raising Exception (not StandardError) bypasses plain `rescue => e`
# handlers in callers; confirm this is intended.
def set_docinfo_fields(hit, event)
  # Read-modify-write: don't assume event.get returns an in-place updatable object.
  target_hash = event.get(@docinfo_target) || {}

  unless target_hash.is_a?(Hash)
    @logger.error("Incompatible Event, incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got:", :docinfo_target_type => target_hash.class, :event => event.to_hash_with_metadata)
    raise Exception, "Elasticsearch input: incompatible event"
  end

  @docinfo_fields.each { |name| target_hash[name] = hit[name] }
  event.set(@docinfo_target, target_hash)
end

Private Instance Methods

build_flavor() click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 630
# Memoized `version.build_flavor` from the cluster info; nil when unavailable.
def build_flavor
  @build_flavor ||= begin
    info = es_info
    info.nil? ? nil : info.dig('version', 'build_flavor')
  end
end
effectively_ssl?() click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 380
# Reports whether connections should be treated as TLS.
#
# Returns true when SSL is explicitly enabled, or when every configured host is
# written with an explicit `https:` scheme. Returns false when no hosts are configured.
def effectively_ssl?
  return true if @ssl_enabled

  # Array() never yields nil, so emptiness is the only degenerate case.
  hosts = Array(@hosts)
  return false if hosts.empty?

  # Match the full "https:" scheme prefix (the same test setup_hosts uses) so that
  # scheme-less host names merely starting with "https" (e.g. "https-proxy:9200")
  # are not mistaken for TLS URLs.
  hosts.all? { |host| host && host.to_s.start_with?("https:") }
end
es_info() click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 605
# Cluster info from the client's info API, fetched once and cached.
def es_info
  return @es_info if @es_info

  @es_info = @client.info
end
es_major_version() click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 613
# Leading numeric component of the cluster version (e.g. "8.11.0" -> 8), memoized.
def es_major_version
  @es_major_version ||= begin
    major, _rest = es_version.split('.', 2)
    major.to_i
  end
end
es_version() click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 609
# Version string (`version.number`) reported by the cluster; nil when unavailable.
def es_version
  return @es_version if @es_version

  @es_version = es_info&.dig('version', 'number')
end
fill_hosts_from_cloud_id() click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 553
# When `cloud_id` is configured, replaces @hosts with the URI it encodes.
# Raises LogStash::ConfigurationError if explicit hosts were also configured.
def fill_hosts_from_cloud_id
  return unless @cloud_id

  explicit_hosts = @hosts && !hosts_default?(@hosts)
  raise LogStash::ConfigurationError, 'Both cloud_id and hosts specified, please only use one of those.' if explicit_hosts

  @hosts = parse_host_uri_from_cloud_id(@cloud_id)
end
fill_user_password_from_cloud_auth() click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 546
# When `cloud_auth` is configured, splits it into user/password and stores them
# both on the instance and in `params`.
def fill_user_password_from_cloud_auth
  return unless @cloud_auth

  username, secret = parse_user_password_from_cloud_auth(@cloud_auth)
  @user = username
  @password = secret
  params['user'] = username
  params['password'] = secret
end
hosts_default?(hosts) click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 376
# True when no hosts were configured: +hosts+ is nil or an empty Array.
def hosts_default?(hosts)
  return true if hosts.nil?

  hosts.is_a?(Array) && hosts.empty?
end
infer_ssl_enabled_from_hosts() click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 502
# If neither `ssl` nor `ssl_enabled` was set by the user, derives `ssl_enabled`
# from the configured hosts and records it in both @ssl_enabled and `params`.
def infer_ssl_enabled_from_hosts
  user_configured = %w[ssl ssl_enabled].any? { |key| original_params.include?(key) }
  return if user_configured

  inferred = effectively_ssl?
  params['ssl_enabled'] = inferred
  @ssl_enabled = inferred
end
parse_host_uri_from_cloud_id(cloud_id) click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 562
# Decodes an Elastic Cloud `cloud_id` into a SafeURI of the form scheme://host.
# Raises LogStash::ConfigurationError when this Logstash lacks cloud_id support
# or when the id is malformed.
def parse_host_uri_from_cloud_id(cloud_id)
  begin # the helper may be missing on older Logstash releases
    require 'logstash/util/cloud_setting_id'
  rescue LoadError
    raise LogStash::ConfigurationError, 'The cloud_id setting is not supported by your version of Logstash, ' +
        'please upgrade your installation (or set hosts instead).'
  end

  setting = begin
    LogStash::Util::CloudSettingId.new(cloud_id) # host already carries ':{port}'
  rescue ArgumentError => e
    raise LogStash::ConfigurationError, e.message.to_s.sub(/Cloud Id/i, 'cloud_id')
  end
  LogStash::Util::SafeURI.new("#{setting.elasticsearch_scheme}://#{setting.elasticsearch_host}")
end
parse_user_password_from_cloud_auth(cloud_auth) click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 579
# Splits an Elastic Cloud `cloud_auth` value (plain or wrapped in
# LogStash::Util::Password) into [username, password].
# Raises LogStash::ConfigurationError when this Logstash lacks cloud_auth support
# or when the value is malformed.
def parse_user_password_from_cloud_auth(cloud_auth)
  begin # the helper may be missing on older Logstash releases
    require 'logstash/util/cloud_setting_auth'
  rescue LoadError
    raise LogStash::ConfigurationError, 'The cloud_auth setting is not supported by your version of Logstash, ' +
        'please upgrade your installation (or set user/password instead).'
  end

  raw = cloud_auth.is_a?(LogStash::Util::Password) ? cloud_auth.value : cloud_auth
  parsed = begin
    LogStash::Util::CloudSettingAuth.new(raw)
  rescue ArgumentError => e
    raise LogStash::ConfigurationError, e.message.to_s.sub(/Cloud Auth/i, 'cloud_auth')
  end
  [parsed.username, parsed.password]
end
prepare_user_agent() click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 534
# Builds the User-Agent header identifying Logstash, the host OS, the JVM and
# this plugin, e.g.
# "logstash/7.14.1 (OS=Linux-5.4.0-84-generic-amd64; JVM=AdoptOpenJDK-11.0.11) logstash-input-elasticsearch/4.10.0".
# Requires JRuby (reads Java system properties).
def prepare_user_agent
  jsys = java.lang.System
  os_name, os_version, os_arch = %w[os.name os.version os.arch].map { |prop| jsys.getProperty(prop) }
  jvm_vendor, jvm_version = %w[java.vendor java.version].map { |prop| jsys.getProperty(prop) }
  plugin_version = Gem.loaded_specs["logstash-input-elasticsearch"].version

  os_part = "#{os_name}-#{os_version}-#{os_arch}"
  jvm_part = "#{jvm_vendor}-#{jvm_version}"
  "logstash/#{LOGSTASH_VERSION} (OS=#{os_part}; JVM=#{jvm_part}) logstash-#{@plugin_type}-#{config_name}/#{plugin_version}"
end
serverless?() click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 634
# True when the cluster reports the serverless build flavor. A true result is
# cached; a false result is recomputed on each call.
def serverless?
  return @is_serverless if @is_serverless

  @is_serverless = (build_flavor == BUILD_FLAVOR_SERVERLESS)
end
setup_api_key(api_key) click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 527
# Returns an `Authorization: ApiKey <base64>` header Hash for +api_key+,
# or {} when no key (or an empty secret value) was configured.
def setup_api_key(api_key)
  return {} unless api_key

  secret = api_key.value
  return {} unless secret

  { 'Authorization' => "ApiKey #{::Base64.strict_encode64(secret)}" }
end
setup_basic_auth(user, password) click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 520
# Returns an `Authorization: Basic <base64(user:password)>` header Hash,
# or {} unless both a user and a password value are present.
def setup_basic_auth(user, password)
  have_credentials = user && password && password.value
  return {} unless have_credentials

  encoded = ::Base64.strict_encode64("#{user}:#{password.value}")
  { 'Authorization' => "Basic #{encoded}" }
end
setup_client_ssl() click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 404
# Builds the SSL option Hash passed to Elasticsearch::Client as `:ssl`.
#
# Reads the SSL settings from `params` (setup_ssl_params! writes the normalized
# `ssl_enabled`, `ssl_certificate_authorities` and `ssl_verification_mode` there).
# Returns the options Hash; when SSL is disabled it holds at most a trust strategy.
# Raises LogStash::ConfigurationError for mutually exclusive or incomplete settings.
def setup_client_ssl
  ssl_options = {}
  ssl_options[:ssl] = true if @ssl_enabled

  unless @ssl_enabled
    # Keep it backward compatible with the deprecated `ssl` option
    ssl_options[:trust_strategy] = trust_strategy_for_ca_trusted_fingerprint if original_params.include?('ssl')
    return ssl_options
  end

  ssl_certificate_authorities, ssl_truststore_path, ssl_certificate, ssl_keystore_path = params.values_at('ssl_certificate_authorities', 'ssl_truststore_path', 'ssl_certificate', 'ssl_keystore_path')

  # The CA may come from PEM file(s) or from a truststore, not both.
  if ssl_certificate_authorities && ssl_truststore_path
    raise LogStash::ConfigurationError, 'Use either "ssl_certificate_authorities/ca_file" or "ssl_truststore_path" when configuring the CA certificate'
  end

  # Likewise the client identity: a PEM certificate or a keystore, not both.
  if ssl_certificate && ssl_keystore_path
    raise LogStash::ConfigurationError, 'Use either "ssl_certificate" or "ssl_keystore_path/keystore" when configuring client certificates'
  end

  # Only a single CA file is supported.
  if ssl_certificate_authorities&.any?
    raise LogStash::ConfigurationError, 'Multiple values on "ssl_certificate_authorities" are not supported by this plugin' if ssl_certificate_authorities.size > 1
    ssl_options[:ca_file] = ssl_certificate_authorities.first
  end

  # Type and password are optional; the password is unwrapped via `.value`
  # (presumably a LogStash::Util::Password — confirm).
  if ssl_truststore_path
    ssl_options[:truststore] = ssl_truststore_path
    ssl_options[:truststore_type] = params["ssl_truststore_type"] if params.include?("ssl_truststore_type")
    ssl_options[:truststore_password] = params["ssl_truststore_password"].value if params.include?("ssl_truststore_password")
  end

  if ssl_keystore_path
    ssl_options[:keystore] = ssl_keystore_path
    ssl_options[:keystore_type] = params["ssl_keystore_type"] if params.include?("ssl_keystore_type")
    ssl_options[:keystore_password] = params["ssl_keystore_password"].value if params.include?("ssl_keystore_password")
  end

  # A PEM client certificate and its key must be supplied together.
  ssl_key = params["ssl_key"]
  if ssl_certificate
    raise LogStash::ConfigurationError, 'Using an "ssl_certificate" requires an "ssl_key"' unless ssl_key
    ssl_options[:client_cert] = ssl_certificate
    ssl_options[:client_key] = ssl_key
  elsif !ssl_key.nil?
    raise LogStash::ConfigurationError, 'An "ssl_certificate" is required when using an "ssl_key"'
  end

  # 'none' disables verification (with a warning); any other configured mode
  # selects the client's default verifier. Left unset when no mode is configured.
  ssl_verification_mode = params["ssl_verification_mode"]
  unless ssl_verification_mode.nil?
    case ssl_verification_mode
      when 'none'
        logger.warn "You have enabled encryption but DISABLED certificate verification, " +
                      "to make sure your data is secure set `ssl_verification_mode => full`"
        ssl_options[:verify] = :disable
      else
        # Manticore's :default maps to Apache HTTP Client's DefaultHostnameVerifier,
        # which is the modern STRICT verifier that replaces the deprecated StrictHostnameVerifier
        ssl_options[:verify] = :default
    end
  end

  ssl_options[:cipher_suites] = params["ssl_cipher_suites"] if params.include?("ssl_cipher_suites")

  # Protocols are only set when a non-empty list was configured.
  protocols = params['ssl_supported_protocols']
  ssl_options[:protocols] = protocols if protocols&.any?
  # Always applied when SSL is enabled (helper defined outside this view).
  ssl_options[:trust_strategy] = trust_strategy_for_ca_trusted_fingerprint

  ssl_options
end
setup_hosts() click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 508
# Normalizes @hosts to an Array of Strings (stored back on @hosts) and returns the
# client host list: entries with an explicit http:/https: scheme are passed
# through unchanged; "host[:port]" entries become Hashes whose scheme follows @ssl_enabled.
def setup_hosts
  @hosts = Array(@hosts).map(&:to_s) # entries may be SafeURI instances
  default_scheme = @ssl_enabled ? 'https' : 'http'

  @hosts.map do |entry|
    next entry if entry.start_with?('http:', 'https:')

    name, port = entry.split(':')
    { host: name, port: port, scheme: default_scheme }
  end
end
setup_query_executor() click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 653
# Picks the executor that #run drives, based on `response_type` and the resolved
# search API: Aggregation for 'aggregations'; SearchAfter or Scroll for 'hits'
# (Scroll logs a warning on Elasticsearch 8+). Leaves nil for any other value.
def setup_query_executor
  @query_executor =
    if @response_type == 'aggregations'
      LogStash::Inputs::Elasticsearch::Aggregation.new(@client, self)
    elsif @response_type == 'hits' && @resolved_search_api == "search_after"
      LogStash::Inputs::Elasticsearch::SearchAfter.new(@client, self)
    elsif @response_type == 'hits'
      logger.warn("scroll API is no longer recommended for pagination. Consider using search_after instead.") if es_major_version >= 8
      LogStash::Inputs::Elasticsearch::Scroll.new(@client, self)
    end
end
setup_search_api() click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 638
# Resolves `search_api` into @resolved_search_api. An explicit value is used
# as-is; "auto" becomes "search_after" on Elasticsearch 8+ and "scroll" otherwise,
# and the resolution is logged.
def setup_search_api
  @resolved_search_api =
    if @search_api != "auto"
      @search_api
    else
      resolved = es_major_version >= 8 ? "search_after" : "scroll"
      logger.info("`search_api => auto` resolved to `#{resolved}`", :elasticsearch => es_version)
      resolved
    end
end
setup_serverless() click to toggle source

Recreates the client with the default header when the cluster is serverless, then verifies the header by sending GET /.

# File lib/logstash/inputs/elasticsearch.rb, line 619
# For serverless clusters, rebuilds the client with DEFAULT_EAV_HEADER merged into
# its default headers and issues one info call through the new client.
# Any failure, including while detecting the build flavor, is logged and re-raised
# as LogStash::ConfigurationError. Returns nil for non-serverless clusters.
def setup_serverless
  return unless serverless?

  default_headers = @client_options[:transport_options][:headers]
  default_headers.merge!(DEFAULT_EAV_HEADER)
  @client = Elasticsearch::Client.new(@client_options)
  @client.info
rescue => e
  @logger.error("Failed to retrieve Elasticsearch info", message: e.message, exception: e.class, backtrace: e.backtrace)
  raise LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch"
end
setup_ssl_params!() click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 473
# Normalizes the SSL settings, folding deprecated options into their replacements:
# `ssl` -> `ssl_enabled`, `ca_file` -> `ssl_certificate_authorities` (wrapped in an
# Array), and `ssl_certificate_verification` -> `ssl_verification_mode`
# (true => "full", anything else => "none").
# When neither `ssl` nor `ssl_enabled` was given, `ssl_enabled` is inferred from the
# hosts. The resolved values are written back into `params`.
def setup_ssl_params!
  @ssl_enabled = normalize_config(:ssl_enabled) { |norm| norm.with_deprecated_alias(:ssl) }

  infer_ssl_enabled_from_hosts

  @ssl_certificate_authorities = normalize_config(:ssl_certificate_authorities) do |norm|
    norm.with_deprecated_mapping(:ca_file) { |ca_file| [ca_file] }
  end

  @ssl_verification_mode = normalize_config(:ssl_verification_mode) do |norm|
    norm.with_deprecated_mapping(:ssl_certificate_verification) do |verify|
      verify == true ? "full" : "none"
    end
  end

  params['ssl_enabled'] = @ssl_enabled
  params['ssl_certificate_authorities'] = @ssl_certificate_authorities unless @ssl_certificate_authorities.nil?
  params['ssl_verification_mode'] = @ssl_verification_mode unless @ssl_verification_mode.nil?
end
test_connection!() click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 599
# Pings the cluster. An Elasticsearch::UnsupportedProductError is turned into a
# LogStash::ConfigurationError; any other error propagates unchanged.
def test_connection!
  begin
    @client.ping
  rescue Elasticsearch::UnsupportedProductError
    raise LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch"
  end
end
validate_authentication() click to toggle source
# File lib/logstash/inputs/elasticsearch.rb, line 389
# Ensures at most one authentication mechanism is configured (cloud_auth,
# api_key, or user/password), and that api_key is only used with `ssl_enabled => true`.
# Raises LogStash::ConfigurationError otherwise; returns nil on success.
def validate_authentication
  api_key_given = @api_key && @api_key.value
  mechanisms = [@cloud_auth, api_key_given, (@user || (@password && @password.value))]

  if mechanisms.count { |configured| configured } > 1
    raise LogStash::ConfigurationError, 'Multiple authentication options are specified, please only use one of user/password, cloud_auth or api_key'
  end

  return unless api_key_given && @ssl_enabled != true

  raise(LogStash::ConfigurationError, "Using api_key authentication requires SSL/TLS secured communication using the `ssl_enabled => true` option")
end