class LogStash::Inputs::OpenWhisk
This Logstash input plugin allows you to drain OpenWhisk
Activation logs, decoding the output into event(s) and sending them on their way. Using the OpenWhisk
platform API, we poll the activation logs API according to the config schedule. This plugin borrows heavily from the HTTP Poller input plugin.
Example
Drain logs from an OpenWhisk
platform instance. The config should look like this:
- source,ruby
input {
  openwhisk {
    # Mandatory Configuration Parameters
    hostname => "openwhisk.ng.bluemix.net"
    username => "sample_user@email.com"
    password => "some_password"
    # Supports "cron", "every", "at" and "in" schedules by rufus scheduler
    schedule => { cron => "* * * * * UTC"}
    # Optional Configuration Parameters
    # Namespace is optional, defaults to user's default namespace.
    namespace => ""
    request_timeout => 60
    codec => "json"
    # A hash of request metadata info (timing, response headers, etc.) will be sent here
    metadata_target => "http_poller_metadata"
  }
}
output {
stdout { codec => rubydebug }
}
Constants
- Schedule_types
Public Instance Methods
# File lib/logstash/inputs/openwhisk.rb, line 88
# Plugin registration hook: captures the local hostname, logs the
# effective configuration, and initialises the polling state.
def register
  @host = Socket.gethostname.force_encoding(Encoding::UTF_8)

  @logger.info("Registering openwhisk Input", :type => @type,
    :hostname => @hostname, :interval => @interval,
    :schedule => @schedule, :timeout => @timeout)

  # we will start polling for logs since the current epoch (in milliseconds)
  @logs_since = Time.now.to_i * 1000

  # Activation ids from the previous poll, used to control what is indexed;
  # polls may return overlapping results and we don't want to index the
  # same activation twice.
  @activation_ids = Set.new
end
# File lib/logstash/inputs/openwhisk.rb, line 118
# Input entry point. Exactly one of `interval` or `schedule` must be
# configured; dispatches to the matching polling loop.
#
# Raises LogStash::ConfigurationError when neither or both are set.
def run(queue)
  # interval or schedule must be provided — exclusively one of them.
  raise LogStash::ConfigurationError, "Invalid config. Neither interval nor schedule was specified." \
    unless @interval || @schedule

  raise LogStash::ConfigurationError, "Invalid config. Specify only interval or schedule. Not both." \
    if @interval && @schedule

  if @interval
    setup_interval(queue)
  elsif @schedule
    setup_schedule(queue)
  else
    # should not reach here
    raise LogStash::ConfigurationError, "Invalid config. Neither interval nor schedule was specified."
  end
end
# File lib/logstash/inputs/openwhisk.rb, line 103
# Shutdown hook: stops the interval thread and/or scheduler started by #run.
def stop
  @interval_thread && Stud.stop!(@interval_thread)
  @scheduler && @scheduler.stop
end
Private Instance Methods
# File lib/logstash/inputs/openwhisk.rb, line 267
# Attaches request/response metadata to the event under the configured
# metadata target field; a no-op when no target is configured.
def apply_metadata(event, name, request, response=nil, execution_time=nil)
  return unless @metadata_target
  metadata = event_metadata(name, request, response, execution_time)
  event.set(@metadata_target, metadata)
end
Generates HTTP request options for the current platform host.
# File lib/logstash/inputs/openwhisk.rb, line 110
# Generates HTTP request options for the current platform host.
#
# opts - Hash with "hostname", "namespace", "username" and "password" keys.
#
# Returns a [method, url, options] triple for the HTTP client, requesting
# activation documents since @logs_since.
# Fixed: dropped the unused local `res =` assignment (the array was already
# the method's return value).
def construct_request(opts)
  url = "https://#{opts['hostname']}/api/v1/namespaces/#{opts['namespace']}/activations"
  auth = {user: opts['username'], pass: opts['password']}
  # NOTE(review): docs/limit/skip presumably request all full activation
  # records — confirm against the OpenWhisk activations API.
  query = {docs: true, limit: 0, skip: 0, since: @logs_since}
  [:get, url, {:auth => auth, :query => query}]
end
# File lib/logstash/inputs/openwhisk.rb, line 273
# Builds the metadata hash attached to each event: request details,
# host, timing and — when a response is available — response details.
def event_metadata(name, request, response=nil, execution_time=nil)
  metadata = {
    "name" => name,
    "hostname" => @hostname,
    "request" => structure_request(request),
    "runtime_seconds" => execution_time
  }
  unless response.nil?
    metadata["code"] = response.code
    metadata["response_headers"] = response.headers
    metadata["response_message"] = response.message
    metadata["times_retried"] = response.times_retried
  end
  metadata
end
# File lib/logstash/inputs/openwhisk.rb, line 223
# Applies metadata and decoration to a decoded event and pushes it onto
# the queue. Failures are logged rather than propagated so one bad
# event cannot stop the poll loop.
def handle_decoded_event(queue, name, request, response, event, execution_time)
  apply_metadata(event, name, request, response, execution_time)
  decorate(event)
  queue << event
rescue StandardError, java.lang.Exception => e
  if @logger.error?
    @logger.error("Error eventifying response!",
      :exception => e,
      :exception_message => e.message,
      :name => name,
      :url => request,
      :response => response)
  end
