class Fluent::SplunkHTTPEventcollectorOutput

Public Class Methods

new() click to toggle source

Called when the plugin object is instantiated (instance initializer)

Calls superclass method
# File lib/fluent/plugin/out_splunk-http-eventcollector.rb, line 71
# Instance initializer: load the runtime dependencies this plugin needs
# before any configuration or network activity happens.
def initialize
  super
  log.trace "splunk-http-eventcollector(initialize) called"
  %w[net/http/persistent openssl].each { |lib| require lib }
end
placeholder_expander(log) click to toggle source

Thanks to github.com/kazegusuri/fluent-plugin-prometheus/blob/348c112d/lib/fluent/plugin/prometheus.rb

# File lib/fluent/plugin/out_splunk-http-eventcollector.rb, line 80
# Build a placeholder expander appropriate for the running fluentd version,
# using internal classes of record_transformer / record_reformer to expand
# placeholders. Raises ConfigError when the required plugin is missing.
def self.placeholder_expander(log)
  unless defined?(Fluent::Filter)
    # for v0.10, use PlaceholderExpander in fluent-plugin-record-reformer plugin
    begin
      require 'fluent/plugin/out_record_reformer.rb'
      return Fluent::RecordReformerOutput::PlaceholderExpander.new(log: log)
    rescue LoadError => e
      raise ConfigError, "cannot find fluent-plugin-record-reformer: #{e.message}"
    end
  end

  # for v0.12 and later, the built-in PlaceholderExpander
  begin
    require 'fluent/plugin/filter_record_transformer'
    if defined?(Fluent::Plugin::RecordTransformerFilter::PlaceholderExpander)
      # for v0.14
      Fluent::Plugin::RecordTransformerFilter::PlaceholderExpander.new(log: log)
    else
      # for v0.12
      Fluent::RecordTransformerFilter::PlaceholderExpander.new(log: log)
    end
  rescue LoadError => e
    raise ConfigError, "cannot find filter_record_transformer plugin: #{e.message}"
  end
end

Public Instance Methods

configure(conf) click to toggle source

This method is called before starting. 'conf' is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.

Calls superclass method
# File lib/fluent/plugin/out_splunk-http-eventcollector.rb, line 108
# Called before starting. 'conf' is a Hash of configuration parameters.
# Validates the collector endpoint and precomputes derived state; raises
# Fluent::ConfigError when the protocol/server pair is not a valid URI.
def configure(conf)
  super
  log.trace "splunk-http-eventcollector(configure) called"

  collector_url = "#{@protocol}://#{@server}/services/collector"
  begin
    @splunk_uri = URI(collector_url)
  rescue StandardError
    raise ConfigError, "Unable to parse the server into a URI."
  end

  @placeholder_expander = Fluent::SplunkHTTPEventcollectorOutput.placeholder_expander(log)
  @hostname = Socket.gethostname
  # TODO Add other robust input/syntax checks.
end
convert_to_utf8(input) click to toggle source

Encode as UTF-8. If 'coerce_to_utf8' is set to true in the config, any non-UTF-8 character would be replaced by the string specified by 'non_utf8_replacement_string'. If 'coerce_to_utf8' is set to false, any non-UTF-8 character would trigger the plugin to error out. Thanks to github.com/GoogleCloudPlatform/fluent-plugin-google-cloud/blob/dbc28575/lib/fluent/plugin/out_google_cloud.rb#L1284

# File lib/fluent/plugin/out_splunk-http-eventcollector.rb, line 294
# Recursively encode 'input' as UTF-8. Hash keys/values and Array elements
# are converted element-by-element; objects that do not respond to #encode
# (numbers, nil, booleans) pass through untouched.
#
# With 'coerce_to_utf8' enabled, invalid/undefined bytes are replaced by
# 'non_utf8_replacement_string'; otherwise an EncodingError is logged and
# re-raised so the caller can surface the failure.
def convert_to_utf8(input)
  case input
  when Hash
    input.each_with_object({}) do |(key, value), converted|
      converted[convert_to_utf8(key)] = convert_to_utf8(value)
    end
  when Array
    input.map { |element| convert_to_utf8(element) }
  else
    return input unless input.respond_to?(:encode)

    if @coerce_to_utf8
      input.encode(
        'utf-8',
        invalid: :replace,
        undef: :replace,
        replace: @non_utf8_replacement_string)
    else
      begin
        input.encode('utf-8')
      rescue EncodingError
        @log.error 'Encountered encoding issues potentially due to non ' \
                   'UTF-8 characters. To allow non-UTF-8 characters and ' \
                   'replace them with spaces, please set "coerce_to_utf8" ' \
                   'to true.'
        raise
      end
    end
  end
