module Fluent::Plugin::KinesisHelper::Client

Public Class Methods

included(mod) click to toggle source
# File lib/fluent/plugin/kinesis_helper/client.rb, line 89
def self.included(mod)
  mod.include ClientParams
end

Public Instance Methods

client() click to toggle source
# File lib/fluent/plugin/kinesis_helper/client.rb, line 98
def client
  @client ||= client_class.new(client_options)
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/kinesis_helper/client.rb, line 93
def configure(conf)
  super
  @region = client.config.region if @region.nil?
end

Private Instance Methods

client_class() click to toggle source
# File lib/fluent/plugin/kinesis_helper/client.rb, line 104
def client_class
  case request_type
  when :streams, :streams_aggregated
    require 'aws-sdk-kinesis'
    Aws::Kinesis::Client
  when :firehose
    require 'aws-sdk-firehose'
    Aws::Firehose::Client
  end
end
client_options() click to toggle source
# File lib/fluent/plugin/kinesis_helper/client.rb, line 115
def client_options
  options = setup_credentials
  options.update(
    user_agent_suffix: "fluent-plugin-kinesis/#{request_type}/#{FluentPluginKinesis::VERSION}"
  )
  options.update(region:          @region)          unless @region.nil?
  options.update(http_proxy:      @http_proxy)      unless @http_proxy.nil?
  options.update(endpoint:        @endpoint)        unless @endpoint.nil?
  options.update(ssl_verify_peer: @ssl_verify_peer) unless @ssl_verify_peer.nil?
  if @debug
    options.update(logger: Logger.new(log.out))
    options.update(log_level: :debug)
  end
  options
end
setup_credentials() click to toggle source
# File lib/fluent/plugin/kinesis_helper/client.rb, line 131
def setup_credentials
  options = {}
  credentials_options = {}
  case
  when @aws_key_id && @aws_sec_key && @aws_ses_token
    options[:access_key_id] = @aws_key_id
    options[:secret_access_key] = @aws_sec_key
    options[:session_token] = @aws_ses_token  
  when @aws_key_id && @aws_sec_key
    options[:access_key_id] = @aws_key_id
    options[:secret_access_key] = @aws_sec_key
  when @assume_role_credentials
    c = @assume_role_credentials
    credentials_options[:role_arn] = c.role_arn
    credentials_options[:role_session_name] = c.role_session_name
    credentials_options[:policy] = c.policy if c.policy
    credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds
    credentials_options[:external_id] = c.external_id if c.external_id
    credentials_options[:sts_endpoint_url] = c.sts_endpoint_url if c.sts_endpoint_url
    if @region and c.sts_http_proxy and c.sts_endpoint_url
        credentials_options[:client] = Aws::STS::Client.new(region: @region, http_proxy: c.sts_http_proxy, endpoint: c.sts_endpoint_url)
    elsif c.sts_http_proxy and c.sts_endpoint_url
        credentials_options[:client] = Aws::STS::Client.new(http_proxy: c.sts_http_proxy, endpoint: c.sts_endpoint_url)
    elsif @region and c.sts_http_proxy
        credentials_options[:client] = Aws::STS::Client.new(region: @region, http_proxy: c.sts_http_proxy)
    elsif @region and c.sts_endpoint_url
        credentials_options[:client] = Aws::STS::Client.new(region: @region, endpoint: c.sts_endpoint_url)
    elsif c.sts_http_proxy
        credentials_options[:client] = Aws::STS::Client.new(http_proxy: c.sts_http_proxy)
    elsif c.sts_endpoint_url
        credentials_options[:client] = Aws::STS::Client.new(endpoint: c.sts_endpoint_url)
    elsif @region
        credentials_options[:client] = Aws::STS::Client.new(region: @region)
    end
    options[:credentials] = Aws::AssumeRoleCredentials.new(credentials_options)
  when @web_identity_credentials
    c = @web_identity_credentials
    credentials_options[:role_arn] = c.role_arn
    credentials_options[:role_session_name] = c.role_session_name
    credentials_options[:web_identity_token_file] = c.web_identity_token_file
    credentials_options[:policy] = c.policy if c.policy
    credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds
    if @region
      credentials_options[:client] = Aws::STS::Client.new(:region => @region)
    end
    options[:credentials] = Aws::AssumeRoleWebIdentityCredentials.new(credentials_options)
  when @instance_profile_credentials
    c = @instance_profile_credentials
    credentials_options[:retries] = c.retries if c.retries
    credentials_options[:ip_address] = c.ip_address if c.ip_address
    credentials_options[:port] = c.port if c.port
    credentials_options[:http_open_timeout] = c.http_open_timeout if c.http_open_timeout
    credentials_options[:http_read_timeout] = c.http_read_timeout if c.http_read_timeout
    if ENV["AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"]
      options[:credentials] = Aws::ECSCredentials.new(credentials_options)
    else
      options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options)
    end
  when @shared_credentials
    c = @shared_credentials
    credentials_options[:path] = c.path if c.path
    credentials_options[:profile_name] = c.profile_name if c.profile_name
    options[:credentials] = Aws::SharedCredentials.new(credentials_options)
  when @process_credentials
    if Gem::Version.new(Aws::CORE_GEM_VERSION) < Gem::Version.new('3.24.0')
      raise Fluent::ConfigError, "Config process_credentials requires aws-sdk-core >= 3.24.0. Found aws-sdk-core #{Aws::CORE_GEM_VERSION} instead."
    end
    c = @process_credentials
    process = c.process
    options[:credentials] = Aws::ProcessCredentials.new(process)
  else
    # Use default credentials
    # See http://docs.aws.amazon.com/sdkforruby/api/Aws/S3/Client.html
  end
  options
end