class Fluent::Plugin::HTTPOutput
Constants
- DEFAULT_BUFFER_TYPE
- DEFAULT_FORMATTER
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http.rb, line 15 def initialize super end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http.rb, line 61 def configure(conf) compat_parameters_convert(conf, :buffer, :formatter) super @ssl_verify_mode = if @ssl_no_verify OpenSSL::SSL::VERIFY_NONE else OpenSSL::SSL::VERIFY_PEER end @ca_file = @cacert_file @last_request_time = nil raise Fluent::ConfigError, "'tag' in chunk_keys is required." if !@chunk_key_tag && @buffered http_methods = [:get, :put, :post, :delete] @http_method = if http_methods.include? @http_method.intern @http_method.intern else :post end puts @endpoint_url @uri = URI.parse(@endpoint_url) # ssl_verify = @ssl_no_verify url = @uri.scheme + "://" + @uri.host + ":" + @uri.port.to_s @adapter = Faraday.new(url: url, ssl: {verify:false} ) do |f| f.request :retry, max: 5, interval: 1, interval_randomness: 0.5, backoff_factor: 2, methods: @http_method, exceptions: %w(Errno::ETIMEDOUT Faraday::TimeoutError Faraday::Error::TimeoutError Net::ReadTimeout).freeze f.adapter :net_http_persistent end if @formatter_config = conf.elements('format').first @formatter = formatter_create end end
create_request(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 155 def create_request(tag, time, record) url = format_url(tag, time, record) uri = URI.parse(url) req = Net::HTTP.const_get(@http_method.to_s.capitalize).new(uri.request_uri) set_body(req, tag, time, record) set_header(req, tag, time, record) return req, uri end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 204 def format(tag, time, record) [time, record].to_msgpack end
format_url(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 112 def format_url(tag, time, record) @endpoint_url end
formatted_to_msgpack_binary?()
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 208 def formatted_to_msgpack_binary? true end
handle_record(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 192 def handle_record(tag, time, record) if @formatter_config record = @formatter.format(tag, time, record) end # req, uri = create_request(tag, time, record) send_request(record) end
http_opts(uri)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 164 def http_opts(uri) opts = { :use_ssl => uri.scheme == 'https' } opts[:verify_mode] = @ssl_verify_mode if opts[:use_ssl] opts[:ca_file] = File.join(@ca_file) if File.file?(@ca_file) opts end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 212 def multi_workers_ready? true end
prefer_buffered_processing()
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 200 def prefer_buffered_processing @buffered end
process(tag, es)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 216 def process(tag, es) es.each do |time, record| handle_record(tag, time, record) end end
proxies()
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 173 def proxies ENV['HTTPS_PROXY'] || ENV['HTTP_PROXY'] || ENV['http_proxy'] || ENV['https_proxy'] end
send_request(record)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 177 def send_request(record) is_rate_limited = (@rate_limit_msec != 0 and not @last_request_time.nil?) if is_rate_limited and ((Time.now.to_f - @last_request_time) * 1000.0 < @rate_limit_msec) log.info('Dropped request due to rate limiting') return end _ = @adapter.post(@uri.path) do |request| request.headers['Content-Type'] = 'application/json' request.body = Yajl.dump(record) request.options.timeout = 60 request.options.open_timeout = 60 end end
set_body(req, tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 116 def set_body(req, tag, time, record) if @serializer == :json set_json_body(req, record) elsif @serializer == :text set_text_body(req, record) elsif @serializer == :raw set_raw_body(req, record) else req.set_form_data(record) end req end
set_header(req, tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 129 def set_header(req, tag, time, record) if @custom_headers @custom_headers.each do |k,v| req[k] = v end req else req end end
set_json_body(req, data)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 140 def set_json_body(req, data) req.body = Yajl.dump(data) req['Content-Type'] = 'application/json' end
set_raw_body(req, data)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 150 def set_raw_body(req, data) req.body = data.to_s req['Content-Type'] = 'application/octet-stream' end
set_text_body(req, data)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 145 def set_text_body(req, data) req.body = data["message"] req['Content-Type'] = 'text/plain' end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http.rb, line 108 def shutdown super end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http.rb, line 104 def start super end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 222 def write(chunk) tag = chunk.metadata.tag @endpoint_url = extract_placeholders(@endpoint_url, chunk.metadata) chunk.msgpack_each do |time, record| handle_record(tag, time, record) end end