class Fluent::HRForecastOutput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_hrforecast.rb, line 6
# Builds the output plugin instance.
#
# Runs the superclass initializer first, then loads the libraries this
# plugin references elsewhere (Net::HTTP, URI and Resolve::Hostname).
def initialize
  super
  # Loaded at construction time rather than at file load time.
  %w[net/http uri resolve/hostname].each { |library| require library }
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_hrforecast.rb, line 44
# Validates the configuration and prepares derived instance state.
#
# conf - the configuration object, passed through to the superclass.
#
# Raises Fluent::ConfigError when:
# * hrfapi_url does not end with "/api/"
# * graph_path is not made of exactly three slash-separated segments
# * neither or both of name_keys / name_key_pattern are given
# * datetime_key is given without datetime_key_format
#
# On success it sets @host and @port (from hrfapi_url), converts
# @name_keys into a Hash and @name_key_pattern into a Regexp, records the
# prefix-removal helpers, and sets @auth and @resolver.
def configure(conf)
  super

  raise Fluent::ConfigError, "hrfapi_url must end with /api/" if @hrfapi_url !~ %r|/api/\z|

  if @graph_path !~ %r|\A[^/]+/[^/]+/[^/]+\z|
    raise Fluent::ConfigError, "graph_path #{@graph_path} must be like 'service/section/${tag}_${key_name}'"
  end

  keys_given = !@name_keys.nil?
  pattern_given = !@name_key_pattern.nil?
  unless keys_given || pattern_given
    raise Fluent::ConfigError, "missing both of name_keys and name_key_pattern"
  end
  if keys_given && pattern_given
    raise Fluent::ConfigError, "cannot specify both of name_keys and name_key_pattern"
  end

  if !@datetime_key.nil? && @datetime_key_format.nil?
    raise Fluent::ConfigError, "missing datetime_key_format"
  end

  endpoint = URI.parse(@hrfapi_url)
  @host = endpoint.host
  @port = endpoint.port

  if @name_keys
    # "a=>b,c" becomes {"a" => "b", "c" => "c"}: an entry without "=>"
    # maps the key to itself.
    pairs = @name_keys.split(',').map do |entry|
      entry.split('=>', 2).tap { |kv| kv.push(kv[0]) if kv.size == 1 }
    end
    @name_keys = Hash[pairs]
  end

  @name_key_pattern = Regexp.new(@name_key_pattern) if @name_key_pattern

  if @remove_prefix
    @removed_prefix_string = @remove_prefix + '.'
    @removed_length = @removed_prefix_string.length
  end

  # Anything other than 'basic' means no authentication.
  @auth = @authentication == 'basic' ? :basic : :none

  @resolver = Resolve::Hostname.new(:system_resolver => true)
end
decide_time(time, record)
click to toggle source
# File lib/fluent/plugin/out_hrforecast.rb, line 210
# Chooses the timestamp to report for one event.
#
# time   - the event time supplied by the caller; returned unchanged when
#          no datetime key is configured or the record has no value for it.
# record - the event record (indexed with @datetime_key).
#
# Returns a Time parsed from record[@datetime_key] using
# @datetime_key_format when that field is present, otherwise +time+.
# Raises ArgumentError (from Time.strptime) when the field does not match
# the format.
def decide_time(time, record)
  raw = @datetime_key && record[@datetime_key]
  return time unless raw

  # The field may be a non-String value (for example a number);
  # Time.strptime only accepts a String, so coerce it first.
  Time.strptime(raw.to_s, @datetime_key_format)
end
emit(tag, es, chain)
click to toggle source
# File lib/fluent/plugin/out_hrforecast.rb, line 217
# Converts an event stream into HRForecast data points and sends them.
#
# tag   - the tag attached to every produced event.
# es    - the event stream; iterated with +each+, yielding (time, record).
# chain - the output chain; +chain.next+ is called once events are handed off.
#
# With @name_keys, each configured key whose value in the record is truthy
# yields one event named by the mapped name. Otherwise every record key
# matching @name_key_pattern with a truthy value yields one event, named by
# the first capture group, or by the key itself when there is none.
#
# Events go to @post_thread.queue when a post thread exists, otherwise they
# are posted directly. A posting error is logged and re-raised only when
# @retry is set.
def emit(tag, es, chain)
  events = []

  es.each do |time, record|
    event_time = decide_time(time, record)

    if @name_keys
      @name_keys.each do |key, name|
        value = record[key]
        next unless value
        events << {:tag => tag, :name => name, :value => value, :time => event_time}
      end
    else
      # name_key_pattern mode
      record.keys.each do |key|
        matched = @name_key_pattern.match(key)
        next unless matched && record[key]
        events << {:tag => tag, :name => (matched[1] || key), :value => record[key], :time => event_time}
      end
    end
  end

  if @post_thread
    @post_thread.queue << events
  else
    begin
      post_events(events)
    rescue => e
      log.warn "HTTP POST Error occures to HRforecast server", :error_class => e.class, :error => e.message
      raise if @retry
    end
  end

  chain.next
