class Fluent::Plugin::GrowthForecastOutput
Constants
- DEFAULT_GRAPH_PATH
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_growthforecast.rb, line 91
#
# Validates the plugin parameters and normalizes them into the forms the
# other methods of this class read:
#
# * @name_keys / @graphs : comma separated strings -> arrays of strings
# * @name_key_pattern    : string -> Regexp
# * @mode                : string -> :count / :modified / :gauge (fallback)
# * @tag_for             : string -> :ignore / :section / :service /
#                          :name_prefix (fallback)
# * @graph_path          : defaults to DEFAULT_GRAPH_PATH[@tag_for] when unset
# * @auth                : :basic when @authentication is 'basic', else :none
#
# @param conf the configuration object, handed to the superclass via `super`
# @raise [Fluent::ConfigError] when a parameter (or a combination of
#   parameters) is invalid
def configure(conf)
  super

  unless @chunk_key_tag
    raise Fluent::ConfigError, "configure buffer chunk_keys with tag"
  end

  # \z instead of \Z: \Z would also accept a value ending in "/api/\n".
  if @gfapi_url !~ /\/api\/\z/
    raise Fluent::ConfigError, "gfapi_url must end with /api/"
  end

  # \A...\z instead of ^...$: the line anchors would accept a multi-line
  # value as soon as any single line of it had three '/'-separated segments.
  if !@graph_path.nil? && @graph_path !~ /\A[^\/]+\/[^\/]+\/[^\/]+\z/
    raise Fluent::ConfigError, "graph_path must be like '${service}/${section}/${tag}_${key_name}'"
  end

  # Exactly one of name_keys / name_key_pattern must be given.
  if @name_keys.nil? && @name_key_pattern.nil?
    raise Fluent::ConfigError, "missing both of name_keys and name_key_pattern"
  end
  if !@name_keys.nil? && !@name_key_pattern.nil?
    raise Fluent::ConfigError, "cannot specify both of name_keys and name_key_pattern"
  end
  if !@graphs.nil? && @name_keys.nil?
    raise Fluent::ConfigError, "graphs must be specified with name_keys"
  end

  @name_keys = @name_keys.split(',') if @name_keys
  @name_key_pattern = Regexp.new(@name_key_pattern) if @name_key_pattern
  @graphs = @graphs.split(',') if @graphs

  # graphs are matched to name_keys by position, so the sizes must agree.
  if @name_keys && @graphs && @name_keys.size != @graphs.size
    raise Fluent::ConfigError, "sizes of name_keys and graphs do not match"
  end

  @mode = case @mode
          when 'count' then :count
          when 'modified' then :modified
          else :gauge
          end

  @tag_for = case @tag_for
             when 'ignore' then :ignore
             when 'section' then :section
             when 'service' then :service
             else :name_prefix
             end

  if @graph_path.nil?
    if @tag_for != :section && @section.nil?
      raise Fluent::ConfigError, "section parameter is needed when tag_for is not 'section'"
    end
    if @tag_for != :service && @service.nil?
      raise Fluent::ConfigError, "service parameter is needed when tag_for is not 'service'"
    end
    @graph_path = DEFAULT_GRAPH_PATH[@tag_for]
  end

  if @remove_prefix
    # Precomputed for placeholder_mapping: the prefix including its trailing
    # dot, and the number of characters to cut off the tag.
    @removed_prefix_string = @remove_prefix + '.'
    @removed_length = @removed_prefix_string.length
  end

  @auth = case @authentication
          when 'basic' then :basic
          else :none
          end

  @resolver = Resolve::Hostname.new(system_resolver: true)
end
connect_to(tag, name)
click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 192
#
# Builds the URL for the given tag and name (via format_url) and returns the
# host and port it points at.
#
# @param tag  passed through to format_url
# @param name passed through to format_url
# @return [Array] two elements: the URL's host and the URL's port
def connect_to(tag, name)
  endpoint = URI.parse(format_url(tag, name))
  [endpoint.host, endpoint.port]
end
escape(param)
click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 182
#
# Escapes a value with CGI.escape.
#
# @param param the value to escape; may be nil
# @return [String, nil] the escaped string, or nil when param is nil or false
def escape(param)
  CGI.escape(param) if param
end
format_url(tag, name)
click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 187
#
# Expands the ${...} placeholders in @graph_path using the hash returned by
# placeholder_mapping and appends the result to @gfapi_url.
#
# @param tag  passed through to placeholder_mapping
# @param name passed through to placeholder_mapping
# @return [String] @gfapi_url followed by the expanded graph path
def format_url(tag, name)
  substitutions = placeholder_mapping(tag, name)
  expanded_path = @graph_path.gsub(/(\${[_a-z]+})/, substitutions)
  @gfapi_url + expanded_path
