class Fluent::Plugin::GrowthForecastOutput

Constants

DEFAULT_GRAPH_PATH

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_growthforecast.rb, line 91
# Validates the plugin parameters and converts the raw configuration values
# into the forms the other methods read (arrays, Regexp, symbols).
#
# conf - configuration object, handed to the superclass implementation.
#
# Raises Fluent::ConfigError as soon as one of the checks below fails.
def configure(conf)
  super

  raise Fluent::ConfigError, "configure buffer chunk_keys with tag" unless @chunk_key_tag
  raise Fluent::ConfigError, "gfapi_url must end with /api/" if @gfapi_url !~ /\/api\/\Z/

  # An explicit graph_path must consist of exactly three slash-separated parts.
  if !@graph_path.nil? && @graph_path !~ /^[^\/]+\/[^\/]+\/[^\/]+$/
    raise Fluent::ConfigError, "graph_path must be like '${service}/${section}/${tag}_${key_name}'"
  end

  # Exactly one of name_keys / name_key_pattern has to be given.
  keys_given = !@name_keys.nil?
  pattern_given = !@name_key_pattern.nil?
  unless keys_given || pattern_given
    raise Fluent::ConfigError, "missing both of name_keys and name_key_pattern"
  end
  if keys_given && pattern_given
    raise Fluent::ConfigError, "cannot specify both of name_keys and name_key_pattern"
  end
  if !@graphs.nil? && !keys_given
    raise Fluent::ConfigError, "graphs must be specified with name_keys"
  end

  # Comma-separated strings become arrays; the pattern becomes a Regexp.
  @name_keys = @name_keys.split(',') if @name_keys
  @name_key_pattern = Regexp.new(@name_key_pattern) if @name_key_pattern
  @graphs = @graphs.split(',') if @graphs

  # graphs[i] is used as the graph name for name_keys[i], so sizes must agree.
  if @name_keys && @graphs && @name_keys.size != @graphs.size
    raise Fluent::ConfigError, "sizes of name_keys and graphs do not match"
  end

  # Any value other than 'count' / 'modified' falls back to :gauge.
  @mode =
    case @mode
    when 'count' then :count
    when 'modified' then :modified
    else :gauge
    end

  # Any value other than the three listed falls back to :name_prefix.
  @tag_for =
    case @tag_for
    when 'ignore' then :ignore
    when 'section' then :section
    when 'service' then :service
    else :name_prefix
    end

  # Without an explicit graph_path, pick the default template for tag_for.
  if @graph_path.nil?
    if @section.nil? && @tag_for != :section
      raise Fluent::ConfigError, "section parameter is needed when tag_for is not 'section'"
    end
    if @service.nil? && @tag_for != :service
      raise Fluent::ConfigError, "service parameter is needed when tag_for is not 'service'"
    end
    @graph_path = DEFAULT_GRAPH_PATH[@tag_for]
  end

  # Precompute the "prefix." string and its length for placeholder_mapping.
  if @remove_prefix
    @removed_prefix_string = @remove_prefix + '.'
    @removed_length = @removed_prefix_string.length
  end

  @auth = @authentication == 'basic' ? :basic : :none
  @resolver = Resolve::Hostname.new(system_resolver: true)
end
connect_to(tag, name) click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 192
# Builds the graph URL for (tag, name) via format_url and returns the
# endpoint it points at.
#
# Returns a two-element array: [host, port].
def connect_to(tag, name)
  endpoint = URI.parse(format_url(tag, name))
  [endpoint.host, endpoint.port]
end
escape(param) click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 182
# URL-encodes +param+ with CGI.escape.
#
# param - the value to encode; may be nil (e.g. an unset service/section).
#
# Returns the encoded string, or nil when +param+ is nil or false.
def escape(param)
  # A missing value is passed through as nil rather than handed to CGI.escape.
  CGI.escape(param) if param
end
format_url(tag, name) click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 187
# Expands the ${...} placeholders in @graph_path using the table from
# placeholder_mapping and appends the result to @gfapi_url.
#
# Returns the full URL as a String.
def format_url(tag, name)
  substitutions = placeholder_mapping(tag, name)
  expanded_path = @graph_path.gsub(/(\${[_a-z]+})/, substitutions)
  @gfapi_url + expanded_path
