class Fluent::HttpShadowOutput
Constants
- ERB_REGEXP
- PLACEHOLDER_REGEXP
- SUPPORT_PROTOCOLS
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http_shadow.rb, line 8 def initialize super require 'erb' require 'typhoeus' require "addressable/uri" require 'string/scrub' if RUBY_VERSION.to_f < 2.1 end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http_shadow.rb, line 35 def configure(conf) super if @host.nil? && @host_hash.nil? raise ConfigError, "out_http_shadow: required to @host or @host_hash." end end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_http_shadow.rb, line 59 def format(tag, time, record) [tag, time, record].to_msgpack end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http_shadow.rb, line 55 def shutdown super end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http_shadow.rb, line 42 def start super @path_format = ERB.new(@path_format.gsub(PLACEHOLDER_REGEXP, ERB_REGEXP)) @protocol_format = ERB.new(@protocol_format.gsub(PLACEHOLDER_REGEXP, ERB_REGEXP)) @headers = get_formatter(@header_hash) @cookies = get_formatter(@cookie_hash) if @no_send_header_pattern @no_send_header_pattern = /#{@no_send_header_pattern}/ end end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_http_shadow.rb, line 63 def write(chunk) records = [] chunk.msgpack_each do |tag, time, record| records << record end sampling_size = (records.size * (@rate * 0.01)).to_i if @rate > 100 orig_records = records.dup loop do records.concat(orig_records) break if sampling_size < records.size end end send_request_parallel(records.first(sampling_size)) end
Private Instance Methods
get_formatter(hash)
click to toggle source
# File lib/fluent/plugin/out_http_shadow.rb, line 120 def get_formatter(hash) formatter = {} return formatter unless hash hash.each do |k, v| format = v.gsub(PLACEHOLDER_REGEXP, ERB_REGEXP) formatter[k] = ERB.new(format) end formatter end
get_header(record)
click to toggle source
# File lib/fluent/plugin/out_http_shadow.rb, line 142 def get_header(record) header = {} @headers.each do |k, v| value = v.result(binding) if @no_send_header_pattern header[k] = value unless @no_send_header_pattern =~ value else header[k] = value end end header['Cookie'] = get_cookie_string(record) if @cookie_hash header end
get_request(host, record)
click to toggle source
# File lib/fluent/plugin/out_http_shadow.rb, line 94 def get_request(host, record) method = (record[@method_key] || 'get').downcase.to_sym path = @path_format.result(binding) path = replace_string(path) if @replace_hash protocol = @protocol_format.result(binding) protocol = SUPPORT_PROTOCOLS.include?(protocol) ? protocol : 'http' base_url = "#{protocol}://" + host url = base_url + path uri = Addressable::URI.parse(url) params = uri.query_values params.merge(record[@params_key]) unless record[@params_key].nil? params = replace_params(params) if @replace_hash && params option = { timeout: @timeout, followlocation: true, method: method, params: params, headers: get_header(record) } option[:userpwd] = "#{@username}:#{@password}" if @username Typhoeus::Request.new(base_url + uri.path, option) end
rate_per_method(method)
click to toggle source
# File lib/fluent/plugin/out_http_shadow.rb, line 175 def rate_per_method(method) return true unless @rate_per_method_hash rate_per_method = @rate_per_method_hash[method.to_s] || 100 return Random.rand(100) < rate_per_method end
replace_params(params)
click to toggle source
# File lib/fluent/plugin/out_http_shadow.rb, line 138 def replace_params(params) Hash[params.map { |k,v| [k, replace_string(v)] }] end
replace_string(str)
click to toggle source
# File lib/fluent/plugin/out_http_shadow.rb, line 130 def replace_string(str) return nil if str.nil? @replace_hash.each do |k, v| str = str.scrub.gsub(k, v) end str end
send_request_parallel(records)
click to toggle source
# File lib/fluent/plugin/out_http_shadow.rb, line 81 def send_request_parallel(records) hydra = Typhoeus::Hydra.new(max_concurrency: @max_concurrency) records.each do |record| host = @host || @host_hash[record[@host_key]] next if host.nil? request = get_request(host, record) method = request.options[:method] next unless supported?(method) hydra.queue(request) if rate_per_method(method) end hydra.run end
supported?(method)
click to toggle source
# File lib/fluent/plugin/out_http_shadow.rb, line 170 def supported?(method) return true unless @support_methods return @support_methods.include?(method.to_s) end