end
gf_events(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 284
#
# Extracts the values to post from a single record.
#
# With @name_keys set, each listed key whose record value is truthy yields
# one event; the event name is the entry of @graphs at the same position
# when @graphs is set, otherwise the key itself. Without @name_keys, every
# record key matched by @name_key_pattern whose value is truthy yields one
# event named after the key.
#
# @param tag    stored as the :tag of every event
# @param time   not used by this method
# @param record key/value container read with record[key]
# @return [Array<Hash>] events shaped like {tag: ..., name: ..., value: ...}
def gf_events(tag, time, record)
  if @name_keys
    @name_keys.each_with_index.each_with_object([]) do |(key, position), found|
      value = record[key]
      next unless value

      graph_name = @graphs ? @graphs[position] : key
      found << {tag: tag, name: graph_name, value: value}
    end
  else
    # for name_key_pattern
    record.keys.each_with_object([]) do |key, found|
      next unless @name_key_pattern.match(key) and record[key]

      found << {tag: tag, name: key, value: record[key]}
    end
  end
end
gf_events_from_es(tag, es)
click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 302
#
# Collects the events of every (time, record) pair yielded by es.each.
#
# @param tag passed through to gf_events
# @param es  object whose #each yields time and record
# @return [Array<Hash>] the concatenated results of gf_events
def gf_events_from_es(tag, es)
  collected = []
  es.each { |time, record| collected.concat(gf_events(tag, time, record)) }
  collected
end
http_connection(host, port)
click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 197
#
# Builds (but does not start) a Net::HTTP object for the address that
# @resolver returns for host.
#
# @param host name handed to @resolver.getaddress
# @param port port handed to Net::HTTP.new
# @return [Net::HTTP] connection object with timeouts / SSL options applied
def http_connection(host, port)
  address = @resolver.getaddress(host)

  Net::HTTP.new(address, port).tap do |connection|
    if @timeout
      # The same value bounds both opening the connection and reading from it.
      connection.open_timeout = @timeout
      connection.read_timeout = @timeout
    end

    if @ssl
      connection.use_ssl = true
      # NOTE(review): the connection is created with the resolved address
      # rather than the host name; confirm certificate verification behaves
      # as intended when @verify_ssl is enabled.
      connection.verify_mode = OpenSSL::SSL::VERIFY_NONE unless @verify_ssl
    end
  end
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 166
#
# Always returns true.
# NOTE(review): presumably the Fluentd hook declaring that the plugin may run
# under multiple workers -- confirm against the Fluentd plugin API.
#
# @return [true]
def multi_workers_ready?
  true
end
placeholder_mapping(tag, name)
click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 174
#
# Builds the placeholder => replacement hash used by format_url.
#
# When @remove_prefix is set and the tag either starts with the prefix plus
# '.' (and has more after it) or equals the prefix exactly, the first
# @removed_length characters are cut off the tag before escaping.
#
# @param tag  event tag, substituted for ${tag}
# @param name key name, substituted for ${key_name}
# @return [Hash] replacements for ${service}, ${section}, ${tag} and
#   ${key_name}, each passed through escape
def placeholder_mapping(tag, name)
  if @remove_prefix
    has_prefix = tag.start_with?(@removed_prefix_string) && tag.length > @removed_length
    # For a tag equal to @remove_prefix the slice starts beyond the end of
    # the string; the result is handed to escape unchanged.
    tag = tag[@removed_length..-1] if has_prefix || tag == @remove_prefix
  end

  {
    '${service}' => escape(@service),
    '${section}' => escape(@section),
    '${tag}' => escape(tag),
    '${key_name}' => escape(name)
  }
end
post(tag, name, value)
click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 227
#
# Posts a single value over a fresh connection.
#
# IOError, EOFError and SystemCallError raised while connecting or sending
# are logged as warnings and not re-raised; other exceptions propagate. A
# missing or non-success response is logged as a warning as well.
#
# @param tag   event tag, used to build the URL
# @param name  graph name, used to build the URL
# @param value number to post
def post(tag, name, value)
  url = format_url(tag, name)

  response =
    begin
      host, port = connect_to(tag, name)
      request = post_request(tag, name, value)
      http_connection(host, port).start { |client| client.request(request) }
    rescue IOError, EOFError, SystemCallError => e
      # server didn't respond
      log.warn "net/http POST raises exception: #{e.class}, '#{e.message}'"
      nil
    end

  return if response.is_a?(Net::HTTPSuccess)

  log.warn "failed to post to growthforecast: #{url}, number: #{value}, code: #{response && response.code}"
end
post_events(events)
click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 274
#
# Sends the given events: through post_keepalive when @keepalive is set,
# otherwise one post call per event.
#
# @param events [Array<Hash>] hashes with :tag, :name and :value
def post_events(events)
  return post_keepalive(events) if @keepalive

  events.each { |event| post(event[:tag], event[:name], event[:value]) }
end
post_keepalive(events)
click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 244
#
# Posts all events over one shared connection.
#
# The connection is opened lazily before the first request. When a request
# raises one of the rescued network errors, a warning is logged, the
# connection is dropped, and a new one is opened for the next request; the
# failed request itself is not sent again. Any other exception propagates to
# the caller, and the connection is still closed on the way out.
#
# @param events [Array<Hash>] hashes with :tag, :name and :value
# @return [nil]
def post_keepalive(events) # [{:tag=>'',:name=>'',:value=>X}]
  return if events.empty?

  # gf host/port is same for all events (host is from configuration)
  host, port = connect_to(events.first[:tag], events.first[:name])
  requests = events.map { |e| post_request(e[:tag], e[:name], e[:value]) }

  http = nil
  begin
    requests.each do |req|
      begin
        unless http
          http = http_connection(host, port)
          http.start
        end
        res = http.request(req)
        unless res and res.is_a?(Net::HTTPSuccess)
          log.warn "failed to post to growthforecast: #{host}:#{port}#{req.path}, post_data: #{req.body} code: #{res && res.code}"
        end
      rescue IOError, EOFError, Errno::ECONNRESET, Errno::ETIMEDOUT, SystemCallError
        log.warn "net/http keepalive POST raises exception", error: $!
        http.finish rescue nil # ignore all errors for connection with error
        http = nil
      end
    end
  ensure
    # Runs on normal completion and also when an exception that is not in
    # the rescue list above escapes the loop, so the socket is not left open.
    if http
      http.finish rescue nil
    end
  end

  nil
end
post_request(tag, name, value)
click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 212
#
# Builds the POST request for one value.
#
# The value is converted with to_f when @enable_float_number is set and with
# to_i otherwise, and is sent as the form field 'number' together with
# 'mode' (@mode as a string).
#
# @param tag   event tag, used to build the URL
# @param name  graph name, used to build the URL
# @param value number to post
# @return [Net::HTTP::Post] request ready to be sent
def post_request(tag, name, value)
  target = URI.parse(format_url(tag, name))
  number = @enable_float_number ? value.to_f : value.to_i

  Net::HTTP::Post.new(target.path).tap do |request|
    request.basic_auth(@username, @password) if @auth == :basic
    request['Host'] = target.host
    request['Connection'] = 'Keep-Alive' if @keepalive
    request.set_form_data({'number' => number, 'mode' => @mode.to_s})
  end
end
prefer_buffered_processing()
click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 170
#
# Returns the value of @background_post.
# NOTE(review): presumably consulted by Fluentd to choose between the
# buffered (write) and non-buffered (process) code paths -- confirm against
# the Fluentd plugin API.
def prefer_buffered_processing
  @background_post
end
process(tag, es)
click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 310
#
# Extracts the events of an event stream and posts them. Errors raised while
# posting are logged as warnings and swallowed; errors raised while
# extracting the events are not rescued here.
#
# @param tag event tag
# @param es  event stream, see gf_events_from_es
def process(tag, es)
  pending = gf_events_from_es(tag, es)

  begin
    post_events(pending)
  rescue => error
    log.warn "HTTP POST Error occurs to growthforecast server, ignored (use background_post for retries)", error: error
  end
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_growthforecast.rb, line 319
#
# Extracts the events of a chunk (tag taken from chunk.metadata.tag) and
# posts them. Errors raised while posting are logged as warnings and
# re-raised only when @retry is truthy.
#
# @param chunk object providing metadata.tag and #each yielding time, record
def write(chunk)
  # The chunk is iterated exactly like an event stream, so reuse the helper.
  pending = gf_events_from_es(chunk.metadata.tag, chunk)

  begin
    post_events(pending)
  rescue => error
    log.warn "HTTP POST Error occures to growthforecast server", error: error
    raise if @retry
  end
end