class Fluent::Plugin::DynamoOutput

Constants

BATCHWRITE_CONTENT_SIZE_LIMIT
BATCHWRITE_ITEM_LIMIT
DEFAULT_BUFFER_TYPE

Public Instance Methods

batch_put_records(records) click to toggle source
# File lib/fluent/plugin/out_dynamo.rb, line 135
def batch_put_records(records)
  @dynamo_db.batch_write_item(request_items: { @dynamo_db_table => records })
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dynamo.rb, line 34
def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super

  @timef = Fluent::TimeFormatter.new(@time_format, @localtime)
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_dynamo.rb, line 88
def format(tag, time, record)
  if !record.key?(@hash_key.attribute_name)
    record[@hash_key.attribute_name] = UUIDTools::UUID.timestamp_create.to_s
  end
  match_type!(@hash_key, record)

  formatted_time = @timef.format(time)
  if @range_key
    if !record.key?(@range_key.attribute_name)
      record[@range_key.attribute_name] = formatted_time
    end
    match_type!(@range_key, record)
  end
  record['time'] = formatted_time if @add_time_attribute

  record.to_msgpack
end
formatted_to_msgpack_binary?() click to toggle source
# File lib/fluent/plugin/out_dynamo.rb, line 106
def formatted_to_msgpack_binary?
  true
end
match_type!(key, record) click to toggle source
# File lib/fluent/plugin/out_dynamo.rb, line 78
def match_type!(key, record)
  if key.key_type == "NUMBER"
    potential_value = record[key.attribute_name].to_i
    if potential_value == 0
      log.fatal "Failed attempt to cast hash_key to Integer."
    end
    record[key.attribute_name] = potential_value
  end
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_dynamo.rb, line 110
def multi_workers_ready?
  true
end
restart_session(options) click to toggle source
# File lib/fluent/plugin/out_dynamo.rb, line 65
def restart_session(options)
  @dynamo_db = Aws::DynamoDB::Client.new(options)
  @resource = Aws::DynamoDB::Resource.new(client: @dynamo_db)

end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dynamo.rb, line 41
def start
  options = {}
  if @aws_key_id && @aws_sec_key
    options[:access_key_id] = @aws_key_id
    options[:secret_access_key] = @aws_sec_key
  end
  options[:region] = @dynamo_db_region if @dynamo_db_region
  options[:endpoint] = @dynamo_db_endpoint
  options[:proxy_uri] = @proxy_uri if @proxy_uri

  super

  begin
    restart_session(options)
    valid_table(@dynamo_db_table)
  rescue Fluent::ConfigError => e
    log.fatal "ConfigError: Please check your configuration, then restart fluentd. '#{e}'"
    exit!
  rescue Exception => e
    log.fatal "UnknownError: '#{e}'"
    exit!
  end
end
valid_table(table_name) click to toggle source
# File lib/fluent/plugin/out_dynamo.rb, line 71
def valid_table(table_name)
  table = @resource.table(table_name)
  @hash_key = table.key_schema.select{|e| e.key_type == "HASH" }.first
  range_key_candidate = table.key_schema.select{|e| e.key_type == "RANGE" }
  @range_key = range_key_candidate.first if range_key_candidate
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_dynamo.rb, line 114
def write(chunk)
  batch_size = 0
  batch_records = []
  chunk.msgpack_each {|record|
    batch_records << {
      put_request: {
        item: record
      }
    }
    batch_size += record.to_json.length # FIXME: heuristic
    if batch_records.size >= BATCHWRITE_ITEM_LIMIT || batch_size >= BATCHWRITE_CONTENT_SIZE_LIMIT
      batch_put_records(batch_records)
      batch_records.clear
      batch_size = 0
    end
  }
  unless batch_records.empty?
    batch_put_records(batch_records)
  end
end