class Fluent::Plugin::AwsElasticsearchServiceOutput

Public Instance Methods

get_connection_options(con_host=nil) click to toggle source

@override

# File lib/fluent/plugin/out_aws-elasticsearch-service.rb, line 35
# Builds the connection options from the configured `@endpoint` entries.
#
# Each endpoint URL is parsed into a host hash (host, port, scheme and,
# when present and non-empty, user, password and path). The AWS signing
# details for that endpoint are attached under `:aws_elasticsearch_service`.
#
# @param con_host [Object, nil] accepted for signature compatibility; not
#   used by this implementation
# @return [Hash] `{ hosts: [...] }` with one host hash per endpoint
# @raise [RuntimeError] when no endpoint is configured
# @raise [Fluent::ConfigError] when an endpoint URL ends with "/"
def get_connection_options(con_host=nil)
  raise "`endpoint` require." if @endpoint.empty?

  # Validate every endpoint up front, before any host hash is built.
  @endpoint.each do |entry|
    next unless entry[:url].end_with?("/")
    raise Fluent::ConfigError, "Ensure you don't have a trailing slash on the endpoint URL in your fluentd configuration."
  end

  host_list = @endpoint.map do |entry|
    parsed = URI(entry[:url])
    settings = { host: parsed.host, port: parsed.port, scheme: parsed.scheme }

    # Optional URI components are copied only when they carry a value.
    %w(user password path).each do |part|
      value = parsed.public_send(part)
      settings[part.to_sym] = value unless value.nil? || value == ''
    end

    settings[:aws_elasticsearch_service] = {
      :credentials => credentials(entry),
      :region => entry[:region]
    }

    settings
  end

  { hosts: host_list }
end
write(chunk) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_aws-elasticsearch-service.rb, line 65
# Writes a buffered chunk by delegating to the superclass implementation.
#
# @param chunk [Object] the chunk handed over by the caller; passed through
#   to the parent unchanged
# @return [Object] whatever the superclass `write` returns
def write(chunk)
  super(chunk)
end

Private Instance Methods

credentials(opts) click to toggle source

Returns a callable that resolves the AWS credentials for an endpoint.

# File lib/fluent/plugin/out_aws-elasticsearch-service.rb, line 75
# Builds a lazily evaluated AWS credentials resolver for one endpoint.
#
# The returned lambda resolves credentials each time it is called, in this
# order:
#   1. static keys, when both `:access_key_id` and `:secret_access_key` are
#      present and non-empty;
#   2. without `:assume_role_arn`: the ECS credentials at the configured
#      relative URI (option or AWS_CONTAINER_CREDENTIALS_RELATIVE_URI), or
#      else shared credentials, instance profile and ECS, first one found;
#   3. with `:assume_role_arn`: an STS assume-role provider, using the web
#      identity variant when `:assume_role_web_identity_token_file` is set.
#
# The lambda's `inspect` is overridden to delegate to the resolved
# credentials object's own `inspect`.
#
# @param opts [#[]] endpoint settings, read with `[]` only
# @return [Proc] lambda returning the resolved credentials object
# @raise [RuntimeError] (when the lambda is called) if no provider yields
#   credentials, or the credentials found are not set
def credentials(opts)
  callback = lambda do
    # `to_s` lets a missing (nil) key count as "not configured" instead of
    # crashing on `nil.empty?`.
    has_static_keys = !opts[:access_key_id].to_s.empty? && !opts[:secret_access_key].to_s.empty?

    resolved =
      if has_static_keys
        Aws::Credentials.new opts[:access_key_id], opts[:secret_access_key]
      elsif opts[:assume_role_arn].nil?
        container_uri = opts[:ecs_container_credentials_relative_uri] || ENV["AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"]
        if container_uri.nil?
          # Fall through the providers; the first non-nil result wins.
          Aws::SharedCredentials.new({retries: 2}).credentials ||
            Aws::InstanceProfileCredentials.new.credentials ||
            Aws::ECSCredentials.new.credentials
        else
          Aws::ECSCredentials.new({
            credential_path: container_uri
          }).credentials
        end
      elsif opts[:assume_role_web_identity_token_file].nil?
        sts_credential_provider({
          role_arn: opts[:assume_role_arn],
          role_session_name: opts[:assume_role_session_name],
          region: sts_credentials_region(opts)
        }).credentials
      else
        sts_web_identity_credential_provider({
          role_arn: opts[:assume_role_arn],
          web_identity_token_file: opts[:assume_role_web_identity_token_file],
          region: sts_credentials_region(opts)
        }).credentials
      end

    # A provider may yield nil; report that with the same error as unset
    # credentials rather than failing with NoMethodError.
    raise "No valid AWS credentials found." if resolved.nil? || !resolved.set?
    resolved
  end

  # Inspecting the lambda resolves the credentials and shows their
  # `inspect` output instead of the Proc's default representation.
  def callback.inspect
    call.credentials.inspect
  end

  callback
end
sts_credential_provider(opts) click to toggle source
# File lib/fluent/plugin/out_aws-elasticsearch-service.rb, line 123
# Returns the assume-role credential provider for the given options.
#
# Providers are memoized per distinct option set, so repeated calls with the
# same options reuse one provider, while a different role or region gets its
# own provider instead of silently reusing the first one created.
#
# @param opts [Hash] options passed to `Aws::AssumeRoleCredentials.new`
# @return [Aws::AssumeRoleCredentials] the memoized provider for `opts`
def sts_credential_provider(opts)
  # AssumeRoleCredentials is an auto-refreshing credential provider
  @sts_assume_role_providers ||= {}
  @sts_assume_role_providers.fetch(opts) do
    # Key on a copy so later mutation of `opts` cannot corrupt the cache.
    @sts_assume_role_providers[opts.dup] = Aws::AssumeRoleCredentials.new(opts)
  end
end
sts_credentials_region(opts) click to toggle source
# File lib/fluent/plugin/out_aws-elasticsearch-service.rb, line 119
# Picks the region used for STS calls.
#
# @param opts [#[]] endpoint settings
# @return [Object, nil] `opts[:sts_credentials_region]` when it is truthy,
#   otherwise `opts[:region]`
def sts_credentials_region(opts)
  dedicated_region = opts[:sts_credentials_region]
  return dedicated_region if dedicated_region

  opts[:region]
end
sts_web_identity_credential_provider(opts) click to toggle source
# File lib/fluent/plugin/out_aws-elasticsearch-service.rb, line 128
# Returns the web-identity assume-role credential provider for the options.
#
# Providers are memoized per distinct option set, in a cache used only by
# this method, so a different role, token file or region gets its own
# provider instead of silently reusing the first one created.
#
# @param opts [Hash] options passed to
#   `Aws::AssumeRoleWebIdentityCredentials.new`
# @return [Aws::AssumeRoleWebIdentityCredentials] the memoized provider
def sts_web_identity_credential_provider(opts)
  # AssumeRoleWebIdentityCredentials is an auto-refreshing credential provider
  @sts_web_identity_providers ||= {}
  @sts_web_identity_providers.fetch(opts) do
    # Key on a copy so later mutation of `opts` cannot corrupt the cache.
    @sts_web_identity_providers[opts.dup] = Aws::AssumeRoleWebIdentityCredentials.new(opts)
  end
end