end
format_url(tag, name)
click to toggle source
# File lib/fluent/plugin/out_hrforecast.rb, line 150
# Builds the full API URL for one graph.
#
# tag  - the event tag, substituted for "${tag}" in @graph_path.
# name - the key name, substituted for "${key_name}" in @graph_path.
#
# Placeholders of the form "${lowercase_or_underscore}" that are not present
# in the mapping are replaced with an empty string (String#gsub with a Hash).
#
# Returns @hrfapi_url followed by the percent-escaped graph path.
def format_url(tag, name)
  graph_path = @graph_path.gsub(/(\$\{[_a-z]+\})/, placeholder_mapping(tag, name))
  # URI.escape was removed in Ruby 3.0; the parser-level escape performs the
  # same escaping and is available on both old and new Rubies.
  @hrfapi_url + URI::DEFAULT_PARSER.escape(graph_path)
end
make_http_connection()
click to toggle source
# File lib/fluent/plugin/out_hrforecast.rb, line 155
# Creates an unstarted Net::HTTP client for the configured server.
#
# The client targets the address @resolver returns for @host, on @port.
# When @timeout is set it is used for both the open and read timeouts.
# When @ssl is set TLS is enabled, and certificate verification is turned
# off unless @verify_ssl is set.
#
# Returns the configured Net::HTTP object.
def make_http_connection()
  address = @resolver.getaddress(@host)
  client = Net::HTTP.new(address, @port)

  if @timeout
    client.open_timeout = @timeout
    client.read_timeout = @timeout
  end

  if @ssl
    client.use_ssl = true
    client.verify_mode = OpenSSL::SSL::VERIFY_NONE unless @verify_ssl
  end

  client
end
make_post_request(tag, name, value, time)
click to toggle source
# File lib/fluent/plugin/out_hrforecast.rb, line 170
# Builds the POST request for a single data point.
#
# tag   - event tag, used to build the graph URL.
# name  - key name, used to build the graph URL.
# value - the value to report; sent as a float when @enable_float_number
#         is set, otherwise as an integer.
# time  - the event time; passed to Time.at and formatted with
#         @datetime_format.
#
# Returns a Net::HTTP::Post with Host (and optionally Connection and
# basic-auth) headers set and a form body holding "number" and "datetime".
def make_post_request(tag, name, value, time)
  target = URI.parse(format_url(tag, name))

  request = Net::HTTP::Post.new(target.path)
  request.basic_auth(@username, @password) if @auth == :basic
  request['Host'] = target.host
  request['Connection'] = 'Keep-Alive' if @keepalive

  number = @enable_float_number ? value.to_f : value.to_i
  datetime = Time.at(time).strftime(@datetime_format)
  request.set_form_data({'number' => number, 'datetime' => datetime})

  request
end
placeholder_mapping(tag, name)
click to toggle source
# File lib/fluent/plugin/out_hrforecast.rb, line 142
# Builds the placeholder-to-value table used when expanding a graph path.
#
# tag  - the event tag. When @remove_prefix is set and the tag starts with
#        "<prefix>." followed by at least one more character, that leading
#        part is stripped. A tag equal to the prefix itself becomes "".
# name - the key name.
#
# Returns a Hash with the keys "${tag}" and "${key_name}".
def placeholder_mapping(tag, name)
  if @remove_prefix
    if tag == @remove_prefix
      # Slicing from @removed_length would start past the end of the string
      # and yield nil; an empty tag is the intended result.
      tag = ''
    elsif tag.start_with?(@removed_prefix_string) && tag.length > @removed_length
      tag = tag[@removed_length..-1]
    end
  end

  {'${tag}' => tag, '${key_name}' => name}
end
post_events(events)
click to toggle source
# File lib/fluent/plugin/out_hrforecast.rb, line 185
# Posts a batch of events to the HRForecast server, one request per event.
#
# events - Array of Hashes with :tag, :name, :value and :time.
#
# All requests are built up front, then sent over a single connection
# object. A non-success response is logged as a warning. IOError and
# SystemCallError raised while sending are logged and the connection is
# closed so the next request reconnects; other exceptions propagate.
# Without @keepalive the connection is closed after every request.
#
# Returns nil for an empty batch, otherwise the Array of requests.
def post_events(events)
  return if events.size < 1

  requests = events.map do |e|
    make_post_request(e[:tag], e[:name], e[:value], e[:time])
  end

  http = make_http_connection
  begin
    requests.each do |req|
      begin
        http.start unless http.started?
        res = http.request(req)
        unless res.is_a?(Net::HTTPSuccess)
          log.warn "failed to post to HRforecast: #{@host}:#{@port}#{req.path}, post_data: #{req.body} code: #{res && res.code}"
        end
      rescue IOError, SystemCallError => e
        # EOFError is an IOError; Errno::* are SystemCallErrors.
        log.warn "net/http POST raises exception: #{e.class}, '#{e.message}'"
        http.finish if http.started?
      end

      http.finish if !@keepalive && http.started?
    end
  ensure
    # Close a kept-alive connection once the batch is done, and also when an
    # exception escapes the loop, so the socket is not leaked.
    http.finish if http.started?
  end
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_hrforecast.rb, line 135
# Stops the plugin.
#
# Shuts down the post thread first, if one was created, and then hands
# over to the superclass shutdown.
def shutdown
  @post_thread.shutdown if @post_thread
  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_hrforecast.rb, line 126
# Starts the plugin.
#
# After the superclass start, @post_thread is reset to nil and, when
# @background_post is set, replaced with a new PostThread bound to this
# plugin instance.
def start
  super
  @post_thread = nil
  @post_thread = PostThread.new(self) if @background_post
end