class Fluent::DynamodbAltOutput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 28
# Builds the plugin instance via the superclass constructor, then loads the
# libraries this plugin relies on. The requires run at construction time
# rather than at file-load time.
def initialize
  super
  %w[aws-sdk-core parallel set stringio].each { |library| require library }
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 36
# Applies the plugin configuration.
#
# After the superclass has processed +conf+, this method:
# 1. assembles the global AWS options (shared-credentials profile, static
#    keys, region, endpoint) and hands them to +configure_aws+;
# 2. calls +describe_table+ for +@table_name+ and records the hash key
#    (and the range key, when one is reported) attribute names;
# 3. parses the +@expected+ expression string, when given, into its
#    structured form and logs it;
# 4. splits the comma-separated +@binary_keys+ string into an array
#    (an empty array when unset).
#
# Raises a RuntimeError if the key schema reports a key type other than
# 'HASH' or 'RANGE'.
def configure(conf)
  super

  options = {}

  if @profile
    shared = {:profile_name => @profile}
    shared[:path] = @credentials_path if @credentials_path
    options[:credentials] = Aws::SharedCredentials.new(shared)
  end

  # Only settings that were actually provided are forwarded.
  {
    :access_key_id => @aws_key_id,
    :secret_access_key => @aws_sec_key,
    :region => @region,
    :endpoint => @endpoint
  }.each do |name, value|
    options[name] = value if value
  end

  configure_aws(options)

  description = create_client.describe_table(:table_name => @table_name)

  description.table.key_schema.each do |element|
    case element.key_type
    when 'HASH' then @hash_key = element.attribute_name
    when 'RANGE' then @range_key = element.attribute_name
    else raise 'must not happen'
    end
  end

  if @expected
    @expected = parse_expected(@expected)
    log.info("dynamodb_alt expected: #{@expected.inspect}")
  end

  @binary_keys = @binary_keys ? @binary_keys.strip.split(/\s*,\s*/) : []
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 87
# Serializes one event for buffering: the tag, time and record are packed
# together as a single three-element array using +to_msgpack+.
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 81
# Runs the superclass start-up logic, then builds a client with
# +create_client+ and stores it in +@client+ for the record writers.
def start
  super

  @client = create_client
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 91
# Writes a buffered chunk to the table.
#
# The chunk is first reduced by +aggregate_records+. Each remaining entry is
# then deleted (when +@delete_key+ is configured and the record carries a
# truthy value under it) or put. Entries are processed on +@concurrency+
# threads through Parallel when that setting exceeds 1, otherwise
# sequentially.
def write(chunk)
  entries = aggregate_records(chunk)

  # Must stay a proc (not a lambda) so that each [tag, time, record] entry
  # is destructured into the three block parameters.
  handler = proc do |_tag, _time, record|
    deleting = @delete_key && record[@delete_key]
    deleting ? delete_record(record) : put_record(record)
  end

  return entries.each(&handler) unless @concurrency > 1

  Parallel.each(entries, :in_threads => @concurrency, &handler)
end

Private Instance Methods

aggregate_records(chunk) click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 230
# Reduces a chunk to at most one entry per primary key.
#
# Entries whose record fails +validate_record+ are dropped. The remaining
# entries are grouped by primary key (hash key, plus range key when the
# table has one) across the WHOLE chunk, and for each key only the entry
# with the greatest +@timestamp_key+ value is kept.
#
# Grouping is global rather than limited to adjacent entries: otherwise two
# versions of the same item separated by an unrelated record would both be
# written, and an older version could land after (or race with) the newer
# one.
#
# chunk - object responding to +msgpack_each+, yielding [tag, time, record].
#
# Returns an Array of [tag, time, record] entries, ordered by the first
# appearance of each primary key in the chunk. Which entry wins when
# timestamps tie is unspecified.
def aggregate_records(chunk)
  valid_entries = chunk.enum_for(:msgpack_each).select do |_tag, _time, record|
    validate_record(record)
  end

  by_primary_key = valid_entries.group_by do |_tag, _time, record|
    if @range_key
      record.values_at(@hash_key, @range_key)
    else
      record[@hash_key]
    end
  end

  by_primary_key.values.map do |entries|
    entries.max_by { |_tag, _time, record| record[@timestamp_key] }
  end
end
configure_aws(options) click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 111
# Merges +options+ into the process-wide +Aws.config+ hash via +update+.
def configure_aws(options)
  Aws.config.update(options)
end
convert_binary!(record) click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 246
# Wraps, in place, the value of every key listed in +@binary_keys+ in a
# StringIO. Keys whose value is nil or false (including keys the record
# lacks) are left untouched.
#
# Returns the same (mutated) record.
def convert_binary!(record)
  @binary_keys.each do |name|
    raw = record[name]
    next unless raw

    record[name] = StringIO.new(raw)
  end

  record
end
convert_set!(record) click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 255
# Replaces, in place, every top-level Array value of the record with a Set
# built from that array. Other values are left as they are.
#
# Returns the same (mutated) record.
def convert_set!(record)
  record.each_pair do |name, value|
    next unless value.is_a?(Array)

    record[name] = Set.new(value)
  end

  record
