class Fluent::Plugin::SplunkOutput

Constants

KEY_FIELDS
TAG_PLACEHOLDER
VERSION

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunk.rb, line 77
def initialize
  super
  @registry = ::Prometheus::Client.registry
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunk.rb, line 82
def configure(conf)
  super
  check_conflict
  @api = construct_api
  prepare_key_fields
  configure_fields(conf)
  configure_metrics(conf)

  # @formatter_configs is from formatter helper
  @formatters = @formatter_configs.map do |section|
    MatchFormatter.new section.usage, formatter_create(usage: section.usage)
  end
end
construct_api() click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 114
def construct_api
  raise NotImplementedError("Child class should implement 'construct_api'")
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 96
def write(chunk)
  log.trace { "#{self.class}: Received new chunk, size=#{chunk.read.bytesize}" }

  t = Benchmark.realtime do
    write_to_splunk(chunk)
  end

  @metrics[:record_counter].increment(labels: metric_labels, by: chunk.size_of_events)
  @metrics[:bytes_counter].increment(labels: metric_labels, by: chunk.bytesize)
  @metrics[:write_records_histogram].observe(chunk.size_of_events, labels: metric_labels)
  @metrics[:write_bytes_histogram].observe(chunk.bytesize, labels: metric_labels, )
  @metrics[:write_latency_histogram].observe(t, labels: metric_labels, )
end
write_to_splunk(_chunk) click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 110
def write_to_splunk(_chunk)
  raise NotImplementedError("Child class should implement 'write_to_splunk'")
end

Protected Instance Methods

