class Fluent::HRForecastOutput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_hrforecast.rb, line 6
# Constructs the plugin and loads the libraries its other methods use.
# The requires are issued at construction time rather than at file load.
def initialize
  super

  # URI parsing, the HTTP client, and the hostname resolver gem.
  require 'uri'
  require 'net/http'
  require 'resolve/hostname'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_hrforecast.rb, line 44
# Validates the plugin settings and derives the values used at runtime.
#
# After the superclass has processed +conf+, this method:
# * checks hrfapi_url, graph_path, name_keys / name_key_pattern and the
#   datetime_key / datetime_key_format pair,
# * stores the API host and port taken from hrfapi_url,
# * turns name_keys ("a,b=>label") into a Hash of record key => graph name,
# * compiles name_key_pattern into a Regexp,
# * precomputes the prefix string (and its length) stripped from tags,
# * maps the authentication setting to :basic or :none,
# * creates the hostname resolver.
#
# Raises Fluent::ConfigError when a setting is missing or malformed.
def configure(conf)
  super

  raise Fluent::ConfigError, "hrfapi_url must end with /api/" if @hrfapi_url !~ %r|/api/\z|

  # graph_path must consist of exactly three slash-separated segments.
  if @graph_path !~ %r|\A[^/]+/[^/]+/[^/]+\z|
    raise Fluent::ConfigError, "graph_path #{@graph_path}  must be like 'service/section/${tag}_${key_name}'"
  end

  # Exactly one of name_keys / name_key_pattern has to be given.
  keys_given = !@name_keys.nil?
  pattern_given = !@name_key_pattern.nil?
  unless keys_given || pattern_given
    raise Fluent::ConfigError, "missing both of name_keys and name_key_pattern"
  end
  if keys_given && pattern_given
    raise Fluent::ConfigError, "cannot specify both of name_keys and name_key_pattern"
  end

  if !@datetime_key.nil? && @datetime_key_format.nil?
    raise Fluent::ConfigError, "missing datetime_key_format"
  end

  endpoint = URI.parse(@hrfapi_url)
  @host = endpoint.host
  @port = endpoint.port

  if @name_keys
    pairs = @name_keys.split(',').map do |entry|
      parts = entry.split('=>', 2)
      # An entry without "=>" uses the record key as the graph name too.
      parts.size == 1 ? parts + parts : parts
    end
    @name_keys = Hash[pairs]
  end

  @name_key_pattern = Regexp.new(@name_key_pattern) if @name_key_pattern

  if @remove_prefix
    @removed_prefix_string = @remove_prefix + '.'
    @removed_length = @removed_prefix_string.length
  end

  @auth = @authentication == 'basic' ? :basic : :none
  @resolver = Resolve::Hostname.new(:system_resolver => true)
end
decide_time(time, record) click to toggle source
# File lib/fluent/plugin/out_hrforecast.rb, line 210
# Picks the timestamp to report for a record.
#
# When a datetime key is configured and the record has a truthy value under
# it, that value is parsed with @datetime_key_format and the resulting Time
# is returned. Otherwise the +time+ argument is returned unchanged.
def decide_time(time, record)
  raw = @datetime_key && record[@datetime_key]
  return time unless raw

  Time.strptime(raw, @datetime_key_format)
end
emit(tag, es, chain) click to toggle source
# File lib/fluent/plugin/out_hrforecast.rb, line 217
# Turns an event stream into graph events and posts them.
#
# For each (time, record) pair the timestamp is chosen by decide_time. With
# @name_keys, every configured key holding a truthy value yields one event
# named after the mapped graph name. Otherwise every record key matching
# @name_key_pattern with a truthy value yields one event, named after the
# first capture group or, if there is none, after the key itself.
#
# The events are pushed to the background post thread's queue when one is
# present, and posted inline otherwise. Inline failures are logged and only
# re-raised when @retry is set. chain.next is called last.
def emit(tag, es, chain)
  events = []

  es.each do |time, record|
    event_time = decide_time(time, record)

    if @name_keys
      @name_keys.each do |key, name|
        value = record[key]
        next unless value

        events << {:tag => tag, :name => name, :value => value, :time => event_time}
      end
    else # for name_key_pattern
      record.keys.each do |key|
        matched = @name_key_pattern.match(key)
        next unless matched && record[key]

        events << {:tag => tag, :name => matched[1] || key, :value => record[key], :time => event_time}
      end
    end
  end

  if @post_thread
    @post_thread.queue << events
  else
    begin
      post_events(events)
    rescue => e
      log.warn "HTTP POST Error occures to HRforecast server", :error_class => e.class, :error => e.message
      raise if @retry
    end
  end

  chain.next
end
format_url(tag, name) click to toggle source
# File lib/fluent/plugin/out_hrforecast.rb, line 150
# Builds the full API URL for one graph.
#
# Placeholders of the form ${name} in @graph_path are replaced using the
# hash returned by placeholder_mapping; a placeholder with no entry in that
# hash is replaced by an empty string. The expanded path is then
# percent-escaped and appended to @hrfapi_url.
#
# URI.escape was removed in Ruby 3.0, so the RFC 2396 parser is called
# directly; it performs the same escaping that URI.escape delegated to.
def format_url(tag, name)
  graph_path = @graph_path.gsub(/(\$\{[_a-z]+\})/, placeholder_mapping(tag, name))
  # Newer uri releases expose the RFC 2396 parser under its own constant;
  # older ones only provide it as DEFAULT_PARSER.
  parser = defined?(URI::RFC2396_PARSER) ? URI::RFC2396_PARSER : URI::DEFAULT_PARSER
  @hrfapi_url + parser.escape(graph_path)
