class Fluent::Plugin::HTTPOutput

Constants

ConnectionCache

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Compat::Output::new
# File lib/fluent/plugin/out_http.rb, line 121
# Initializes the plugin's instance state.
#
# The endpoint URI, proxy URI and formatter slots start out as nil. The
# connection cache starts empty, together with the mutex and the counter
# that make_request_cached uses to hand out cache slot ids.
def initialize
  super

  # Connection-cache bookkeeping.
  @connection_cache_next_id = 0
  @connection_cache_id_mutex = Mutex.new
  @connection_cache = []

  # Filled in later; nil until then.
  @formatter = nil
  @proxy_uri = nil
  @uri = nil
end

Public Instance Methods

close() click to toggle source
Calls superclass method Fluent::Plugin::Output#close
# File lib/fluent/plugin/out_http.rb, line 133
# Runs the superclass close and then finishes every cached connection
# that is still started. Entries without a connection are skipped.
def close
  super

  @connection_cache.each do |entry|
    conn = entry.conn
    conn.finish if conn&.started?
  end
end
configure(conf) click to toggle source
Calls superclass method Fluent::Compat::Output#configure
# File lib/fluent/plugin/out_http.rb, line 139
# Applies the configuration and derives the state the other methods read:
# the connection cache placeholders, the default retryable status codes,
# HTTP options, proxy URI, formatter, content type and, for aws_sigv4
# auth, the request signer.
#
# conf - configuration object, passed through to the superclass.
#
# Raises Fluent::ConfigError when json_array is combined with a non-json
# formatter, when the AWS gems cannot be loaded, or when aws_service /
# aws_region are missing for aws_sigv4 auth.
def configure(conf)
  super

  if @reuse_connections
    # One slot per flush thread; every slot initially holds the same
    # empty placeholder entry.
    @connection_cache = Array.new(actual_flush_thread_count, ConnectionCache.new("", nil))
  end

  if @retryable_response_codes.nil?
    log.warn('Status code 503 is going to be removed from default `retryable_response_codes` from fluentd v2. Please add it by yourself if you wish')
    @retryable_response_codes = [503]
  end

  @http_opt = setup_http_option
  @proxy_uri = URI.parse(@proxy) if @proxy
  @formatter = formatter_create
  @content_type ||= setup_content_type

  if @json_array
    unless @formatter_configs.first[:@type] == "json"
      raise Fluent::ConfigError, "json_array option could be used with json formatter only"
    end
    # Swap this instance's format for the array-aware variant.
    define_singleton_method(:format, method(:format_json_array))
  end

  # Everything below only concerns aws_sigv4 authentication.
  return unless @auth && @auth.method == :aws_sigv4

  begin
    require 'aws-sigv4'
    require 'aws-sdk-core'
  rescue LoadError
    raise Fluent::ConfigError, "The aws-sdk-core and aws-sigv4 gems are required for aws_sigv4 auth. Run: gem install aws-sdk-core -v '~> 3.191'"
  end

  raise Fluent::ConfigError, "aws_service is required for aws_sigv4 auth" if @auth.aws_service.nil?
  raise Fluent::ConfigError, "aws_region is required for aws_sigv4 auth" if @auth.aws_region.nil?

  credentials =
    if @auth.aws_role_arn.nil?
      Aws::CredentialProviderChain.new.resolve
    else
      Aws::AssumeRoleCredentials.new(
        client: Aws::STS::Client.new(region: @auth.aws_region),
        role_arn: @auth.aws_role_arn,
        role_session_name: "fluentd"
      )
    end

  @aws_signer = Aws::Sigv4::Signer.new(
    service: @auth.aws_service,
    region: @auth.aws_region,
    credentials_provider: credentials
  )
end
connection_cache_id_for_thread() click to toggle source
# File lib/fluent/plugin/out_http.rb, line 113
# Returns the connection cache id stored on the current thread under this
# plugin's key, or nil when none has been stored yet.
def connection_cache_id_for_thread
  key = connection_cache_id_thread_key
  Thread.current[key]
