class Fluent::Plugin::SplunkIngestApiOutput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::Plugin::SplunkOutput#configure
# File lib/fluent/plugin/out_splunk_ingest_api.rb, line 40
def configure(conf)
  super
end
construct_api() click to toggle source
# File lib/fluent/plugin/out_splunk_ingest_api.rb, line 44
def construct_api
  uri = "https://#{@ingest_api_host}/#{@ingest_api_tenant}#{@ingest_api_events_endpoint}"
  URI(uri)
rescue StandardError
  raise Fluent::ConfigError, "URI #{uri} is invalid"
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_splunk_ingest_api.rb, line 51
def format(tag, time, record)
  format_event(tag, time, record)
end
format_event(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_splunk_ingest_api.rb, line 55
def format_event(tag, time, record)
  event = prepare_event_payload(tag, time, record)
  # Unsure how to drop a record. So append the empty string
  if event[:body].nil? || event[:body].strip.empty?
    ''
  else
    MultiJson.dump(event) + ','
  end
end
new_connection() click to toggle source
# File lib/fluent/plugin/out_splunk_ingest_api.rb, line 89
def new_connection
  Rack::OAuth2.debugging = true if @debug_http
  client = OpenIDConnect::Client.new(
    token_endpoint: @token_endpoint,
    identifier: @service_client_identifier,
    secret: @service_client_secret_key,
    redirect_uri: 'http://localhost:8080/', # Not used
    host: @ingest_auth_host,
    scheme: 'https'
  )

  client.access_token!(client_auth_method: 'other')
end
prefer_buffer_processing() click to toggle source
# File lib/fluent/plugin/out_splunk_ingest_api.rb, line 36
def prefer_buffer_processing
  true
end
prepare_event_payload(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_splunk_ingest_api.rb, line 65
def prepare_event_payload(tag, time, record)
  payload = super(tag, time, record)
  payload[:attributes] = payload.delete(:fields) || {}
  payload[:attributes][:index] = payload.delete(:index) if payload[:index]
  payload[:body] = payload.delete(:event)
  payload.delete(:time)
  payload[:timestamp] = (time.to_f * 1000).to_i
  payload[:nanos] = time.nsec / 100_000

  payload
end
process_response(response, request_body) click to toggle source
# File lib/fluent/plugin/out_splunk_ingest_api.rb, line 77
def process_response(response, request_body)
  super
  if response.code.to_s == '401'
    @conn = new_connection
    raise 'Auth Error recived. New token has been fetched.'
  elsif response.code.to_s == '429'
    raise "Throttle error from server. #{response.body}"
  elsif /INVALID_DATA/.match?(response.body)
    log.error "#{self.class}: POST Body #{request_body}"
  end
end
write_to_splunk(chunk) click to toggle source
# File lib/fluent/plugin/out_splunk_ingest_api.rb, line 103
def write_to_splunk(chunk)
  log.trace "#{self.class}: In write() with #{chunk.size_of_events} records and #{chunk.bytesize} bytes "
  # ingest API is an array of json objects
  body = "[#{chunk.read.chomp(',')}]"
  @conn ||= new_connection
  response = @conn.post("https://#{@ingest_api_host}/#{@ingest_api_tenant}#{@ingest_api_events_endpoint}", body: body)
  process_response(response, body)
end