end
make_http_connection() click to toggle source
# File lib/fluent/plugin/out_hrforecast.rb, line 155
# Creates a not-yet-started Net::HTTP client for the API server.
#
# The address comes from resolving @host through @resolver; the port is
# @port. When @timeout is set it is used for both the open and the read
# timeout. When @ssl is set TLS is enabled, and certificate verification is
# turned off unless @verify_ssl is set.
def make_http_connection()
  address = @resolver.getaddress(@host)

  Net::HTTP.new(address, @port).tap do |conn|
    if @timeout
      conn.open_timeout = @timeout
      conn.read_timeout = @timeout
    end

    if @ssl
      conn.use_ssl = true
      conn.verify_mode = OpenSSL::SSL::VERIFY_NONE unless @verify_ssl
    end
  end
end
make_post_request(tag, name, value, time) click to toggle source
# File lib/fluent/plugin/out_hrforecast.rb, line 170
# Builds the POST request that reports one value to its graph.
#
# The request path is the path part of format_url(tag, name). Basic
# authentication credentials are attached when @auth is :basic, the Host
# header is set to the URL's host, and a Keep-Alive Connection header is
# added when @keepalive is set. The form body carries 'number' (the value
# as a float when @enable_float_number is set, as an integer otherwise) and
# 'datetime' (the time formatted with @datetime_format).
def make_post_request(tag, name, value, time)
  target = URI.parse(format_url(tag, name))

  Net::HTTP::Post.new(target.path).tap do |post|
    post.basic_auth(@username, @password) if @auth == :basic
    post['Host'] = target.host
    post['Connection'] = 'Keep-Alive' if @keepalive

    number = @enable_float_number ? value.to_f : value.to_i
    post.set_form_data({
      'number' => number,
      'datetime' => Time.at(time).strftime(@datetime_format),
    })
  end
end
placeholder_mapping(tag, name) click to toggle source
# File lib/fluent/plugin/out_hrforecast.rb, line 142
# Returns the placeholder => value hash used to expand graph_path.
#
# When @remove_prefix is set and the tag either starts with that prefix
# followed by a dot (with something after the dot) or equals the prefix
# exactly, the leading @removed_length characters are cut from the tag
# before it is mapped to '${tag}'. For a tag equal to the bare prefix that
# slice is nil. '${key_name}' always maps to +name+.
def placeholder_mapping(tag, name)
  mapped_tag = tag

  if @remove_prefix
    prefixed = tag.start_with?(@removed_prefix_string) && tag.length > @removed_length
    mapped_tag = tag[@removed_length..-1] if prefixed || tag == @remove_prefix
  end

  {'${tag}' => mapped_tag, '${key_name}' => name}
end
post_events(events) click to toggle source
# File lib/fluent/plugin/out_hrforecast.rb, line 185
# Posts a batch of events to the HRForecast API.
#
# One POST request is built per event (make_post_request) and all of them
# are sent over a single connection obtained from make_http_connection. A
# non-success response is logged as a warning. IOError and SystemCallError
# raised while sending are logged as well, the connection is closed, and the
# remaining requests are still attempted. Without @keepalive the connection
# is closed after every request; with it the connection is reused and is
# closed once the whole batch has been processed.
#
# events - Array of hashes with :tag, :name, :value and :time.
#
# Returns nil without doing anything when +events+ is empty.
def post_events(events)
  return if events.empty?

  requests = events.map do |e|
    make_post_request(e[:tag], e[:name], e[:value], e[:time])
  end

  http = make_http_connection
  begin
    requests.each do |req|
      begin
        http.start unless http.started?
        res = http.request(req)
        unless res.is_a?(Net::HTTPSuccess)
          # Use the configured host/port; there are no `host`/`port` locals here.
          log.warn "failed to post to HRforecast: #{@host}:#{@port}#{req.path}, post_data: #{req.body} code: #{res && res.code}"
        end
      rescue IOError, SystemCallError => e
        # EOFError is an IOError and the Errno::* classes are SystemCallErrors.
        log.warn "net/http POST raises exception: #{e.class}, '#{e.message}'"
        http.finish if http.started?
      end

      http.finish if !@keepalive && http.started?
    end
  ensure
    # A kept-alive connection is local to this call; do not leave it open.
    http.finish if http.started?
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_hrforecast.rb, line 135
# Stops the plugin: shuts the background post thread down first, if one
# exists, then hands over to the superclass.
def shutdown
  @post_thread.shutdown if @post_thread
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_hrforecast.rb, line 126
# Starts the plugin. After the superclass has started, any previous post
# thread reference is cleared and, when @background_post is set, a new
# PostThread bound to this plugin instance is created.
def start
  super

  @post_thread = nil
  @post_thread = PostThread.new(self) if @background_post
end