class Fluent::HttpShadowOutput

Constants

ERB_REGEXP
PLACEHOLDER_REGEXP
SUPPORT_PROTOCOLS

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http_shadow.rb, line 8
# Builds the plugin instance and lazily loads the libraries the plugin uses,
# so they are only required once the plugin is actually instantiated.
def initialize
  super
  # Load order is kept: erb, typhoeus, then addressable/uri.
  %w[erb typhoeus addressable/uri].each { |library| require library }
  # The scrub backport is only pulled in on interpreters older than 2.1.
  if RUBY_VERSION.to_f < 2.1
    require 'string/scrub'
  end
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http_shadow.rb, line 35
# Applies the configuration through the superclass, then verifies that at
# least one of @host / @host_hash has been set.
#
# Raises ConfigError when both are nil.
def configure(conf)
  super
  target_missing = [@host, @host_hash].all?(&:nil?)
  raise ConfigError, "out_http_shadow: required to @host or @host_hash." if target_missing
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http_shadow.rb, line 59
# Serializes one event as a msgpack-encoded [tag, time, record] triple.
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http_shadow.rb, line 55
# Shutdown hook: defers entirely to the superclass implementation; this
# method adds no teardown of its own.
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http_shadow.rb, line 42
# Start hook: after the superclass has started, turns the configured format
# strings into ERB templates and prepares the header/cookie formatters.
def start
  super

  # Placeholders are rewritten to ERB tags before the template is compiled.
  compile = ->(template) { ERB.new(template.gsub(PLACEHOLDER_REGEXP, ERB_REGEXP)) }
  @path_format = compile.call(@path_format)
  @protocol_format = compile.call(@protocol_format)

  @headers = get_formatter(@header_hash)
  @cookies = get_formatter(@cookie_hash)

  # The configured pattern is replaced in place by a Regexp built from it.
  @no_send_header_pattern = /#{@no_send_header_pattern}/ if @no_send_header_pattern
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_http_shadow.rb, line 63
# Replays a sample of the records in +chunk+.
#
# @rate is a percentage: values up to 100 send the first N% of the records,
# values above 100 repeat the records cyclically until the requested number
# is reached.
#
# chunk - object responding to #msgpack_each, yielding (tag, time, record).
#
# Returns whatever send_request_parallel returns.
def write(chunk)
  records = []
  chunk.msgpack_each { |_tag, _time, record| records << record }

  # Multiply before dividing: `size * (@rate * 0.01)` loses a record to float
  # rounding (e.g. 100 records at rate 29 evaluated to 28.999... => 28).
  sampling_size = (records.size * @rate / 100.0).to_i

  sampled =
    if @rate > 100
      # cycle repeats the records as often as needed and yields nothing for
      # an empty chunk, where the previous concat loop never terminated.
      records.cycle.first(sampling_size)
    else
      records.first(sampling_size)
    end

  send_request_parallel(sampled)
end

Private Instance Methods

get_formatter(hash) click to toggle source
# File lib/fluent/plugin/out_http_shadow.rb, line 120
# Compiles every value of +hash+ into an ERB template, after rewriting its
# placeholders (PLACEHOLDER_REGEXP) into ERB tags (ERB_REGEXP).
#
# Returns a new Hash keyed like +hash+; an empty Hash when +hash+ is nil/false.
def get_formatter(hash)
  return {} unless hash

  hash.each_with_object({}) do |(name, template), compiled|
    compiled[name] = ERB.new(template.gsub(PLACEHOLDER_REGEXP, ERB_REGEXP))
  end
end
get_header(record) click to toggle source
# File lib/fluent/plugin/out_http_shadow.rb, line 142
# Renders the header templates in @headers against +record+.
#
# A header is left out when @no_send_header_pattern is set and matches the
# rendered value. When @cookie_hash is set, a 'Cookie' header built by
# get_cookie_string is added.
#
# Returns a Hash of header name => rendered value.
def get_header(record)
  header = {}
  @headers.each do |k, v|
    # Templates are evaluated with this method's binding, so the local names
    # here (record, header, k, v, value) are what a template can see.
    value = v.result(binding)
    suppressed = @no_send_header_pattern && @no_send_header_pattern =~ value
    header[k] = value unless suppressed
  end
  if @cookie_hash
    header['Cookie'] = get_cookie_string(record)
  end
  header