format_event(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 149
def format_event(tag, time, record)
  MultiJson.dump(prepare_event_payload(tag, time, record))
end
prepare_event_payload(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 120
def prepare_event_payload(tag, time, record)
  {
    host: @host ? @host.call(tag, record) : @default_host,
    # From the API reference
    # http://docs.splunk.com/Documentation/Splunk/latest/RESTREF/RESTinput#services.2Fcollector
    # `time` should be a string or unsigned integer.
    # That's why we use `to_s` here.
    time: time.to_f.to_s
  }.tap do |payload|
    payload[:index] = @index.call(tag, record) if @index
    payload[:source] = @source.call(tag, record) if @source
    payload[:sourcetype] = @sourcetype.call(tag, record) if @sourcetype

    # delete nil fields otherwise will get format error from HEC
    %i[host index source sourcetype].each { |f| payload.delete f if payload[f].nil? }

    if @extra_fields
      payload[:fields] = @extra_fields.map { |name, field| [name, record[field]] }.to_h
      payload[:fields].compact!
      # if a field is already in indexed fields, then remove it from the original event
      @extra_fields.values.each { |field| record.delete field }
    end
    if formatter = @formatters.find { |f| f.match? tag }
      record = formatter.format(tag, time, record)
    end
    payload[:event] = convert_to_utf8 record
  end
end
process_response(response, _request_body) click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 153
def process_response(response, _request_body)
  log.trace { "[Response] POST #{@api}: #{response.inspect}" }

  @metrics[:status_counter].increment(labels: metric_labels(status: response.code.to_s))

  raise_err = response.code.to_s.start_with?('5') || (!@consume_chunk_on_4xx_errors && response.code.to_s.start_with?('4'))

  # raise Exception to utilize Fluentd output plugin retry mechanism
  raise "Server error (#{response.code}) for POST #{@api}, response: #{response.body}" if raise_err

  # For both success response (2xx) we will consume the chunk.
  unless response.code.to_s.start_with?('2')
    log.error "#{self.class}: Failed POST to #{@api}, response: #{response.body}"
    log.error { "#{self.class}: Failed request body: #{post.body}" }
  end
end

Private Instance Methods

check_conflict() click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 172
def check_conflict
  KEY_FIELDS.each do |f|
    kf = "#{f}_key"
    raise Fluent::ConfigError, "Can not set #{f} and #{kf} at the same time." \
      if %W[@#{f} @#{kf}].all? &method(:instance_variable_get)
  end
end
configure_fields(conf) click to toggle source

<fields> directive, which defines:

  • when data_type is event, index-time fields

  • when data_type is metric, metric dimensions

# File lib/fluent/plugin/out_splunk.rb, line 208
def configure_fields(conf)
  # This loop looks dump, but it is used to suppress the unused parameter configuration warning
  # Learned from `filter_record_transformer`.
  conf.elements.select { |element| element.name == 'fields' }.each do |element|
    element.each_pair { |k, _v| element.has_key?(k) }
  end

  return unless @fields

  @extra_fields = @fields.corresponding_config_element.map do |k, v|
    [k, v.empty? ? k : v]
  end.to_h
  end
configure_metrics(conf) click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 230
def configure_metrics(conf)
  @metric_labels = {
    type: conf['@type'],
    plugin_id: plugin_id
  }

  @metrics = {
    record_counter: register_metric(::Prometheus::Client::Counter.new(
                                      :splunk_output_write_records_count, docstring:
                                      'The number of log records being sent',
                                      labels: metric_label_keys
                                    )),
    bytes_counter: register_metric(::Prometheus::Client::Counter.new(
                                     :splunk_output_write_bytes_count, docstring:
                                     'The number of log bytes being sent',
                                     labels: metric_label_keys
                                   )),
    status_counter: register_metric(::Prometheus::Client::Counter.new(
                                      :splunk_output_write_status_count, docstring:
                                      'The count of sends by response_code',
                                      labels: metric_label_keys(status: "")
                                    )),
    write_bytes_histogram: register_metric(::Prometheus::Client::Histogram.new(
                                             :splunk_output_write_payload_bytes, docstring:
                                             'The size of the write payload in bytes', buckets: [1024, 23_937, 47_875, 95_750, 191_500, 383_000, 766_000, 1_149_000],
                                             labels: metric_label_keys
                                           )),
    write_records_histogram: register_metric(::Prometheus::Client::Histogram.new(
                                               :splunk_output_write_payload_records, docstring:
                                               'The number of records written per write', buckets: [1, 10, 25, 100, 200, 300, 500, 750, 1000, 1500],
                                               labels: metric_label_keys
                                             )),
    write_latency_histogram: register_metric(::Prometheus::Client::Histogram.new(
                                               :splunk_output_write_latency_seconds, docstring:
                                               'The latency of writes',
                                               labels: metric_label_keys
                                             ))
  }
end
convert_to_utf8(input) click to toggle source

Encode as UTF-8. If 'coerce_to_utf8' is set to true in the config, any non-UTF-8 character would be replaced by the string specified by 'non_utf8_replacement_string'. If 'coerce_to_utf8' is set to false, any non-UTF-8 character would trigger the plugin to error out. Thanks to github.com/GoogleCloudPlatform/fluent-plugin-google-cloud/blob/dbc28575/lib/fluent/plugin/out_google_cloud.rb#L1284

# File lib/fluent/plugin/out_splunk.rb, line 285
def convert_to_utf8(input)
  if input.is_a?(Hash)
    record = {}
    input.each do |key, value|
      record[convert_to_utf8(key)] = convert_to_utf8(value)
    end

    return record
  end
  return input.map { |value| convert_to_utf8(value) } if input.is_a?(Array)
  return input unless input.respond_to?(:encode)

  if @coerce_to_utf8
    input.encode(
      'utf-8',
      invalid: :replace,
      undef: :replace,
      replace: @non_utf8_replacement_string
    )
  else
    begin
      input.encode('utf-8')
    rescue EncodingError
      log.error do
        'Encountered encoding issues potentially due to non ' \
                         'UTF-8 characters. To allow non-UTF-8 characters and ' \
                         'replace them with spaces, please set "coerce_to_utf8" ' \
                         'to true.'
      end
      raise
    end
  end
end
metric_label_keys(other_labels = {}) click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 275
def metric_label_keys(other_labels = {})
  (@metric_labels.merge other_labels).keys
end
metric_labels(other_labels = {}) click to toggle source

Tag metrics with the type string that was used to register the plugin

# File lib/fluent/plugin/out_splunk.rb, line 271
def metric_labels(other_labels = {})
  @metric_labels.merge other_labels
end
pick_custom_format_method() click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 222
def pick_custom_format_method
  if @data_type == :event
    define_singleton_method :format, method(:format_event)
  else
    define_singleton_method :format, method(:format_metric)
  end
end
prepare_key_fields() click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 180
def prepare_key_fields
  KEY_FIELDS.each do |f|
    v = instance_variable_get "@#{f}_key"
    if v
      attrs = v.split('.').freeze
      if @keep_keys
        instance_variable_set "@#{f}", ->(_, record) { attrs.inject(record) { |o, k| o[k] } }
      else
        instance_variable_set "@#{f}", lambda { |_, record|
          attrs[0...-1].inject(record) { |o, k| o[k] }.delete(attrs[-1])
        }
      end
    else
      v = instance_variable_get "@#{f}"
      next unless v

      if v == TAG_PLACEHOLDER
        instance_variable_set "@#{f}", ->(tag, _) { tag }
      else
        instance_variable_set "@#{f}", ->(_, _) { v }
      end
    end
  end
end
register_metric(metric) click to toggle source
# File lib/fluent/plugin/out_splunk.rb, line 319
def register_metric(metric)
  if !@registry.exist?(metric.name)
    @registry.register(metric)
  else
    @registry.get(metric.name)
  end
end