end
format(tag, time, record) click to toggle source

This method is called when an event reaches Fluentd (like an unbuffered emit()). It converts the event to a raw string.

# File lib/fluent/plugin/out_splunk-http-eventcollector.rb, line 148
# Serialize one fluentd event into the JSON envelope expected by the
# Splunk HTTP Event Collector. Placeholders in source/sourcetype/host/index
# are expanded against the tag, hostname, and record.
def format(tag, time, record)
  #log.trace "splunk-http-eventcollector(format) called"
  placeholders = @placeholder_expander.prepare_placeholders(
    'tag' => tag,
    'tag_parts' => tag.split('.'),
    'hostname' => @hostname,
    'record' => record
  )

  # Basic object for Splunk. Note explicit type-casting to avoid accidental errors.
  event_source =
    @source.nil? ? tag.to_s : @placeholder_expander.expand(@source, placeholders)
  splunk_object = {
    "time" => time.to_i,
    "source" => event_source,
    "sourcetype" => @placeholder_expander.expand(@sourcetype.to_s, placeholders),
    "host" => @placeholder_expander.expand(@host.to_s, placeholders),
    "index" => @placeholder_expander.expand(@index, placeholders)
  }
  # TODO: parse different source types as expected: KVP, JSON, TEXT
  splunk_object["event"] =
    @all_items ? convert_to_utf8(record) : convert_to_utf8(record["message"])

  #log.debug "Generated JSON(#{json_event.class.to_s}): #{json_event.to_s}"
  #log.debug "format: returning: #{[tag, record].to_json.to_s}"
  splunk_object.to_json
end
numfmt(input) click to toggle source
# File lib/fluent/plugin/out_splunk-http-eventcollector.rb, line 284
# Format a number with comma thousands separators for log readability,
# e.g. 1234567 -> "1,234,567". Accepts anything responding to #to_s.
def numfmt(input)
  reversed = input.to_s.reverse
  grouped = reversed.gsub(/(\d{3})(?=\d)/, '\1,')
  grouped.reverse
end
push_buffer(body) click to toggle source
# File lib/fluent/plugin/out_splunk-http-eventcollector.rb, line 242
# POST a prepared payload to the Splunk HTTP Event Collector endpoint,
# retrying transient failures up to @post_retry_max times.
#
# body - String payload (one or more JSON event objects, concatenated).
#
# Behavior by response code:
#   200      -> success, stop retrying.
#   5xx      -> server error; sleep @post_retry_interval and retry while
#               attempts remain.
#   4xx      -> client/config error; log and give up (no retry).
#   other    -> retry while attempts remain, else raise so fluentd
#               re-queues the chunk.
# In @test_mode the payload is only logged, never sent.
def push_buffer(body)
  post = Net::HTTP::Post.new @splunk_uri.request_uri
  post.body = body
  log.debug "POST #{@splunk_uri}"
  if @test_mode
    log.debug "TEST_MODE Payload: #{body}"
    return
  end
  # retry up to :post_retry_max times
  1.upto(@post_retry_max) do |c|
    response = @http.request @splunk_uri, post
    log.debug "=>(#{c}/#{numfmt(@post_retry_max)}) #{response.code} " +
        "(#{response.message})"
    # TODO check the actual server response too (it's JSON)
    if response.code == "200"  # and...
      # success
      break
    # TODO check 40X response within post_retry_max and retry
    elsif response.code.match(/^50/) and c < @post_retry_max
      # retry on 5xx server errors while attempts remain
      log.warn "#{@splunk_uri}: Server error #{response.code} (" +
          "#{response.message}). Retrying in #{@post_retry_interval} " +
          "seconds.\n#{response.body}"
      sleep @post_retry_interval
      next
    elsif response.code.match(/^40/)
      # user error (bad token, bad payload) -- retrying won't help
      log.error "#{@splunk_uri}: #{response.code} (#{response.message})\n#{response.body}"
      break
    elsif c < @post_retry_max
      # any other non-200 response: retry while attempts remain
      log.debug "#{@splunk_uri}: Retrying..."
      sleep @post_retry_interval
      next
    else
      # other errors. fluentd will retry processing on exception
      # FIXME: this may duplicate logs when using multiple buffers
      raise "#{@splunk_uri}: #{response.message}\n#{response.body}"
    end # If response.code
  end # 1.upto(@post_retry_max)
end
shutdown() click to toggle source