end
get_request(host, record) click to toggle source
# File lib/fluent/plugin/out_http_shadow.rb, line 94
# Builds the Typhoeus request that replays +record+ against +host+.
#
# host   - host part of the target URL (String).
# record - event Hash; read via @method_key, @params_key and by the ERB
#          path/protocol templates.
#
# The HTTP method defaults to :get, and any protocol not listed in
# SUPPORT_PROTOCOLS falls back to 'http'. Query parameters are taken from the
# rendered path and then merged with record[@params_key], with record values
# winning on key collisions.
#
# Returns a Typhoeus::Request.
def get_request(host, record)
  method = (record[@method_key] || 'get').downcase.to_sym
  # Templates are evaluated with this method's binding (host, record, ...).
  path = @path_format.result(binding)
  path = replace_string(path) if @replace_hash
  protocol = @protocol_format.result(binding)
  protocol = 'http' unless SUPPORT_PROTOCOLS.include?(protocol)

  base_url = "#{protocol}://" + host
  uri = Addressable::URI.parse(base_url + path)

  params = uri.query_values
  extra_params = record[@params_key]
  unless extra_params.nil?
    # query_values is nil when the path has no query string, and Hash#merge
    # is non-destructive: the result has to be assigned back, otherwise the
    # record's params are silently dropped (or nil.merge raises).
    # NOTE(review): assumes record[@params_key] is a Hash — confirm with callers.
    params = (params || {}).merge(extra_params)
  end
  params = replace_params(params) if @replace_hash && params

  option = {
    timeout: @timeout,
    followlocation: true,
    method: method,
    params: params,
    headers: get_header(record)
  }
  option[:userpwd] = "#{@username}:#{@password}" if @username

  Typhoeus::Request.new(base_url + uri.path, option)
end
rate_per_method(method) click to toggle source
# File lib/fluent/plugin/out_http_shadow.rb, line 175
# Decides at random whether a request using +method+ should be sent.
#
# Without @rate_per_method_hash every request passes. Otherwise the entry
# for method.to_s (100 when absent) is compared against a random integer
# drawn from 0...100.
#
# Returns true or false.
def rate_per_method(method)
  return true unless @rate_per_method_hash

  threshold = @rate_per_method_hash[method.to_s] || 100
  Random.rand(100) < threshold
end
replace_params(params) click to toggle source
# File lib/fluent/plugin/out_http_shadow.rb, line 138
# Returns a new Hash with the same keys as +params+ and every value passed
# through replace_string.
def replace_params(params)
  params.each_with_object({}) do |(key, value), replaced|
    replaced[key] = replace_string(value)
  end
end
replace_string(str) click to toggle source
# File lib/fluent/plugin/out_http_shadow.rb, line 130
# Applies every (pattern => replacement) pair of @replace_hash to +str+, in
# hash order, scrubbing invalid bytes before each substitution.
#
# Returns nil for a nil input; with an empty @replace_hash the input is
# returned untouched.
def replace_string(str)
  return nil if str.nil?

  @replace_hash.inject(str) do |text, (pattern, replacement)|
    text.scrub.gsub(pattern, replacement)
  end
end
send_request_parallel(records) click to toggle source
# File lib/fluent/plugin/out_http_shadow.rb, line 81
# Queues one request per record on a Typhoeus::Hydra (limited to
# @max_concurrency) and then runs the hydra.
#
# A record is skipped when no host can be resolved for it, when its HTTP
# method is not supported, or when rate_per_method rejects it.
def send_request_parallel(records)
  hydra = Typhoeus::Hydra.new(max_concurrency: @max_concurrency)

  records.each do |record|
    # A fixed @host takes precedence over the per-record lookup in @host_hash.
    host = @host || @host_hash[record[@host_key]]
    unless host.nil?
      request = get_request(host, record)
      http_method = request.options[:method]
      # supported? is checked first, so rate_per_method is only consulted for
      # supported methods.
      hydra.queue(request) if supported?(http_method) && rate_per_method(http_method)
    end
  end

  hydra.run
end
supported?(method) click to toggle source
# File lib/fluent/plugin/out_http_shadow.rb, line 170
# Tells whether +method+ may be replayed: always true when @support_methods
# is not set, otherwise true only if the list includes method.to_s.
def supported?(method)
  !@support_methods || @support_methods.include?(method.to_s)
end