class Fluent::Plugin::HTTPOutput
Constants
- ConnectionCache
Public Class Methods
new()
click to toggle source
Calls superclass method
Fluent::Compat::Output::new
# File lib/fluent/plugin/out_http.rb, line 121 def initialize super @uri = nil @proxy_uri = nil @formatter = nil @connection_cache = [] @connection_cache_id_mutex = Mutex.new @connection_cache_next_id = 0 end
Public Instance Methods
close()
click to toggle source
Calls superclass method
Fluent::Plugin::Output#close
# File lib/fluent/plugin/out_http.rb, line 133 def close super @connection_cache.each {|entry| entry.conn.finish if entry.conn&.started? } end
configure(conf)
click to toggle source
Calls superclass method
Fluent::Compat::Output#configure
# File lib/fluent/plugin/out_http.rb, line 139 def configure(conf) super @connection_cache = Array.new(actual_flush_thread_count, ConnectionCache.new("", nil)) if @reuse_connections if @retryable_response_codes.nil? log.warn('Status code 503 is going to be removed from default `retryable_response_codes` from fluentd v2. Please add it by yourself if you wish') @retryable_response_codes = [503] end @http_opt = setup_http_option @proxy_uri = URI.parse(@proxy) if @proxy @formatter = formatter_create @content_type = setup_content_type unless @content_type if @json_array if @formatter_configs.first[:@type] != "json" raise Fluent::ConfigError, "json_array option could be used with json formatter only" end define_singleton_method(:format, method(:format_json_array)) end if @auth and @auth.method == :aws_sigv4 begin require 'aws-sigv4' require 'aws-sdk-core' rescue LoadError raise Fluent::ConfigError, "The aws-sdk-core and aws-sigv4 gems are required for aws_sigv4 auth. Run: gem install aws-sdk-core -v '~> 3.191'" end raise Fluent::ConfigError, "aws_service is required for aws_sigv4 auth" unless @auth.aws_service != nil raise Fluent::ConfigError, "aws_region is required for aws_sigv4 auth" unless @auth.aws_region != nil if @auth.aws_role_arn == nil aws_credentials = Aws::CredentialProviderChain.new.resolve else aws_credentials = Aws::AssumeRoleCredentials.new( client: Aws::STS::Client.new( region: @auth.aws_region ), role_arn: @auth.aws_role_arn, role_session_name: "fluentd" ) end @aws_signer = Aws::Sigv4::Signer.new( service: @auth.aws_service, region: @auth.aws_region, credentials_provider: aws_credentials ) end end
connection_cache_id_for_thread()
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 113 def connection_cache_id_for_thread Thread.current[connection_cache_id_thread_key] end
connection_cache_id_for_thread=(id)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 117 def connection_cache_id_for_thread=(id) Thread.current[connection_cache_id_thread_key] = id end
connection_cache_id_thread_key()
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 109 def connection_cache_id_thread_key "#{plugin_id}_connection_cache_id" end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 200 def format(tag, time, record) @formatter.format(tag, time, record) end
format_json_array(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 204 def format_json_array(tag, time, record) @formatter.format(tag, time, record) << "," end
formatted_to_msgpack_binary?()
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 196 def formatted_to_msgpack_binary? @formatter_configs.first[:@type] == 'msgpack' end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 192 def multi_workers_ready? true end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 208 def write(chunk) uri = parse_endpoint(chunk) req = create_request(chunk, uri) log.debug { "#{@http_method.capitalize} data to #{uri.to_s} with chunk(#{dump_unique_id_hex(chunk.unique_id)})" } send_request(uri, req) end
Private Instance Methods
create_request(chunk, uri)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 323 def create_request(chunk, uri) req = case @http_method when :post Net::HTTP::Post.new(uri.request_uri) when :put Net::HTTP::Put.new(uri.request_uri) end set_headers(req, uri, chunk) req.body = @json_array ? "[#{chunk.read.chop}]" : chunk.read if @compress == :gzip gz = Zlib::GzipWriter.new(StringIO.new) gz << req.body req.body = gz.close.string end # At least one authentication method requires the body and other headers, so the order of this call matters set_auth(req, uri) req end
make_request(uri, req, &block)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 367 def make_request(uri, req, &block) if @proxy_uri Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt, &block) else Net::HTTP.start(uri.host, uri.port, @http_opt, &block) end end
make_request_cached(uri, req)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 345 def make_request_cached(uri, req) id = self.connection_cache_id_for_thread if id.nil? @connection_cache_id_mutex.synchronize { id = @connection_cache_next_id @connection_cache_next_id += 1 } self.connection_cache_id_for_thread = id end uri_str = uri.to_s if @connection_cache[id].uri != uri_str @connection_cache[id].conn.finish if @connection_cache[id].conn&.started? http = if @proxy_uri Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt) else Net::HTTP.start(uri.host, uri.port, @http_opt) end @connection_cache[id] = ConnectionCache.new(uri_str, http) end @connection_cache[id].conn.request(req) end
parse_endpoint(chunk)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 279 def parse_endpoint(chunk) endpoint = extract_placeholders(@endpoint, chunk) URI.parse(endpoint) end
send_request(uri, req)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 375 def send_request(uri, req) res = if @reuse_connections make_request_cached(uri, req) else make_request(uri, req) { |http| http.request(req) } end if res.is_a?(Net::HTTPSuccess) log.debug { "#{res.code} #{res.message.rstrip}#{res.body.lstrip}" } else msg = "#{res.code} #{res.message.rstrip} #{res.body.lstrip}" if @retryable_response_codes.include?(res.code.to_i) raise RetryableResponse, msg end if @error_response_as_unrecoverable raise Fluent::UnrecoverableError, msg else log.error "got error response from '#{@http_method.capitalize} #{uri.to_s}' : #{msg}" end end end
set_auth(req, uri)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 301 def set_auth(req, uri) return unless @auth if @auth.method == :basic req.basic_auth(@auth.username, @auth.password) elsif @auth.method == :aws_sigv4 signature = @aws_signer.sign_request( http_method: req.method, url: uri.request_uri, headers: { 'Content-Type' => @content_type, 'Host' => uri.host }, body: req.body ) req.add_field('x-amz-date', signature.headers['x-amz-date']) req.add_field('x-amz-security-token', signature.headers['x-amz-security-token']) req.add_field('x-amz-content-sha256', signature.headers['x-amz-content-sha256']) req.add_field('authorization', signature.headers['authorization']) end end
set_headers(req, uri, chunk)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 284 def set_headers(req, uri, chunk) if @headers @headers.each do |k, v| req[k] = v end end if @headers_from_placeholders @headers_from_placeholders.each do |k, v| req[k] = extract_placeholders(v, chunk) end end if @compress == :gzip req['Content-Encoding'] = "gzip" end req['Content-Type'] = @content_type end
setup_content_type()
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 219 def setup_content_type case @formatter_configs.first[:@type] when 'json' @json_array ? 'application/json' : 'application/x-ndjson' when 'csv' 'text/csv' when 'tsv', 'ltsv' 'text/tab-separated-values' when 'msgpack' 'application/x-msgpack' when 'out_file', 'single_value', 'stdout', 'hash' 'text/plain' else raise Fluent::ConfigError, "can't determine Content-Type from formatter type. Set content_type parameter explicitly" end end
setup_http_option()
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 236 def setup_http_option use_ssl = @endpoint.start_with?('https') opt = { open_timeout: @open_timeout, read_timeout: @read_timeout, ssl_timeout: @ssl_timeout, use_ssl: use_ssl } if use_ssl if @tls_ca_cert_path raise Fluent::ConfigError, "tls_ca_cert_path is wrong: #{@tls_ca_cert_path}" unless File.file?(@tls_ca_cert_path) opt[:ca_file] = @tls_ca_cert_path end if @tls_client_cert_path raise Fluent::ConfigError, "tls_client_cert_path is wrong: #{@tls_client_cert_path}" unless File.file?(@tls_client_cert_path) bundle = File.read(@tls_client_cert_path) bundle_certs = bundle.scan(/-----BEGIN CERTIFICATE-----(?:.|\n)+?-----END CERTIFICATE-----/) opt[:cert] = OpenSSL::X509::Certificate.new(bundle_certs[0]) intermediate_certs = bundle_certs[1..-1] if intermediate_certs opt[:extra_chain_cert] = intermediate_certs.map { |cert| OpenSSL::X509::Certificate.new(cert) } end end if @tls_private_key_path raise Fluent::ConfigError, "tls_private_key_path is wrong: #{@tls_private_key_path}" unless File.file?(@tls_private_key_path) opt[:key] = OpenSSL::PKey.read(File.read(@tls_private_key_path), @tls_private_key_passphrase) end opt[:verify_mode] = case @tls_verify_mode when :none OpenSSL::SSL::VERIFY_NONE when :peer OpenSSL::SSL::VERIFY_PEER end opt[:ciphers] = @tls_ciphers opt[:ssl_version] = @tls_version end opt end