class Fluent::Plugin::SplunkIngestApiOutput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
Fluent::Plugin::SplunkOutput#configure
# File lib/fluent/plugin/out_splunk_ingest_api.rb, line 40 def configure(conf) super end
construct_api()
click to toggle source
# File lib/fluent/plugin/out_splunk_ingest_api.rb, line 44 def construct_api uri = "https://#{@ingest_api_host}/#{@ingest_api_tenant}#{@ingest_api_events_endpoint}" URI(uri) rescue StandardError raise Fluent::ConfigError, "URI #{uri} is invalid" end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_splunk_ingest_api.rb, line 51 def format(tag, time, record) format_event(tag, time, record) end
format_event(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_splunk_ingest_api.rb, line 55 def format_event(tag, time, record) event = prepare_event_payload(tag, time, record) # Unsure how to drop a record. So append the empty string if event[:body].nil? || event[:body].strip.empty? '' else MultiJson.dump(event) + ',' end end
new_connection()
click to toggle source
# File lib/fluent/plugin/out_splunk_ingest_api.rb, line 89 def new_connection Rack::OAuth2.debugging = true if @debug_http client = OpenIDConnect::Client.new( token_endpoint: @token_endpoint, identifier: @service_client_identifier, secret: @service_client_secret_key, redirect_uri: 'http://localhost:8080/', # Not used host: @ingest_auth_host, scheme: 'https' ) client.access_token!(client_auth_method: 'other') end
prefer_buffer_processing()
click to toggle source
# File lib/fluent/plugin/out_splunk_ingest_api.rb, line 36 def prefer_buffer_processing true end
prepare_event_payload(tag, time, record)
click to toggle source
Calls superclass method
Fluent::Plugin::SplunkOutput#prepare_event_payload
# File lib/fluent/plugin/out_splunk_ingest_api.rb, line 65 def prepare_event_payload(tag, time, record) payload = super(tag, time, record) payload[:attributes] = payload.delete(:fields) || {} payload[:attributes][:index] = payload.delete(:index) if payload[:index] payload[:body] = payload.delete(:event) payload.delete(:time) payload[:timestamp] = (time.to_f * 1000).to_i payload[:nanos] = time.nsec / 100_000 payload end
process_response(response, request_body)
click to toggle source
Calls superclass method
Fluent::Plugin::SplunkOutput#process_response
# File lib/fluent/plugin/out_splunk_ingest_api.rb, line 77 def process_response(response, request_body) super if response.code.to_s == '401' @conn = new_connection raise 'Auth Error recived. New token has been fetched.' elsif response.code.to_s == '429' raise "Throttle error from server. #{response.body}" elsif /INVALID_DATA/.match?(response.body) log.error "#{self.class}: POST Body #{request_body}" end end
write_to_splunk(chunk)
click to toggle source
# File lib/fluent/plugin/out_splunk_ingest_api.rb, line 103 def write_to_splunk(chunk) log.trace "#{self.class}: In write() with #{chunk.size_of_events} records and #{chunk.bytesize} bytes " # ingest API is an array of json objects body = "[#{chunk.read.chomp(',')}]" @conn ||= new_connection response = @conn.post("https://#{@ingest_api_host}/#{@ingest_api_tenant}#{@ingest_api_events_endpoint}", body: body) process_response(response, body) end