end
connection_cache_id_for_thread=(id) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 117
# Stores +id+ on the current thread under this plugin's key.
def connection_cache_id_for_thread=(id)
  key = connection_cache_id_thread_key
  Thread.current[key] = id
end
connection_cache_id_thread_key() click to toggle source
# File lib/fluent/plugin/out_http.rb, line 109
# Builds the key under which the cache id is stored on a thread: the
# plugin id followed by "_connection_cache_id".
def connection_cache_id_thread_key
  owner = plugin_id
  "#{owner}_connection_cache_id"
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 200
# Formats one event by handing tag, time and record to the configured
# formatter and returning whatever it produces.
def format(tag, time, record)
  formatter = @formatter
  formatter.format(tag, time, record)
end
format_json_array(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 204
# Formats one event like #format and appends a "," to the formatter's
# output (the string returned by the formatter is extended in place).
# create_request later chops the final character and wraps the chunk
# in brackets when json_array is enabled.
def format_json_array(tag, time, record)
  line = @formatter.format(tag, time, record)
  line << ","
end
formatted_to_msgpack_binary?() click to toggle source
# File lib/fluent/plugin/out_http.rb, line 196
# True when the first configured formatter is of type 'msgpack'.
def formatted_to_msgpack_binary?
  formatter_type = @formatter_configs.first[:@type]
  formatter_type == 'msgpack'
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_http.rb, line 192
# Always answers true.
# NOTE(review): presumably this is how the plugin tells Fluentd it may run
# under multiple workers -- confirm against the base Output class.
def multi_workers_ready?
  true
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 208
# Delivers one chunk: resolves the endpoint for the chunk, builds the
# request, emits a debug line naming the HTTP method, target and chunk id,
# and sends the request. Returns whatever send_request returns.
def write(chunk)
  endpoint_uri = parse_endpoint(chunk)
  request = create_request(chunk, endpoint_uri)

  log.debug do
    "#{@http_method.capitalize} data to #{endpoint_uri.to_s} with chunk(#{dump_unique_id_hex(chunk.unique_id)})"
  end

  send_request(endpoint_uri, request)
end

Private Instance Methods

create_request(chunk, uri) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 323
# Builds the Net::HTTP request for a chunk.
#
# chunk - object whose #read returns the formatted payload.
# uri   - target URI; its request_uri becomes the request path.
#
# The request class follows @http_method (:post or :put). Headers are set
# first, then the body (wrapped in brackets with its last character
# chopped when @json_array is on), then optional gzip compression, and
# finally authentication. Returns the request.
def create_request(chunk, uri)
  request_class = { post: Net::HTTP::Post, put: Net::HTTP::Put }[@http_method]
  req = request_class&.new(uri.request_uri)
  set_headers(req, uri, chunk)

  payload = chunk.read
  payload = "[#{payload.chop}]" if @json_array
  req.body = payload

  if @compress == :gzip
    writer = Zlib::GzipWriter.new(StringIO.new)
    writer << req.body
    req.body = writer.close.string
  end

  # Auth goes last: at least one authentication method needs the final
  # body and the other headers to be in place.
  set_auth(req, uri)
  req
end
make_request(uri, req, &block) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 367
# Opens a one-off HTTP session to +uri+ (through the configured proxy when
# there is one) and passes +block+ to Net::HTTP.start. Returns what
# Net::HTTP.start returns. +req+ is accepted but not used here.
def make_request(uri, req, &block)
  proxy = @proxy_uri
  return Net::HTTP.start(uri.host, uri.port, @http_opt, &block) unless proxy

  Net::HTTP.start(uri.host, uri.port, proxy.host, proxy.port, proxy.user, proxy.password, @http_opt, &block)
end
make_request_cached(uri, req) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 345
# Sends +req+ over a connection cached for the calling thread.
#
# Each thread claims a cache slot id on first use (the counter is advanced
# under the mutex) and remembers it via connection_cache_id_for_thread.
# When the slot's recorded URI differs from +uri+, any started connection
# in the slot is finished and a new one is opened (through the proxy when
# configured) and stored. Returns the result of the connection's #request.
def make_request_cached(uri, req)
  slot = self.connection_cache_id_for_thread
  if slot.nil?
    slot = @connection_cache_id_mutex.synchronize do
      claimed = @connection_cache_next_id
      @connection_cache_next_id += 1
      claimed
    end
    self.connection_cache_id_for_thread = slot
  end

  target = uri.to_s
  cached = @connection_cache[slot]
  if cached.uri != target
    # Different endpoint than last time: drop the old session first.
    cached.conn.finish if cached.conn&.started?
    http = if @proxy_uri
             Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt)
           else
             Net::HTTP.start(uri.host, uri.port, @http_opt)
           end
    @connection_cache[slot] = ConnectionCache.new(target, http)
  end

  @connection_cache[slot].conn.request(req)
