class LogStash::Inputs::OktaEnterprise
This Logstash input plugin allows you to call an the Okta HTTP API to ship to other SIEMS. This plugin is based on the http_poller plugin, however the plugin needed to retain a state. It should do that, and can be used as a basis for similar web api style loggers. The plugin supports the rufus style scheduling. Using the HTTP poller with custom a custom CA or self signed cert. ==== Example This is a basic configuration. The API key is passed through using an environment variable. While it is possible to just put the API key directly into the file, it is NOT recommended. [source,ruby] ---------------------------------- input { okta_enterprise { schedule => { every => "30s" } chunk_size => 1000 auth_token_env => "${OKTA_API_KEY}" url => "https://uri.okta.com/api/v1/events" } } output { stdout { codec => rubydebug } } ---------------------------------- It is possible to save the application state, so if the plugin is stopped it won't have to pull all the data again. Currently Linux ONLY. The state file base is added to the config, which will be used store the state of the event query. The directory in which the exists should have rwx permissions for the logstash user. As such it should not be the primary logstash config directory. [source,ruby] ---------------------------------- input { okta_enterprise { schedule => { every => "30s" } state_file_base => "/etc/logstash/state_file/okta_base_" # A file can also be used instead of environment variable. auth_token_file => "/path/to/security/creds" url => "https://uri.okta.com/api/v1/events" # Metadata can be stored in the same way as the http_poller metadata_target => "metadata" # Data can be stored in any arbitrary key target => "target" } } output { stdout { codec => rubydebug } } ---------------------------------- If you have a self signed cert you will need to convert your server's certificate to a valid# `.jks` or `.p12` file. An easy way to do it is to run the following one-liner, substituting your server's URL for the placeholder `MYURL` and `MYPORT`. [source,ruby] ---------------------------------- openssl s_client -showcerts -connect MYURL:MYPORT </dev/null 2>/dev/null|openssl x509 -outform PEM > downloaded_cert.pem; keytool -import -alias test -file downloaded_cert.pem -keystore downloaded_truststore.jks ---------------------------------- The above snippet will create two files `downloaded_cert.pem` and `downloaded_truststore.jks`. You will be prompted to set a password for the `jks` file during this process. To configure logstash use a config like the one that follows. [source,ruby] ----------------------------------
input {
okta_enterprise { ... truststore => "/path/to/downloaded_truststore.jks" truststore_password => "mypassword" }
}
----------------------------------
Constants
- Schedule_types
Public Instance Methods
register()
click to toggle source
# File lib/logstash/inputs/okta_enterprise.rb, line 189 def register unless (@accept_deprecation_notice) msg = "The Okta Events API (and this plugin) have been deprecated. For more info: " + "https://github.com/SecurityRiskAdvisors/logstash-input-okta_enterprise/blob/master/docs/Migration.md. " + "Instructions to proceed can be found there." @logger.fatal(msg) raise LogStash::ConfigurationError, msg end if (@auth_token_env and @auth_token_file) raise LogStash::ConfigurationError, "auth_token_file and auth_token_env" + "cannot be set. Please select one for use." end unless (@auth_token_env or @auth_token_file) auth_message = "Both auth_token_file and auth_token_env cannot be empty."+ "Please select one for use." raise LogStash::ConfigurationError, auth_message end if (@auth_token_file) begin if (File.size(@auth_token_file) > MAX_AUTH_TOKEN_FILE_SIZE) raise LogStash::ConfigurationError, "The auth_token file is too large to map" else @auth_token = File.read(@auth_token_file).chomp @logger.info("Successfully opened auth_token_file",:auth_token_file => @auth_token_file) end rescue LogStash::ConfigurationError raise # Some clean up magic to cover the stuff below. # This will keep me from stomping on signal interrupts and ctrl+c rescue SignalException raise rescue Exception => e # This is currently a bug in logstash, confirmed here: # https://discuss.elastic.co/t/logstash-configurationerror-but-configurationok-logstash-2-4-0/65727/2 # Will need to determine the best way to handle this # Rather than testing all error conditions, this can just display them. # Should figure out a way to display this in a better fashion. raise LogStash::ConfigurationError, e.inspect end else (@auth_token_env) @auth_token = @auth_token_env end unless (@auth_token.index(/[^A-Za-z0-9\-_~]/).nil?) raise LogStash::ConfigurationError, "The auth_token should be" + "unreserved characters only, please check the token to ensure it is correct." end if (@start_date and @filter) raise LogStash::ConfigurationError, "You can only set either" + "start_date or filter." end if (@start_date) begin @start_date = DateTime.parse(@start_date).rfc3339(3) rescue ArgumentError => e raise LogStash::ConfigurationError, "start_date must be of the form " + "yyyy-MM-dd’‘T’‘HH:mm:ss.SSSZZ, e.g. 2013-01-01T12:00:00.000-07:00." end @start_date = CGI.escape(@start_date) end if (@filter) @filter = CGI.escape(@filter) end @noisy_log = method(:open_log) if (@log_throttle) if (@log_throttle > FIXNUM_RESET_SIZE) raise LogStash::ConfigurationError, "Config log_throttle must be" + "less than #{FIXNUM_RESET_SIZE}." end @noisy_log = method(:throttled_log) @throttle_counter = 0 end if (@logger.debug?) @noisy_log = method(:open_log) end begin if (@logger.trace?) @noisy_log = method(:open_log) end rescue NoMethodError # Do nothing b/c it doesn't really matter, it retains compatability with 2.4 vs higher end if (@state_file_base) dir_name = File.dirname(@state_file_base) ## Generally the state file directory will have the correct permissions ## so check for that case first. if (File.readable?(dir_name) and File.executable?(dir_name) and File.writable?(dir_name)) if (Dir[@state_file_base + "*"].size > 1) raise LogStash::ConfigurationError, "There is more than one file" + "in the state file base dir (possibly an error?)." + "Please keep the latest/most relevant file" end @state_file = Dir[@state_file_base + "*"].last else ## Build one message for the rest of the issues access_message = "Could not access the state file dir" + "#{dir_name} for the following reasons: " unless (File.readable?(dir_name)) access_message << "Cannot read #{dir_name}." end unless (File.executable?(dir_name)) access_message << "Cannot list directory or perform special" + "operations on #{dir_name}." end unless (File.writable?(dir_name)) access_message << "Cannot write to #{dir_name}." end access_message << "Please provide the appropriate permissions." raise LogStash::ConfigurationError, access_message end if (@state_file) ## Only wanna pull the base64 encoded url outta there unless (@state_file.eql?(@state_file_base + "start")) regex_state_file = %r{(?<state_file>#{@state_file_base}) (?<state>(?:[A-Za-z0-9_-]{4})+(?:[A-Za-z0-9_-]{2}==|[A-Za-z0-9_-]{3}=)?)}x state_url = Base64.urlsafe_decode64(@state_file.slice(regex_state_file,'state')) unless (state_url =~ /^#{@url}.*/) raise LogStash::ConfigurationError, "State file does not match #{@url}. " + "Please ensure the state file is correct: #{state_url}." end @url = Base64.urlsafe_decode64(@state_file.slice(regex_state_file,'state')) end else begin @state_file = @state_file_base + "start" # 'touch' a file to keep the conditional from happening later File.open(@state_file, "w") {} @logger.info("Created base state_file", :state_file => @state_file) rescue Exception => e raise LogStash::ConfigurationError, "Could not create #{@statefile}. " + "Error: #{e.inspect}." end end end params_event = Hash.new params_event[:limit] = @chunk_size if @chunk_size > 0 params_event[:startDate] = @start_date if @start_date params_event[:filter] = @filter if @filter if (!@url.to_s.include?('?') and params_event.count > 0) @url = "#{@url}?" + params_event.to_a.map { |arr|"#{arr[0]}=#{arr[1]}" }.join('&') end @logger.debug("Created initial URL to call", :url => @url) @host = Socket.gethostname end
run(queue)
click to toggle source
# File lib/logstash/inputs/okta_enterprise.rb, line 360 def run(queue) msg_invalid_schedule = "Invalid config. schedule hash must contain " + "exactly one of the following keys - cron, at, every or in" raise LogStash::ConfigurationError, msg_invalid_schedule if @schedule.keys.length !=1 schedule_type = @schedule.keys.first schedule_value = @schedule[schedule_type] raise LogStash::ConfigurationError, msg_invalid_schedule unless Schedule_types.include?(schedule_type) @scheduler = Rufus::Scheduler.new(:max_work_threads => 1) #as of v3.0.9, :first_in => :now doesn't work. Use the following workaround instead opts = schedule_type == "every" ? { :first_in => 0.01 } : {} opts[:overlap] = false; @scheduler.send(schedule_type, schedule_value, opts) { run_once(queue) } @scheduler.join end
stop()
click to toggle source
# File lib/logstash/inputs/okta_enterprise.rb, line 657 def stop # nothing to do in this case so it is not necessary to define stop # examples of common "stop" tasks: # * close sockets (unblocking blocking reads/accepts) # * cleanup temporary files # * terminate spawned threads begin @scheduler.stop rescue NoMethodError => e unless (e.message == "undefined method `stop' for nil:NilClass") raise end rescue Exception => e @logger.warn("Undefined error", :exception => e.inspect) raise ensure if (@state_file_base) new_file = @state_file_base + Base64.urlsafe_encode64(@url) if (@state_file != new_file ) begin File.rename(@state_file,new_file) rescue SignalException raise rescue Exception => e @logger.fatal("Could not rename file", :old_file => @state_file, :new_file => new_file, :exception => e.inspect) raise end @state_file = new_file end end end end
Private Instance Methods
apply_metadata(event, requested_url, response=nil, exec_time=nil)
click to toggle source
# File lib/logstash/inputs/okta_enterprise.rb, line 618 def apply_metadata(event, requested_url, response=nil, exec_time=nil) return unless @metadata_target m = {} m = { "host" => @host, "url" => requested_url, "runtime_seconds" => exec_time } if response m["code"] = response.code m["response_headers"] = response.headers m["response_message"] = response.message m["retry_count"] = response.times_retried end event.set(@metadata_target,m) end
handle_failure(queue, exception, requested_url, exec_time)
click to toggle source
# File lib/logstash/inputs/okta_enterprise.rb, line 598 def handle_failure(queue, exception, requested_url, exec_time) @continue = false @logger.warn("Client Connection Error", :exception => exception.inspect) event = LogStash::Event.new apply_metadata(event, requested_url, nil, exec_time) event.set("http_request_failure", { "Okta-Plugin-Status" => "Client Connection Error", "Connection-Error" => exception.message, "backtrace" => exception.backtrace }) event.tag("_http_request_failure") decorate(event) queue << event end
handle_success(queue, response, requested_url, exec_time)
click to toggle source
# File lib/logstash/inputs/okta_enterprise.rb, line 440 def handle_success(queue, response, requested_url, exec_time) @continue = false case response.code when 200 ## Some benchmarking code for reasonings behind the methods. ## They aren't great benchmarks, but basic ones that proved a point. ## If anyone has better/contradicting results let me know # ## Some system info on which these tests were run: #$ cat /proc/cpuinfo | grep -i "model name" | uniq -c # 4 model name : Intel(R) Core(TM) i7-3740QM CPU @ 2.70GHz # #$ free -m # total used free shared buff/cache available # Mem: 1984 925 372 8 686 833 # Swap: 2047 0 2047 # #str = '<https://dev-instance.oktapreview.com/api/v1/events?after=tevHLxinRbATJeKgKjgXGXy0Q1479278142000&limit=1000>; rel="next"' #require "benchmark" # # #n = 50000000 # # #Benchmark.bm do |x| # x.report { n.times { str.include?('rel="next"') } } # (2) 23.008853sec @50000000 times # x.report { n.times { str.end_with?('rel="next"') } } # (1) 16.894623sec @50000000 times # x.report { n.times { str =~ /rel="next"$/ } } # (3) 30.757554sec @50000000 times #end # #Benchmark.bm do |x| # x.report { n.times { str.match(/<([^>]+)>/).captures[0] } } # (2) 262.166085sec @50000000 times # x.report { n.times { str.split(';')[0][1...-1] } } # (1) 31.673270sec @50000000 times #end ## This feels like gross code Array(response.headers["link"]).each do |link_header| if link_header.end_with?('rel="next"') @url = link_header.split(';')[0][1...-1] end end if (response.body.length > 0) @codec.decode(response.body) do |decoded| event = @target ? LogStash::Event.new(@target => decoded.to_hash) : decoded apply_metadata(event, requested_url, response, exec_time) decorate(event) queue << event end else @codec.decode("{}") do |decoded| event = @target ? LogStash::Event.new(@target => decoded.to_hash) : decoded apply_metadata(event, requested_url, response, exec_time) decorate(event) queue << event end end if (Array(response.headers["link"]).count > 1) @continue = true @logger.debug("Continue status", :continue => @continue ) end @noisy_log.call("Successful response returned",:code => response.code, :headers => response.headers) @logger.debug("Response body", :body => response.body) when 401 @codec.decode(response.body) do |decoded| event = @target ? LogStash::Event.new(@target => decoded.to_hash) : decoded apply_metadata(event, requested_url, response, exec_time) event.set("Okta-Plugin-Status","Auth_token supplied is not valid, " + "validate the auth_token and update the plugin config.") event.set("HTTP-Code",401) event.tag("_okta_response_error") decorate(event) queue << event end @logger.error("Authentication required, check auth_code", :code => response.code, :headers => response.headers) @logger.debug("Authentication failed body", :body => response.body) when 400 if (response.body.include?("E0000031")) @codec.decode(response.body) do |decoded| event = @target ? LogStash::Event.new(@target => decoded.to_hash) : decoded apply_metadata(event, requested_url, response, exec_time) event.set("Okta-Plugin-Status","Filter string was not valid.") event.set("HTTP-Code",400) event.tag("_okta_response_error") decorate(event) queue << event end @logger.error("Filter string was not valid", :response_code => response.code, :okta_error => "E0000031", :filter_string => @filter) @logger.debug("Filter string error response", :response_body => response.body, :response_headers => response.headers) elsif (response.body.include?("E0000030")) @codec.decode(response.body) do |decoded| event = @target ? LogStash::Event.new(@target => decoded.to_hash) : decoded apply_metadata(event, requested_url, response, exec_time) event.set("Okta-Plugin-Status","Date was not formatted correctly.") event.set("HTTP-Code",400) event.tag("_okta_response_error") decorate(event) queue << event end @logger.error("Date was not formatted correctly", :response_code => response.code, :okta_error => "E0000030", :date_string => @start_date) @logger.debug("Start date error response", :response_body => response.body, :response_headers => response.headers) ## If the Okta error code does not match known codes ## Process it as a generic error else handle_unknown_http_code(queue,response,requested_url,exec_time) end else handle_unknown_http_code(queue,response,requested_url,exec_time) end end
handle_unknown_http_code(queue,response,requested_url,exec_time)
click to toggle source
# File lib/logstash/inputs/okta_enterprise.rb, line 580 def handle_unknown_http_code(queue,response,requested_url,exec_time) @codec.decode(response.body) do |decoded| event = @target ? LogStash::Event.new(@target => decoded.to_hash) : decoded apply_metadata(event, requested_url, response, exec_time) event.set("Okta-Plugin-Status","Unknown error, see Okta error") event.set("HTTP-Code",response.code) event.tag("_okta_response_error") decorate(event) queue << event end @logger.error("Okta API Error", :http_code => response.code, :body => response.body, :headers => response.headers) end
open_log(message, vars)
click to toggle source
# File lib/logstash/inputs/okta_enterprise.rb, line 652 def open_log(message, vars) @logger.info(message, vars) end
request_async(queue)
click to toggle source
# File lib/logstash/inputs/okta_enterprise.rb, line 389 def request_async(queue) @continue = true accept = "application/json" content_type = "application/json" begin while @continue and !stop? @logger.debug("Calling URL", :url => @url, :token_set => @auth_token.length > 0, :accept => accept, :content_type => content_type) started = Time.now client.async.get(@url.to_s, headers: {"Authorization" => "SSWS #{@auth_token}", "Accept" => accept, "Content-Type" => content_type }). on_success { |response | handle_success(queue, response, @url, Time.now - started) }. on_failure { |exception | handle_failure(queue, exception, @url, Time.now - started) } client.execute! end rescue Exception => e raise e ensure if (@state_file_base) new_file = @state_file_base + Base64.urlsafe_encode64(@url) if (@state_file != new_file ) begin File.rename(@state_file,new_file) rescue SignalException raise rescue Exception => e @logger.fatal("Could not rename file", :old_file => @state_file, :new_file => new_file, :exception => e.inspect) raise end @state_file = new_file end end end end
run_once(queue)
click to toggle source
# File lib/logstash/inputs/okta_enterprise.rb, line 382 def run_once(queue) request_async(queue) end
throttled_log(message, vars = {})
click to toggle source
# File lib/logstash/inputs/okta_enterprise.rb, line 640 def throttled_log(message, vars = {}) if (@throttle_counter < 3 or @throttle_counter % @log_throttle == 0 or @throttle_counter >= FIXNUM_RESET_SIZE) @logger.info(message, vars) if (@throttle_counter >= FIXNUM_RESET_SIZE) @throttle_counter = 0 end end @throttle_counter += 1 end