class Fluent::SplunkExOutput

Constants

SOCKET_TRY_MAX

Protected Class Methods

format_json(record) click to toggle source
# File lib/fluent/plugin/out_splunk_ex.rb, line 82
def self.format_json(record)
  json_str = record.to_json
end
format_kv(record) click to toggle source
# File lib/fluent/plugin/out_splunk_ex.rb, line 74
def self.format_kv(record)
  kv_out_str = ''
  record.each { |k, v|
    kv_out_str << sprintf('%s=%s ', URI::encode(k), URI::encode(v.to_s))
  }
  kv_out_str
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunk_ex.rb, line 28
def configure(conf)
  super
end
emit(tag, es, chain) click to toggle source
# File lib/fluent/plugin/out_splunk_ex.rb, line 63
def emit(tag, es, chain)
  chain.next
  es.each {|time,record|
    @send_data.call( @format_fn.call(record) )
  }
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunk_ex.rb, line 55
def shutdown
  super
  if !@test_mode && @splunk_connection.respond_to?(:close)
    @splunk_connection.close
  end
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunk_ex.rb, line 32
def start
  super

  if @output_format == 'kv'
    @format_fn = self.class.method(:format_kv)
  else
    @format_fn = self.class.method(:format_json)
  end

  if @test_mode
    @send_data = proc { |text| log.trace("test mode text: #{text}") }
  else
    begin
      @splunk_connection = TCPSocket.open(@host, @port)
    rescue Errno::ECONNREFUSED
      # we'll try again when data is sent
    end
    @send_data = self.method(:splunk_send)
  end

end

Protected Instance Methods

splunk_send(text, try_count=0) click to toggle source
# File lib/fluent/plugin/out_splunk_ex.rb, line 86
def splunk_send(text, try_count=0)
  log.debug("splunk_send: #{text}")

  successful_send = false
  try_count = 0

  while (!successful_send && try_count < SOCKET_TRY_MAX)
    begin
      @splunk_connection.puts(text)
      successful_send = true

    rescue NoMethodError, Errno::ECONNRESET, Errno::ECONNREFUSED, Errno::EPIPE => se
      log.error("splunk_send - socket send retry (#{try_count}) failed: #{se}")
      try_count = try_count + 1

      successful_reopen = false
      while (!successful_reopen && try_count < SOCKET_TRY_MAX)
        begin
          # Try reopening
          @splunk_connection = TCPSocket.open(@host, @port)
          successful_reopen = true
        rescue Errno::ECONNRESET, Errno::ECONNREFUSED, Errno::EPIPE => se
          log.error("splunk_send - socket open retry (#{try_count}) failed: #{se}")
          try_count = try_count + 1
        end
      end
    end
  end

  if !successful_send
    log.fatal("splunk_send - retry of sending data failed after #{SOCKET_TRY_MAX} chances.")
    log.warn(text)
  end
  
end