class Fluent::CloudtrailInput
Constants
- PLUGIN_VERSION
- USER_AGENT_NAME
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudtrail.rb, line 42
# Creates the plugin instance. No state is set up here; construction is
# delegated entirely to the superclass.
def initialize
  super()
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudtrail.rb, line 38
# Applies the plugin configuration by handing +conf+ straight to the
# superclass; this method performs no additional validation of its own.
#
# @param conf the configuration object supplied by the host framework
def configure(conf)
  super(conf)
end
load_clients()
click to toggle source
# File lib/fluent/plugin/in_cloudtrail.rb, line 59
# Builds the S3 and SQS clients used by the polling loop and stores them in
# @s3_client and @sqs_client.
#
# Client options are assembled in two stages:
#
# 1. Transport settings: user agent suffix, optional @region, optional debug
#    logger (@debug) and optional @http_proxy.
# 2. Credentials, first match wins:
#    * @aws_key_id and @aws_sec_key  -> static access keys
#    * @profile                      -> Aws::SharedCredentials (optionally
#                                       read from @credentials_path)
#    * @role_arn                     -> Aws::AssumeRoleCredentials via STS
#    * otherwise nothing is set and the SDK's own default lookup applies.
#
# The transport settings are applied *before* the credentials are resolved so
# that the STS client created for role assumption goes through the same proxy
# and debug logger as the S3/SQS clients.
def load_clients
  options = { user_agent_suffix: "#{USER_AGENT_NAME}/#{PLUGIN_VERSION}" }
  options[:region] = @region if @region

  if @debug
    options.update(
      logger: Logger.new(log.out),
      log_level: :debug
    )
    # XXX: Add the following options, if necessary
    # :http_wire_trace => true
  end

  options[:http_proxy] = @http_proxy if @http_proxy

  if @aws_key_id && @aws_sec_key
    options.update(
      access_key_id: @aws_key_id,
      secret_access_key: @aws_sec_key
    )
  elsif @profile
    credentials_opts = { :profile_name => @profile }
    credentials_opts[:path] = @credentials_path if @credentials_path
    options[:credentials] = Aws::SharedCredentials.new(credentials_opts)
  elsif @role_arn
    # The STS client receives the transport settings only; :credentials is
    # added to +options+ after it has been constructed.
    options[:credentials] = Aws::AssumeRoleCredentials.new(
      client: Aws::STS::Client.new(options),
      role_arn: @role_arn,
      role_session_name: "fluent-plugin-cloudtrail",
      external_id: @external_id,
      duration_seconds: 60 * 60
    )
  end

  @s3_client = Aws::S3::Client.new(options)
  @sqs_client = Aws::SQS::Client.new(options)
end
run_periodic()
click to toggle source
# File lib/fluent/plugin/in_cloudtrail.rb, line 106
# Polling loop executed on the worker thread until @finished becomes true.
#
# Each iteration sleeps @receive_interval seconds, receives a batch of
# messages from the SQS queue at @sqs_url, and for every message:
#
# 1. parses the message body as JSON and then parses its 'Message' field as
#    JSON again to obtain 's3Bucket' and the list under 's3ObjectKey';
# 2. downloads each listed S3 object, gunzips it and emits every entry of its
#    'Records' array to the router under @tag, stamped with the current time;
# 3. deletes the message from the queue once all its objects were emitted.
#
# Any StandardError raised during an iteration is logged and the loop carries
# on with the next iteration. The message being processed when the error
# occurred (and the rest of that batch) is not deleted.
def run_periodic
  until @finished
    begin
      sleep @receive_interval
      sqs_resp = @sqs_client.receive_message(
        queue_url: @sqs_url,
        max_number_of_messages: @max_number_of_messages,
        wait_time_seconds: @wait_time_seconds
      )

      sqs_resp.messages.each do |message|
        envelope = JSON.parse(message.body)
        notification = JSON.parse(envelope['Message'])
        s3_bucket = notification['s3Bucket']

        notification['s3ObjectKey'].each do |s3_object_key|
          read_cloudtrail_records(s3_bucket, s3_object_key).each do |record|
            router.emit(@tag, Time.now.to_i, record)
          end
        end

        # Only acknowledge the message after every object it references has
        # been emitted without raising.
        @sqs_client.delete_message(
          queue_url: @sqs_url,
          receipt_handle: message.receipt_handle
        )
      end
    rescue => e
      log.error "failed to emit", :error => e.to_s, :error_class => e.class.to_s
      log.warn_backtrace e.backtrace
    end
  end
end

# Downloads one gzip-compressed object from S3 and returns the value stored
# under its top-level 'Records' key.
#
# @param s3_bucket [String] bucket name taken from the notification
# @param s3_object_key [String] object key taken from the notification
# @return the parsed 'Records' value (nil when the key is absent)
def read_cloudtrail_records(s3_bucket, s3_object_key)
  s3_resp = @s3_client.get_object(
    :bucket => s3_bucket,
    :key => s3_object_key
  )
  # The block form closes the reader (and the StringIO under it) even when
  # decompression fails part-way through.
  cloudtrail_data = Zlib::GzipReader.wrap(StringIO.new(s3_resp.body.read)) do |gz|
    gz.read
  end
  JSON.parse(cloudtrail_data)['Records']
end
private :read_cloudtrail_records
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudtrail.rb, line 53
# Stops the plugin: runs the superclass shutdown, raises the @finished flag
# that the polling loop checks, and waits for the worker thread to exit.
#
# The flag is only inspected at the top of each polling iteration, so this
# call blocks until the iteration currently in progress has completed.
#
# The join is skipped when no worker thread exists (shutdown invoked without
# a preceding start), instead of failing on a nil @thread.
def shutdown
  super
  @finished = true
  @thread.join if @thread
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudtrail.rb, line 46
# Starts the plugin: runs the superclass start, builds the AWS clients via
# load_clients, clears the @finished flag and launches the background thread
# that executes run_periodic. The thread handle is kept in @thread.
def start
  super
  load_clients
  @finished = false
  @thread = Thread.new { run_periodic }
end