end
gf_events(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 284
# Extracts the values to post from a single record.
#
# With @name_keys set, each configured key whose value is truthy yields one
# event, named after the matching @graphs entry when @graphs is set.
# Otherwise every record key matching @name_key_pattern with a truthy value
# yields one event named after the key.
#
# tag    - tag stored in each event.
# time   - not used here.
# record - Hash-like record to read values from.
#
# Returns an Array of {tag:, name:, value:} hashes.
def gf_events(tag, time, record)
  if @name_keys
    @name_keys.each_with_index.each_with_object([]) do |(key, idx), found|
      val = record[key]
      next unless val

      graph_name = @graphs ? @graphs[idx] : key
      found << {tag: tag, name: graph_name, value: val}
    end
  else # for name_key_pattern
    record.keys.each_with_object([]) do |key, found|
      next unless @name_key_pattern.match(key) && record[key]

      found << {tag: tag, name: key, value: record[key]}
    end
  end
end
gf_events_from_es(tag, es) click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 302
# Runs gf_events over every (time, record) pair yielded by +es+ and
# concatenates the results in iteration order.
#
# Returns an Array of event hashes.
def gf_events_from_es(tag, es)
  [].tap do |collected|
    es.each { |time, record| collected.concat(gf_events(tag, time, record)) }
  end
end
http_connection(host, port) click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 197
# Creates a Net::HTTP object (not yet started) for the address that
# @resolver returns for +host+.
#
# @timeout, when set, is applied to both the open and read timeouts.
# @ssl switches TLS on; certificate verification is turned off unless
# @verify_ssl is set.
#
# Returns the configured Net::HTTP instance.
def http_connection(host, port)
  Net::HTTP.new(@resolver.getaddress(host), port).tap do |conn|
    if @timeout
      conn.open_timeout = @timeout
      conn.read_timeout = @timeout
    end
    if @ssl
      conn.use_ssl = true
      conn.verify_mode = OpenSSL::SSL::VERIFY_NONE unless @verify_ssl
    end
  end
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 166
# Always returns true.
# NOTE(review): presumably this is the Fluentd output-plugin hook declaring
# that the plugin may run under multiple workers — confirm against the
# Fluentd plugin API.
def multi_workers_ready?
  true
end
placeholder_mapping(tag, name) click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 174
# Builds the placeholder => value table used to expand @graph_path.
#
# When @remove_prefix is set, the leading "<prefix>." is cut from +tag+
# first; a tag equal to the bare prefix is sliced the same way.
# Every value is passed through escape.
#
# Returns a Hash keyed by '${service}', '${section}', '${tag}' and
# '${key_name}'.
def placeholder_mapping(tag, name)
  if @remove_prefix
    prefixed = tag.start_with?(@removed_prefix_string) && tag.length > @removed_length
    tag = tag[@removed_length..-1] if prefixed || tag == @remove_prefix
  end

  {
    '${service}' => escape(@service),
    '${section}' => escape(@section),
    '${tag}' => escape(tag),
    '${key_name}' => escape(name)
  }
end
post(tag, name, value) click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 227
# Sends one value to the graph addressed by (tag, name) over a fresh
# connection.
#
# IOError / SystemCallError raised while posting are logged and treated as
# "no response"; any non-success (or missing) response is logged as a
# warning. Other exceptions propagate to the caller.
def post(tag, name, value)
  url = format_url(tag, name)

  response =
    begin
      host, port = connect_to(tag, name)
      request = post_request(tag, name, value)
      http_connection(host, port).start { |client| client.request(request) }
    rescue IOError, EOFError, SystemCallError => e
      # server didn't respond
      log.warn "net/http POST raises exception: #{e.class}, '#{e.message}'"
      nil
    end

  return if response && response.is_a?(Net::HTTPSuccess)

  log.warn "failed to post to growthforecast: #{url}, number: #{value}, code: #{response && response.code}"
end
post_events(events) click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 274
# Delivers +events+ either over one kept-alive connection (@keepalive set)
# or with one separate post call per event.
#
# events - Array of {tag:, name:, value:} hashes.
def post_events(events)
  return post_keepalive(events) if @keepalive

  events.each { |ev| post(ev[:tag], ev[:name], ev[:value]) }
end
post_keepalive(events) click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 244
# Posts all +events+ over a single connection, reopening it after an I/O
# failure.
#
# events - Array of {tag:, name:, value:} hashes, e.g.
#          [{:tag=>'',:name=>'',:value=>X}]
#
# IOError / SystemCallError raised while connecting or sending are logged;
# the request that hit the error is not re-sent, and the next request opens
# a new connection. Non-success responses are logged as warnings. Any other
# exception propagates to the caller, but the open connection is still
# finished first.
#
# Returns nil.
def post_keepalive(events)
  return if events.size < 1

  # gf host/port is same for all events (host is from configuration)
  host, port = connect_to(events.first[:tag], events.first[:name])

  requests = events.map { |e| post_request(e[:tag], e[:name], e[:value]) }

  http = nil
  begin
    requests.each do |req|
      begin
        unless http
          http = http_connection(host, port)
          http.start
        end
        res = http.request(req)
        unless res && res.is_a?(Net::HTTPSuccess)
          log.warn "failed to post to growthforecast: #{host}:#{port}#{req.path}, post_data: #{req.body} code: #{res && res.code}"
        end
      rescue IOError, SystemCallError => e
        # EOFError and the Errno::* classes are covered by the two classes above.
        log.warn "net/http keepalive POST raises exception", error: e
        (http.finish rescue nil) if http # ignore all errors for connection with error
        http = nil
      end
    end
  ensure
    # Runs on the normal path and also when an exception that is not rescued
    # above (for example a read timeout) escapes the loop, so the connection
    # is never left open.
    (http.finish rescue nil) if http
  end
  nil
end
post_request(tag, name, value) click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 212
# Builds the POST request for one value.
#
# The request path and Host header come from format_url(tag, name). Basic
# auth credentials are attached when @auth is :basic, and a Keep-Alive
# Connection header when @keepalive is set. +value+ is converted with to_f
# when @enable_float_number is set, otherwise with to_i, and sent as the
# form field 'number' together with 'mode'.
#
# Returns a Net::HTTP::Post.
def post_request(tag, name, value)
  target = URI.parse(format_url(tag, name))
  number = @enable_float_number ? value.to_f : value.to_i

  Net::HTTP::Post.new(target.path).tap do |request|
    request.basic_auth(@username, @password) if @auth == :basic
    request['Host'] = target.host
    request['Connection'] = 'Keep-Alive' if @keepalive
    request.set_form_data({'number' => number, 'mode' => @mode.to_s})
  end
end
prefer_buffered_processing() click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 170
# Returns the value of @background_post.
# NOTE(review): presumably Fluentd uses this to choose between buffered
# delivery (#write) and immediate delivery (#process) — confirm against the
# Fluentd output plugin API.
def prefer_buffered_processing
  @background_post
end
process(tag, es) click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 310
# Converts every record of +es+ into events and posts them.
#
# Errors raised while posting are logged and swallowed; errors raised while
# building the event list are not rescued here.
def process(tag, es)
  pending = gf_events_from_es(tag, es)
  begin
    post_events(pending)
  rescue StandardError => err
    log.warn "HTTP POST Error occurs to growthforecast server, ignored (use background_post for retries)", error: err
  end
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 319
# Posts every record of a buffer chunk.
#
# chunk - object that exposes metadata.tag and yields (time, record) pairs
#         from #each.
#
# Errors raised while posting are logged; they are re-raised only when
# @retry is set, otherwise they are swallowed.
def write(chunk)
  # Same per-record extraction as #process, so reuse the shared helper
  # instead of repeating the loop here.
  events = gf_events_from_es(chunk.metadata.tag, chunk)
  begin
    post_events(events)
  rescue => e
    log.warn "HTTP POST Error occurs to growthforecast server", error: e
    raise if @retry
  end
end