class Fluent::Plugin::LambdaOutput

Constants

DEFAULT_BUFFER_TYPE

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_lambda.rb, line 29
# Creates the plugin instance. No plugin-specific state is set up here;
# construction is delegated entirely to the superclass.
def initialize
  super
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_lambda.rb, line 33
# Applies the plugin configuration and derives the AWS SDK options from it.
#
# Steps, in order:
# 1. Converts legacy parameters for the :buffer and :inject sections, then
#    lets the superclass process +conf+.
# 2. When @profile is set, builds shared credentials for that profile
#    (optionally from @credentials_path).
# 3. Rejects grouped delivery when no fixed @function_name is configured.
# 4. Copies every explicitly configured key/region/endpoint setting and
#    hands the resulting options to #configure_aws.
#
# @param conf the configuration object passed in by the caller
# @raise [Fluent::ConfigError] if @group_events is set without @function_name
# @return whatever #configure_aws returns
def configure(conf)
  compat_parameters_convert(conf, :buffer, :inject)
  super

  aws_opts = {}

  if @profile
    shared_opts = {:profile_name => @profile}
    shared_opts[:path] = @credentials_path if @credentials_path
    aws_opts[:credentials] = Aws::SharedCredentials.new(shared_opts)
  end

  if @group_events && @function_name.nil?
    raise Fluent::ConfigError, "could not group events without 'function_name'"
  end

  # Only settings that were actually configured are passed on.
  {
    :access_key_id => @aws_key_id,
    :secret_access_key => @aws_sec_key,
    :region => @region,
    :endpoint => @endpoint,
  }.each do |option, value|
    aws_opts[option] = value if value
  end

  configure_aws(aws_opts)
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_lambda.rb, line 68
# Serializes one event for buffering.
#
# The event is packed as the three-element array [tag, time, record] and
# converted with Array#to_msgpack.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the result of calling to_msgpack on [tag, time, record]
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
formatted_to_msgpack_binary() click to toggle source
# File lib/fluent/plugin/out_lambda.rb, line 64
# Always returns true.
#
# NOTE(review): presumably this tells Fluentd that #format emits msgpack, so
# buffered chunks can be read back with msgpack_each (as #write does) --
# confirm against the Fluentd output plugin API.
def formatted_to_msgpack_binary
  true
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_lambda.rb, line 58
# Starts the plugin: runs the superclass start logic first, then builds the
# Lambda client via #create_client and keeps it in @client for the write
# methods to use.
def start
  super

  @client = create_client
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_lambda.rb, line 72
# Delivers a buffered chunk to Lambda.
#
# The chunk is exposed as an enumerator over its msgpack_each iteration and
# then dispatched: grouped delivery (#write_batch) when @group_events is
# set, otherwise one invocation per event (#write_by_one).
#
# @param chunk an object responding to msgpack_each
# @return the return value of the delivery method that was chosen
def write(chunk)
  events = chunk.to_enum(:msgpack_each)
  return write_batch(events) if @group_events

  write_by_one(events)
end

Private Instance Methods

configure_aws(options) click to toggle source
# File lib/fluent/plugin/out_lambda.rb, line 83
# Merges +options+ into Aws.config via its update method.
#
# NOTE(review): Aws.config is a module-level setting, so this presumably
# affects every AWS client created afterwards in this process, not only the
# one built by this plugin -- confirm that is intended.
#
# @param options [Hash] AWS SDK options assembled by #configure
def configure_aws(options)
  Aws.config.update(options)
end
create_client() click to toggle source
# File lib/fluent/plugin/out_lambda.rb, line 87
# Builds a new Aws::Lambda::Client. No arguments are passed here, so the
# client is constructed without any per-instance options from this method.
#
# @return [Aws::Lambda::Client]
def create_client
  Aws::Lambda::Client.new
end
write_batch(chunk) click to toggle source
# File lib/fluent/plugin/out_lambda.rb, line 91
# Delivers events grouped by tag: for every distinct tag in +chunk+ a single
# asynchronous ('Event') Lambda invocation is made whose payload is the JSON
# array of that tag's records, in chunk order.
#
# The target is always the configured @function_name. When @qualifier is
# configured it is forwarded with every invocation, matching the behaviour
# of #write_by_one; when it is nil no :qualifier key is sent.
#
# NOTE(review): records are sent as-is here; #write_by_one additionally
# passes each record through inject_values_to_record. Confirm whether
# grouped delivery should do the same.
#
# @param chunk an enumerable yielding [tag, time, record] triples
# @return [Hash] the tag => events grouping that was iterated
def write_batch(chunk)
  chunk.group_by { |tag, _time, _record| tag }.each do |_group_tag, events|
    # Each event is [tag, time, record]; only the record goes in the payload.
    records = events.map { |_event_tag, _time, record| record }

    request = {
      :function_name => @function_name,
      :payload => JSON.dump(records),
      :invocation_type => 'Event',
    }
    request[:qualifier] = @qualifier unless @qualifier.nil?

    @client.invoke(request)
  end
end
write_by_one(chunk) click to toggle source
# File lib/fluent/plugin/out_lambda.rb, line 108
# Delivers events one at a time, each as its own asynchronous ('Event')
# Lambda invocation.
#
# First pass: events that have no target function (neither a configured
# @function_name nor a 'function_name' key in the record) are dropped with
# a warning. Second pass: each remaining record is passed through
# inject_values_to_record, serialized as JSON and sent. The configured
# @function_name takes precedence over the record's own 'function_name',
# and @qualifier is included only when it is not nil.
#
# @param chunk an enumerable yielding [tag, time, record] triples
# @return [Array] the events that had a target function
def write_by_one(chunk)
  deliverable = chunk.select do |tag, time, record|
    next true if @function_name || record['function_name']

    log.warn("`function_name` key does not exist: #{[tag, time, record].inspect}")
    false
  end

  deliverable.each do |tag, time, record|
    enriched = inject_values_to_record(tag, time, record)

    request = {
      :function_name => @function_name || enriched['function_name'],
      :payload => JSON.dump(enriched),
      :invocation_type => 'Event',
    }
    request[:qualifier] = @qualifier unless @qualifier.nil?

    @client.invoke(request)
  end
end