class Fluent::SplunkAPIOutput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunkapi.rb, line 55 def initialize super require 'net/http/persistent' require 'time' end
Public Instance Methods
chunk_to_buffers(chunk)
click to toggle source
# File lib/fluent/plugin/out_splunkapi.rb, line 151 def chunk_to_buffers(chunk) buffers = {} chunk.msgpack_each do |tag, event| (buffers[@source_formatter.call(tag)] ||= []) << event end return buffers end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunkapi.rb, line 61 def configure(conf) super case @source when '{TAG}' @source_formatter = lambda { |tag| tag } else @source_formatter = lambda { |tag| @source.sub('{TAG}', tag) } end case @time_format when 'none' @time_formatter = nil when 'unixtime' @time_formatter = lambda { |time| time.to_s } when 'localtime' @time_formatter = lambda { |time| Time.at(time).localtime } else @timef = TimeFormatter.new(@time_format, @localtime) @time_formatter = lambda { |time| @timef.format(time) } end case @format when 'json' @formatter = lambda { |record| record.to_json } when 'kvp' @formatter = lambda { |record| record_to_kvp(record) } when 'text' @formatter = lambda { |record| # NOTE: never modify 'record' directly record_copy = record.dup record_copy.delete('message') if record_copy.length == 0 record['message'] else "[#{record_to_kvp(record_copy)}] #{record['message']}" end } end if @protocol == 'rest' @username, @password = @auth.split(':') @base_url = "https://#{@server}/services/receivers/simple?sourcetype=#{@sourcetype}" @base_url += "&host=#{@host}" if @host @base_url += "&index=#{@index}" if @index @base_url += "&check-index=false" unless @check_index elsif @protocol == 'storm' @username, @password = 'x', @access_token @base_url = "https://#{@api_hostname}/1/inputs/http?index=#{@project_id}&sourcetype=#{@sourcetype}" @base_url += "&host=#{@host}" if @host end end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_splunkapi.rb, line 138 def format(tag, time, record) if @time_formatter time_str = "#{@time_formatter.call(time)}: " else time_str = '' end record.delete('time') event = "#{time_str}#{@formatter.call(record)}\n" [tag, event].to_msgpack end
record_to_kvp(record)
click to toggle source
# File lib/fluent/plugin/out_splunkapi.rb, line 118 def record_to_kvp(record) record.map {|k,v| v == nil ? "#{k}=" : "#{k}=\"#{v}\""}.join(' ') end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunkapi.rb, line 130 def shutdown # NOTE: call super before @http.shutdown because super may flush final output super @http.shutdown $log.debug "shutdown from #{@base_url}" end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunkapi.rb, line 122 def start super @http = Net::HTTP::Persistent.new 'fluentd-plugin-splunkapi' @http.verify_mode = OpenSSL::SSL::VERIFY_NONE unless @verify @http.headers['Content-Type'] = 'text/plain' $log.debug "initialized for #{@base_url}" end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_splunkapi.rb, line 159 def write(chunk) chunk_to_buffers(chunk).each do |source, messages| uri = URI @base_url + "&source=#{source}" post = Net::HTTP::Post.new uri.request_uri post.basic_auth @username, @password post.body = messages.join('') $log.debug "POST #{uri}" # retry up to :post_retry_max times 1.upto(@post_retry_max) do |c| response = @http.request uri, post $log.debug "=> #{response.code} (#{response.message})" if response.code == "200" # success break elsif response.code.match(/^40/) # user error $log.error "#{uri}: #{response.code} (#{response.message})\n#{response.body}" break elsif c < @post_retry_max # retry sleep @post_retry_interval next else # other errors. fluentd will retry processing on exception # FIXME: this may duplicate logs when using multiple buffers raise "#{uri}: #{response.message}" end end end end