end
create_client() click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 115
# Returns a new Aws::DynamoDB::Client constructed with no explicit
# arguments.
def create_client
  Aws::DynamoDB::Client.new
end
create_expected(record) click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 205
# Builds the +:expected+ condition map for a single record from the parsed
# +@expected+ list of [key, operator, value] triples.
#
# For each triple the value is handled as follows:
# * nil   - the condition carries only the comparison operator;
# * Proc  - called with the record to obtain the operand; if it yields nil
#           the record is considered unusable, a warning is logged and the
#           whole method returns nil;
# * other - used as the operand as-is.
#
# Only nil counts as "no value": a literal +false+ operand (or a +false+
# value taken from the record) is a legitimate operand and is passed
# through instead of being silently dropped.
#
# Returns a Hash of attribute name => condition, or nil when a
# record-derived operand is missing.
def create_expected(record)
  attrs = {}

  @expected.each do |key, op, val|
    condition = {:comparison_operator => op}
    attrs[key] = condition

    next if val.nil?

    if val.kind_of?(Proc)
      val = val.call(record)

      if val.nil?
        log.warn("Expected value does not exist in the record: #{record.inspect}")
        return nil
      end
    end

    condition[:attribute_value_list] = [val]
  end

  attrs
end
delete_record(record) click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 142
# Deletes the item identified by the record's primary key (hash key, plus
# range key when the table has one).
#
# When +@expected+ is configured the request carries the conditions built
# by +create_expected+; if that returns nil the delete is skipped. The
# conditional operator is only sent when there is more than one condition.
# Conditional-check and validation failures reported by DynamoDB are logged
# as warnings rather than raised.
def delete_record(record)
  key_names = [@hash_key]
  key_names << @range_key if @range_key
  primary_key = key_names.each_with_object({}) do |name, acc|
    acc[name] = record[name]
  end

  request = {:table_name => @table_name, :key => primary_key}

  begin
    if @expected
      conditions = create_expected(record)
      return unless conditions

      request[:expected] = conditions
      request[:conditional_operator] = @conditional_operator if conditions.size > 1
    end

    @client.delete_item(request)
  rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException, Aws::DynamoDB::Errors::ValidationException => error
    log.warn("#{error.message}: #{request.inspect}")
  end
end
parse_expected(expected) click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 184
# Parses the +expected+ configuration string into a list of
# [key, operator, value] triples.
#
# The string is a comma-separated list of expressions, each made of
# whitespace-separated tokens: an attribute name, a comparison operator and
# an optional value. The value is interpreted as:
# * <tt>${name}</tt> - a Proc that, given a record, returns record[name];
# * anything else    - a JSON literal (number, quoted string, true, ...).
# A missing value is kept as nil.
#
# The record accessor is a plain closure over the captured key name; no
# code is evaluated from the configuration text.
#
# Raises RuntimeError when a literal value is not valid JSON.
def parse_expected(expected)
  expected.split(',').map do |expr|
    key, op, val = expr.strip.split(/\s+/)

    if val
      placeholder = /\A\$\{(.+)\}\z/.match(val)

      if placeholder
        record_key = placeholder[1]
        val = proc {|record| record[record_key] }
      else
        begin
          # Wrapping in brackets lets bare scalars be parsed as JSON.
          val = JSON.parse("[#{val}]").first
        rescue JSON::ParserError => e
          raise "Cannot parse the expected expression (#{expr}): #{e.message}"
        end
      end
    end

    [key, op, val]
  end
end
put_record(record) click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 119
# Writes the record as an item, after converting its binary-key values and
# array values in place (+convert_binary!+, +convert_set!+).
#
# When +@expected+ is configured the request carries the conditions built
# by +create_expected+; if that returns nil the put is skipped. The
# conditional operator is only sent when there is more than one condition.
# Conditional-check and validation failures reported by DynamoDB are logged
# as warnings rather than raised.
def put_record(record)
  convert_binary!(record)
  convert_set!(record)

  request = {:table_name => @table_name, :item => record}

  begin
    if @expected
      conditions = create_expected(record)
      return unless conditions

      request[:expected] = conditions
      request[:conditional_operator] = @conditional_operator if conditions.size > 1
    end

    @client.put_item(request)
  rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException, Aws::DynamoDB::Errors::ValidationException => error
    log.warn("#{error.message}: #{request.inspect}")
  end
end
validate_record(record) click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 165
# Checks that a record carries every attribute needed to write it: the hash
# key, the range key (only when the table has one) and the timestamp key.
# The first missing (nil or false) attribute is reported with a warning.
#
# Returns true when the record is usable, false otherwise.
def validate_record(record)
  unless record[@hash_key]
    log.warn("Hash Key '#{@hash_key}' does not exist in the record: #{record.inspect}")
    return false
  end

  if @range_key && !record[@range_key]
    log.warn("Range Key '#{@range_key}' does not exist in the record: #{record.inspect}")
    return false
  end

  unless record[@timestamp_key]
    log.warn("Timestamp Key '#{@timestamp_key}' does not exist in the record: #{record.inspect}")
    return false
  end

  true
end