end
parse_endpoint(chunk) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 279
# Expands the placeholders in @endpoint for +chunk+ and parses the result
# into a URI object.
def parse_endpoint(chunk)
  URI.parse(extract_placeholders(@endpoint, chunk))
end
send_request(uri, req) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 375
# Sends +req+ to +uri+ and interprets the response.
#
# uri - target URI of the request.
# req - the prepared Net::HTTP request.
#
# Uses the per-thread cached connection when @reuse_connections is set,
# otherwise a one-off session. A success response is only logged at debug
# level. Any other response raises RetryableResponse when its status code
# is listed in @retryable_response_codes, raises Fluent::UnrecoverableError
# when @error_response_as_unrecoverable is set, and is otherwise logged as
# an error.
def send_request(uri, req)
  res = if @reuse_connections
          make_request_cached(uri, req)
        else
          make_request(uri, req) { |http| http.request(req) }
        end

  # Net::HTTP leaves +body+ nil for responses that carry no body
  # (204 No Content, 304 Not Modified, ...), hence to_s before lstrip.
  if res.is_a?(Net::HTTPSuccess)
    log.debug { "#{res.code} #{res.message.rstrip}#{res.body.to_s.lstrip}" }
  else
    msg = "#{res.code} #{res.message.rstrip} #{res.body.to_s.lstrip}"

    raise RetryableResponse, msg if @retryable_response_codes.include?(res.code.to_i)
    raise Fluent::UnrecoverableError, msg if @error_response_as_unrecoverable

    log.error "got error response from '#{@http_method.capitalize} #{uri.to_s}' : #{msg}"
  end
end
set_auth(req, uri) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 301
# Adds authentication to +req+ according to @auth; does nothing when no
# auth is configured.
#
# req - the Net::HTTP request; for aws_sigv4 its method and body are signed,
#       so both must be final before this is called.
# uri - target URI; its request_uri and host go into the signature.
#
# :basic uses HTTP basic auth with the configured username and password.
# :aws_sigv4 asks @aws_signer to sign the request and copies the resulting
# signature headers onto the request.
def set_auth(req, uri)
  return unless @auth

  case @auth.method
  when :basic
    req.basic_auth(@auth.username, @auth.password)
  when :aws_sigv4
    signature = @aws_signer.sign_request(
      http_method: req.method,
      url: uri.request_uri,
      headers: {
        'Content-Type' => @content_type,
        'Host' => uri.host
      },
      body: req.body
    )
    # Copy only the headers the signer actually produced. add_field turns a
    # nil value into "", so a header the signer left out (for example
    # x-amz-security-token when the credentials carry no session token)
    # would otherwise be sent as an empty header.
    %w[x-amz-date x-amz-security-token x-amz-content-sha256 authorization].each do |name|
      value = signature.headers[name]
      req.add_field(name, value) unless value.nil?
    end
  end
end
set_headers(req, uri, chunk) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 284
# Populates the headers of +req+.
#
# Static headers from @headers come first, then headers whose values are
# placeholder templates expanded for +chunk+, then Content-Encoding when
# gzip compression is enabled, and finally Content-Type, which is always
# set from @content_type. +uri+ is accepted but not used here.
def set_headers(req, uri, chunk)
  @headers.each { |name, value| req[name] = value } if @headers

  if @headers_from_placeholders
    @headers_from_placeholders.each do |name, template|
      req[name] = extract_placeholders(template, chunk)
    end
  end

  req['Content-Encoding'] = "gzip" if @compress == :gzip
  req['Content-Type'] = @content_type
