module Fluent::Plugin::KinesisHelper::Client
Public Class Methods
included(mod)
click to toggle source
# File lib/fluent/plugin/kinesis_helper/client.rb, line 89 def self.included(mod) mod.include ClientParams end
Public Instance Methods
client()
click to toggle source
# File lib/fluent/plugin/kinesis_helper/client.rb, line 98 def client @client ||= client_class.new(client_options) end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/kinesis_helper/client.rb, line 93 def configure(conf) super @region = client.config.region if @region.nil? end
Private Instance Methods
client_class()
click to toggle source
# File lib/fluent/plugin/kinesis_helper/client.rb, line 104 def client_class case request_type when :streams, :streams_aggregated require 'aws-sdk-kinesis' Aws::Kinesis::Client when :firehose require 'aws-sdk-firehose' Aws::Firehose::Client end end
client_options()
click to toggle source
# File lib/fluent/plugin/kinesis_helper/client.rb, line 115 def client_options options = setup_credentials options.update( user_agent_suffix: "fluent-plugin-kinesis/#{request_type}/#{FluentPluginKinesis::VERSION}" ) options.update(region: @region) unless @region.nil? options.update(http_proxy: @http_proxy) unless @http_proxy.nil? options.update(endpoint: @endpoint) unless @endpoint.nil? options.update(ssl_verify_peer: @ssl_verify_peer) unless @ssl_verify_peer.nil? if @debug options.update(logger: Logger.new(log.out)) options.update(log_level: :debug) end options end
setup_credentials()
click to toggle source
# File lib/fluent/plugin/kinesis_helper/client.rb, line 131 def setup_credentials options = {} credentials_options = {} case when @aws_key_id && @aws_sec_key && @aws_ses_token options[:access_key_id] = @aws_key_id options[:secret_access_key] = @aws_sec_key options[:session_token] = @aws_ses_token when @aws_key_id && @aws_sec_key options[:access_key_id] = @aws_key_id options[:secret_access_key] = @aws_sec_key when @assume_role_credentials c = @assume_role_credentials credentials_options[:role_arn] = c.role_arn credentials_options[:role_session_name] = c.role_session_name credentials_options[:policy] = c.policy if c.policy credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds credentials_options[:external_id] = c.external_id if c.external_id credentials_options[:sts_endpoint_url] = c.sts_endpoint_url if c.sts_endpoint_url if @region and c.sts_http_proxy and c.sts_endpoint_url credentials_options[:client] = Aws::STS::Client.new(region: @region, http_proxy: c.sts_http_proxy, endpoint: c.sts_endpoint_url) elsif c.sts_http_proxy and c.sts_endpoint_url credentials_options[:client] = Aws::STS::Client.new(http_proxy: c.sts_http_proxy, endpoint: c.sts_endpoint_url) elsif @region and c.sts_http_proxy credentials_options[:client] = Aws::STS::Client.new(region: @region, http_proxy: c.sts_http_proxy) elsif @region and c.sts_endpoint_url credentials_options[:client] = Aws::STS::Client.new(region: @region, endpoint: c.sts_endpoint_url) elsif c.sts_http_proxy credentials_options[:client] = Aws::STS::Client.new(http_proxy: c.sts_http_proxy) elsif c.sts_endpoint_url credentials_options[:client] = Aws::STS::Client.new(endpoint: c.sts_endpoint_url) elsif @region credentials_options[:client] = Aws::STS::Client.new(region: @region) end options[:credentials] = Aws::AssumeRoleCredentials.new(credentials_options) when @web_identity_credentials c = @web_identity_credentials credentials_options[:role_arn] = c.role_arn credentials_options[:role_session_name] = c.role_session_name credentials_options[:web_identity_token_file] = c.web_identity_token_file credentials_options[:policy] = c.policy if c.policy credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds if @region credentials_options[:client] = Aws::STS::Client.new(:region => @region) end options[:credentials] = Aws::AssumeRoleWebIdentityCredentials.new(credentials_options) when @instance_profile_credentials c = @instance_profile_credentials credentials_options[:retries] = c.retries if c.retries credentials_options[:ip_address] = c.ip_address if c.ip_address credentials_options[:port] = c.port if c.port credentials_options[:http_open_timeout] = c.http_open_timeout if c.http_open_timeout credentials_options[:http_read_timeout] = c.http_read_timeout if c.http_read_timeout if ENV["AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"] options[:credentials] = Aws::ECSCredentials.new(credentials_options) else options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options) end when @shared_credentials c = @shared_credentials credentials_options[:path] = c.path if c.path credentials_options[:profile_name] = c.profile_name if c.profile_name options[:credentials] = Aws::SharedCredentials.new(credentials_options) when @process_credentials if Gem::Version.new(Aws::CORE_GEM_VERSION) < Gem::Version.new('3.24.0') raise Fluent::ConfigError, "Config process_credentials requires aws-sdk-core >= 3.24.0. Found aws-sdk-core #{Aws::CORE_GEM_VERSION} instead." end c = @process_credentials process = c.process options[:credentials] = Aws::ProcessCredentials.new(process) else # Use default credentials # See http://docs.aws.amazon.com/sdkforruby/api/Aws/S3/Client.html end options end