end
Beware: on old versions of Manticore, some uncommon failures are not handled.
# File lib/logstash/inputs/openwhisk.rb, line 239
# Emits a tagged failure event describing a failed HTTP request.
# Beware: on old versions of Manticore some uncommon failures are not handled.
def handle_failure(queue, name, request, exception, execution_time)
  event = LogStash::Event.new
  apply_metadata(event, name, request)
  event.tag("_http_request_failure")

  # This is also in the metadata, but we set it here as well because we want
  # it persisted by default, whereas metadata isn't. People don't like
  # mysterious errors.
  failure_details = {
    "request" => structure_request(request),
    "name" => name,
    "error" => exception.to_s,
    "backtrace" => exception.backtrace,
    "runtime_seconds" => execution_time
  }
  event.set("http_request_failure", failure_details)

  queue << event
rescue StandardError, java.lang.Exception => e
  if @logger.error?
    @logger.error("Cannot read URL or send the error as an event!",
      :exception => e,
      :exception_message => e.message,
      :exception_backtrace => e.backtrace,
      :name => name,
      :url => request)
  end
end
# File lib/logstash/inputs/openwhisk.rb, line 180
# Decodes the response body into events, skipping activations already
# seen in the previous poll, and remembers this poll's ids for the next
# de-duplication pass.
def handle_success(queue, name, request, response, execution_time)
  seen_ids = Set.new
  @codec.decode(response.body) do |decoded|
    activation_id = decoded.to_hash["activationId"]
    # ignore results we have previously seen
    unless @activation_ids.include?(activation_id)
      sanitize(decoded)
      event = @target ? LogStash::Event.new(@target => decoded.to_hash) : decoded
      update_logs_since(decoded.to_hash["end"])
      handle_decoded_event(queue, name, request, response, event, execution_time)
    end
    seen_ids.add(activation_id)
  end
  @activation_ids = seen_ids
end
# File lib/logstash/inputs/openwhisk.rb, line 167
# Issues the HTTP request asynchronously, timing the round trip and
# dispatching to the success or failure handler.
def request_async(queue, name, request)
  @logger.debug? && @logger.debug("Fetching URL", :name => name, :url => request)
  started_at = Time.now

  method, *request_opts = request
  client.async.send(method, *request_opts).
    on_success { |response| handle_success(queue, name, request, response, Time.now - started_at) }.
    on_failure { |exception| handle_failure(queue, name, request, exception, Time.now - started_at) }
end
# File lib/logstash/inputs/openwhisk.rb, line 159
# Builds a request from the configured credentials and executes a
# single poll of the activations API.
def run_once(queue)
  opts = {
    "hostname" => @hostname,
    "username" => @username,
    "password" => @password,
    "namespace" => @namespace
  }
  request_async(queue, "openwhisk", construct_request(opts))
  client.execute!
end
Elasticsearch cannot handle attributes that change type, so serialise annotation values to JSON strings.
# File lib/logstash/inputs/openwhisk.rb, line 203
# Elasticsearch cannot handle attributes which change types, so
# serialise each annotation's value to a JSON string before indexing.
#
# activation - a decoded event responding to #get and #set.
def sanitize(activation)
  annotations = activation.get("annotations")
  # Fixed: guard against activations with no annotations field — the
  # previous code raised NoMethodError on nil.each.
  return if annotations.nil?
  annotations.each { |annotation| annotation["value"] = annotation["value"].to_json }
  activation.set("annotations", annotations)
end
# File lib/logstash/inputs/openwhisk.rb, line 136
# Polls on a fixed interval via Stud, recording the current thread so
# #stop can interrupt the loop.
def setup_interval(queue)
  @interval_thread = Thread.current
  Stud.interval(@interval) { run_once(queue) }
end
# File lib/logstash/inputs/openwhisk.rb, line 143
# Validates and installs the rufus-scheduler schedule. The schedule
# hash must contain exactly one of the keys listed in Schedule_types
# ("cron", "every", "at" or "in").
#
# Raises LogStash::ConfigurationError when the hash is malformed.
def setup_schedule(queue)
  # schedule hash must contain exactly one of the allowed keys
  msg_invalid_schedule = "Invalid config. schedule hash must contain " +
    "exactly one of the following keys - cron, at, every or in"

  # Fixed: the constant was misspelled as Logstash::ConfigurationError,
  # so a malformed schedule raised NameError instead of the intended
  # configuration error.
  raise LogStash::ConfigurationError, msg_invalid_schedule if @schedule.keys.length != 1
  schedule_type = @schedule.keys.first
  schedule_value = @schedule[schedule_type]
  raise LogStash::ConfigurationError, msg_invalid_schedule unless Schedule_types.include?(schedule_type)

  @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
  # as of rufus-scheduler v3.0.9, :first_in => :now doesn't work; use a
  # small positive delay instead so "every" schedules fire immediately.
  opts = schedule_type == "every" ? { :first_in => 0.01 } : {}
  @scheduler.send(schedule_type, schedule_value, opts) { run_once(queue) }
  @scheduler.join
end
Turn [method, url, spec] requests into a hash for friendlier logging / ES indexing
# File lib/logstash/inputs/openwhisk.rb, line 294
# Turn [method, url, spec] requests into a hash for friendlier
# logging / ES indexing: the method and url are folded into the spec
# options and all keys are stringified.
def structure_request(request)
  method, url, spec = request
  merged = (spec || {}).merge("method" => method.to_s, "url" => url)
  merged.each_with_object({}) { |(key, value), acc| acc[key.to_s] = value }
end
Updates the `since` query parameter for the next request, based upon the last activation's end time.
# File lib/logstash/inputs/openwhisk.rb, line 212
# Advances @logs_since from the last activation's end time, backing
# off by the maximum action duration so still-running activations are
# not missed. Never moves the watermark backwards.
def update_logs_since(ms_since_epoch)
  # actions have a maximum timeout of five minutes
  max_action_time_ms = 5 * 60 * 1000
  candidate = ms_since_epoch - max_action_time_ms
  @logs_since = candidate if candidate > @logs_since
end