end
setup_content_type() click to toggle source
# File lib/fluent/plugin/out_http.rb, line 219
# Picks a Content-Type from the type of the first configured formatter.
# For json the answer depends on @json_array (a JSON array versus
# newline-delimited JSON).
#
# Raises Fluent::ConfigError for a formatter type with no known mapping.
def setup_content_type
  formatter_type = @formatter_configs.first[:@type]

  case formatter_type
  when 'json' then @json_array ? 'application/json' : 'application/x-ndjson'
  when 'csv' then 'text/csv'
  when 'tsv', 'ltsv' then 'text/tab-separated-values'
  when 'msgpack' then 'application/x-msgpack'
  when 'out_file', 'single_value', 'stdout', 'hash' then 'text/plain'
  else
    raise Fluent::ConfigError, "can't determine Content-Type from formatter type. Set content_type parameter explicitly"
  end
end
setup_http_option() click to toggle source
# File lib/fluent/plugin/out_http.rb, line 236
# Builds the options hash handed to Net::HTTP.start.
#
# Timeouts and :use_ssl are always present; :use_ssl is true when
# @endpoint starts with "https". For SSL endpoints the TLS settings are
# added as well: CA file, client certificate (first certificate of the
# bundle) with the remaining certificates as extra chain, private key,
# verify mode, ciphers and SSL version.
#
# Returns the options Hash.
# Raises Fluent::ConfigError when a configured TLS path is not a file, or
# when the client certificate file contains no PEM certificate block.
def setup_http_option
  use_ssl = @endpoint.start_with?('https')
  opt = {
    open_timeout: @open_timeout,
    read_timeout: @read_timeout,
    ssl_timeout: @ssl_timeout,
    use_ssl: use_ssl
  }
  return opt unless use_ssl

  if @tls_ca_cert_path
    raise Fluent::ConfigError, "tls_ca_cert_path is wrong: #{@tls_ca_cert_path}" unless File.file?(@tls_ca_cert_path)
    opt[:ca_file] = @tls_ca_cert_path
  end

  if @tls_client_cert_path
    raise Fluent::ConfigError, "tls_client_cert_path is wrong: #{@tls_client_cert_path}" unless File.file?(@tls_client_cert_path)

    bundle_certs = File.read(@tls_client_cert_path).scan(/-----BEGIN CERTIFICATE-----(?:.|\n)+?-----END CERTIFICATE-----/)
    # Without this check an empty match list reaches
    # OpenSSL::X509::Certificate.new(nil) and surfaces as a TypeError.
    if bundle_certs.empty?
      raise Fluent::ConfigError, "tls_client_cert_path contains no PEM certificate: #{@tls_client_cert_path}"
    end

    # The first certificate is the client certificate; the rest of the
    # bundle (possibly none) is passed along as the extra chain.
    client_cert, *intermediate_certs = bundle_certs
    opt[:cert] = OpenSSL::X509::Certificate.new(client_cert)
    opt[:extra_chain_cert] = intermediate_certs.map { |cert| OpenSSL::X509::Certificate.new(cert) }
  end

  if @tls_private_key_path
    raise Fluent::ConfigError, "tls_private_key_path is wrong: #{@tls_private_key_path}" unless File.file?(@tls_private_key_path)
    opt[:key] = OpenSSL::PKey.read(File.read(@tls_private_key_path), @tls_private_key_passphrase)
  end

  # Stays nil for any verify mode other than :none / :peer.
  opt[:verify_mode] = case @tls_verify_mode
                      when :none
                        OpenSSL::SSL::VERIFY_NONE
                      when :peer
                        OpenSSL::SSL::VERIFY_PEER
                      end
  opt[:ciphers] = @tls_ciphers
  opt[:ssl_version] = @tls_version

  opt
end