class Fluent::SplunkAPIOutput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunkapi.rb, line 55
def initialize
  super
  require 'net/http/persistent'
  require 'time'
end

Public Instance Methods

chunk_to_buffers(chunk) click to toggle source
# File lib/fluent/plugin/out_splunkapi.rb, line 151
def chunk_to_buffers(chunk)
  buffers = {}
  chunk.msgpack_each do |tag, event|
    (buffers[@source_formatter.call(tag)] ||= []) << event
  end
  return buffers
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunkapi.rb, line 61
def configure(conf)
  super

  case @source
  when '{TAG}'
    @source_formatter = lambda { |tag| tag }
  else
    @source_formatter = lambda { |tag| @source.sub('{TAG}', tag) }
  end

  case @time_format
  when 'none'
    @time_formatter = nil
  when 'unixtime'
    @time_formatter = lambda { |time| time.to_s }
  when 'localtime'
    @time_formatter = lambda { |time| Time.at(time).localtime }
  else
    @timef = TimeFormatter.new(@time_format, @localtime)
    @time_formatter = lambda { |time| @timef.format(time) }
  end

  case @format
  when 'json'
    @formatter = lambda { |record|
      record.to_json
    }
  when 'kvp'
    @formatter = lambda { |record|
      record_to_kvp(record)
    }
  when 'text'
    @formatter = lambda { |record|
      # NOTE: never modify 'record' directly
      record_copy = record.dup
      record_copy.delete('message')
      if record_copy.length == 0
        record['message']
      else
        "[#{record_to_kvp(record_copy)}] #{record['message']}"
      end
    }
  end

  if @protocol == 'rest'
    @username, @password = @auth.split(':')
    @base_url = "https://#{@server}/services/receivers/simple?sourcetype=#{@sourcetype}"
    @base_url += "&host=#{@host}" if @host
    @base_url += "&index=#{@index}" if @index
    @base_url += "&check-index=false" unless @check_index
  elsif @protocol == 'storm'
    @username, @password = 'x', @access_token
    @base_url = "https://#{@api_hostname}/1/inputs/http?index=#{@project_id}&sourcetype=#{@sourcetype}"
    @base_url += "&host=#{@host}" if @host
  end
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_splunkapi.rb, line 138
def format(tag, time, record)
  if @time_formatter
    time_str = "#{@time_formatter.call(time)}: "
  else
    time_str = ''
  end

  record.delete('time')
  event = "#{time_str}#{@formatter.call(record)}\n"

  [tag, event].to_msgpack
end
record_to_kvp(record) click to toggle source
# File lib/fluent/plugin/out_splunkapi.rb, line 118
def record_to_kvp(record)
  record.map {|k,v| v == nil ? "#{k}=" : "#{k}=\"#{v}\""}.join(' ')
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunkapi.rb, line 130
def shutdown
  # NOTE: call super before @http.shutdown because super may flush final output
  super

  @http.shutdown
  $log.debug "shutdown from #{@base_url}"
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunkapi.rb, line 122
def start
  super
  @http = Net::HTTP::Persistent.new 'fluentd-plugin-splunkapi'
  @http.verify_mode = OpenSSL::SSL::VERIFY_NONE unless @verify
  @http.headers['Content-Type'] = 'text/plain'
  $log.debug "initialized for #{@base_url}"
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_splunkapi.rb, line 159
def write(chunk)
  chunk_to_buffers(chunk).each do |source, messages|
    uri = URI @base_url + "&source=#{source}"
    post = Net::HTTP::Post.new uri.request_uri
    post.basic_auth @username, @password
    post.body = messages.join('')
    $log.debug "POST #{uri}"
    # retry up to :post_retry_max times
    1.upto(@post_retry_max) do |c|
      response = @http.request uri, post
      $log.debug "=> #{response.code} (#{response.message})"
      if response.code == "200"
        # success
        break
      elsif response.code.match(/^40/)
        # user error
        $log.error "#{uri}: #{response.code} (#{response.message})\n#{response.body}"
        break
      elsif c < @post_retry_max
        # retry
        sleep @post_retry_interval
        next
      else
        # other errors. fluentd will retry processing on exception
        # FIXME: this may duplicate logs when using multiple buffers
        raise "#{uri}: #{response.message}"
      end
    end
  end
end