class Fluent::CloudtrailInput

Constants

PLUGIN_VERSION
USER_AGENT_NAME

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudtrail.rb, line 42
def initialize
  super
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudtrail.rb, line 38
def configure(conf)
  super
end
load_clients() click to toggle source
# File lib/fluent/plugin/in_cloudtrail.rb, line 59
def load_clients
  user_agent_suffix = "#{USER_AGENT_NAME}/#{PLUGIN_VERSION}"
  options = {
    user_agent_suffix: user_agent_suffix
  }
  if @region
    options[:region] = @region
  end

  if @aws_key_id && @aws_sec_key
    options.update(
      access_key_id: @aws_key_id,
      secret_access_key: @aws_sec_key,
    )
  elsif @profile
    credentials_opts = {:profile_name => @profile}
    credentials_opts[:path] = @credentials_path if @credentials_path
    credentials = Aws::SharedCredentials.new(credentials_opts)
    options[:credentials] = credentials
  elsif @role_arn
    credentials = Aws::AssumeRoleCredentials.new(
      client: Aws::STS::Client.new(options),
      role_arn: @role_arn,
      role_session_name: "fluent-plugin-cloudtrail",
      external_id: @external_id,
      duration_seconds: 60 * 60,
    )
    options[:credentials] = credentials
  end

  if @debug
    options.update(
      logger: Logger.new(log.out),
      log_level: :debug
    )
    # XXX: Add the following options, if necessary
    # :http_wire_trace => true
  end

  if @http_proxy
    options[:http_proxy] = @http_proxy
  end

  @s3_client = Aws::S3::Client.new(options)
  @sqs_client = Aws::SQS::Client.new(options)
end
run_periodic() click to toggle source
# File lib/fluent/plugin/in_cloudtrail.rb, line 106
def run_periodic
  until @finished
    begin
      sleep @receive_interval
      sqs_resp = @sqs_client.receive_message(
        queue_url: @sqs_url,
        max_number_of_messages: @max_number_of_messages,
        wait_time_seconds: @wait_time_seconds
      )
      for message in sqs_resp.messages
        body_obj = JSON.parse(message.body)
        message_obj = JSON.parse(body_obj['Message'])
        s3_bucket = message_obj['s3Bucket']
        for s3_object_key in message_obj['s3ObjectKey']
          s3_resp = @s3_client.get_object(
            :bucket => s3_bucket,
            :key => s3_object_key
          )
          io = StringIO.new
          io.write s3_resp.body.read
          io.rewind
          gz = Zlib::GzipReader.new(io)
          cloudtrail_data = gz.read
          gz.close
          cloudtrail_records = JSON.parse(cloudtrail_data)['Records']
          for record in cloudtrail_records
            router.emit(@tag, Time.now.to_i, record)
          end
        end

        @sqs_client.delete_message(
          queue_url: @sqs_url,
          receipt_handle: message.receipt_handle
        )
      end
    rescue
      log.error "failed to emit", :error => $!.to_s, :error_class => $!.class.to_s
      log.warn_backtrace $!.backtrace
    end
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudtrail.rb, line 53
def shutdown
  super
  @finished = true
  @thread.join
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudtrail.rb, line 46
def start
  super
  load_clients
  @finished = false
  @thread = Thread.new(&method(:run_periodic))
end