This method is called when shutting down. Shutdown the thread and close sockets or files here.

Calls superclass method
# File lib/fluent/plugin/out_splunk-http-eventcollector.rb, line 138
# Called when fluentd shuts the plugin down: stop the persistent HTTP
# client so its pooled connections are released.
def shutdown
  super
  log.trace "splunk-http-eventcollector(shutdown) called"
  @http.shutdown
  log.trace "shutdown from splunk-http-eventcollector"
end
start() click to toggle source

This method is called when starting. Open sockets or files here.

Calls superclass method
# File lib/fluent/plugin/out_splunk-http-eventcollector.rb, line 124
# Called when fluentd starts the plugin: build the persistent HTTP client
# and preset the headers sent with every collector POST.
def start
  super
  log.trace "splunk-http-eventcollector(start) called"

  @http = Net::HTTP::Persistent.new 'fluent-plugin-splunk-http-eventcollector'
  # Only skip TLS certificate verification when the user disabled it.
  @http.verify_mode = OpenSSL::SSL::VERIFY_NONE unless @verify
  {
    'Content-Type' => 'application/json',
    'User-Agent' => 'fluent-plugin-splunk-http-eventcollector/0.0.1',
    'Authorization' => "Splunk #{@token}"
  }.each { |header, value| @http.override_headers[header] = value }

  log.trace "initialized for splunk-http-eventcollector"
end
write(chunk) click to toggle source

By this point, fluentd has decided its buffer is full and it's time to flush it. chunk.read is a concatenated string of JSON.to_s objects. Simply POST them to Splunk and go about our life.

This method is called every flush interval. Write the buffer chunk
to files or databases here.
'chunk' is a buffer chunk that includes multiple formatted
events. You can use 'data = chunk.read' to get all events and
'chunk.open {|io| ... }' to get IO objects.

NOTE! This method is called by internal thread, not Fluentd's main thread. So IO wait doesn't affect other plugins.
# File lib/fluent/plugin/out_splunk-http-eventcollector.rb, line 191
# Called every flush interval with a buffer chunk of concatenated
# JSON-formatted events (produced by #format). Splits the chunk back into
# individual events and POSTs them to Splunk, honoring @batch_size_limit
# by flushing in multiple smaller payloads when needed.
#
# NOTE: runs on an internal flush thread, so blocking I/O here does not
# stall other plugins.
def write(chunk)
  log.trace "splunk-http-eventcollector(write) called"

  # Read the chunk exactly once; the original re-read it several times.
  payload = chunk.read

  # Break the concatenated string of JSON-formatted events into an Array
  split_chunk = payload.split("}{").each do |x|
    # Reconstruct the opening{/closing} that #split() strips off.
    x.prepend("{") unless x.start_with?("{")
    x << "}" unless x.end_with?("}")
  end
  log.debug "Pushing #{numfmt(split_chunk.size)} events (" +
      "#{numfmt(payload.bytesize)} bytes) to Splunk."
  # If fluentd is pushing too much data to Splunk at once, split up the payload
  # Don't care about the number of events so much as the POST size (bytes)
  if payload.bytesize > @batch_size_limit
    log.warn "Fluentd is attempting to push #{numfmt(payload.bytesize)} " +
        "bytes in a single push to Splunk. The configured limit is " +
        "#{numfmt(@batch_size_limit)} bytes."
    newbuffer = []
    split_chunk_counter = 0
    split_chunk.each do |c|
      split_chunk_counter += 1
      if newbuffer.join.bytesize + c.bytesize < @batch_size_limit
        newbuffer << c
      else
        # Reached the limit - push the current newbuffer.join, and reset
        log.debug "(#{numfmt(split_chunk_counter)}/#{numfmt(split_chunk.size)}) " +
            "newbuffer.bytesize=#{numfmt(newbuffer.join.bytesize)} + " +
            "c.bytesize=#{numfmt(c.bytesize)} > #{numfmt(@batch_size_limit)}, " +
            "flushing current buffer to Splunk."
        # BUGFIX: don't POST an empty body when a single oversized event
        # arrives while the accumulator is still empty.
        push_buffer newbuffer.join unless newbuffer.empty?
        newbuffer = Array c
      end # if/else buffer fits limit
    end # split_chunk.each
    # Push anything left over.
    # BUGFIX: the original used `if newbuffer.size`, which is ALWAYS truthy
    # in Ruby (0 is truthy), so an empty payload could be POSTed.
    push_buffer newbuffer.join unless newbuffer.empty?
    return
  else
    return push_buffer payload
  end # if payload.bytesize > @batch_size_limit
end