class Fluent::DynamodbAltOutput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 28 def initialize super require 'aws-sdk-core' require 'parallel' require 'set' require 'stringio' end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 36 def configure(conf) super aws_opts = {} if @profile credentials_opts = {:profile_name => @profile} credentials_opts[:path] = @credentials_path if @credentials_path credentials = Aws::SharedCredentials.new(credentials_opts) aws_opts[:credentials] = credentials end aws_opts[:access_key_id] = @aws_key_id if @aws_key_id aws_opts[:secret_access_key] = @aws_sec_key if @aws_sec_key aws_opts[:region] = @region if @region aws_opts[:endpoint] = @endpoint if @endpoint configure_aws(aws_opts) client = create_client table = client.describe_table(:table_name => @table_name) table.table.key_schema.each do |attribute| case attribute.key_type when 'HASH' @hash_key = attribute.attribute_name when 'RANGE' @range_key = attribute.attribute_name else raise 'must not happen' end end if @expected @expected = parse_expected(@expected) log.info("dynamodb_alt expected: #{@expected.inspect}") end if @binary_keys @binary_keys = @binary_keys.strip.split(/\s*,\s*/) else @binary_keys = [] end end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 87 def format(tag, time, record) [tag, time, record].to_msgpack end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 81 def start super @client = create_client end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 91 def write(chunk) chunk = aggregate_records(chunk) block = proc do |tag, time, record| if @delete_key and record[@delete_key] delete_record(record) else put_record(record) end end if @concurrency > 1 Parallel.each(chunk, :in_threads => @concurrency, &block) else chunk.each(&block) end end
Private Instance Methods
aggregate_records(chunk)
click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 230 def aggregate_records(chunk) chunk.enum_for(:msgpack_each).select {|tag, time, record| validate_record(record) }.chunk {|tag, time, record| if @range_key record.values_at(@hash_key, @range_key) else record[@hash_key] end }.map {|primary_key, records| records.sort_by {|tag, time, record| record[@timestamp_key] }.last } end
configure_aws(options)
click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 111 def configure_aws(options) Aws.config.update(options) end
convert_binary!(record)
click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 246 def convert_binary!(record) @binary_keys.each do |key| val = record[key] record[key] = StringIO.new(val) if val end return record end
convert_set!(record)
click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 255 def convert_set!(record) record.each do |key, val| if val.kind_of?(Array) record[key] = Set.new(val) end end return record end
create_client()
click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 115 def create_client Aws::DynamoDB::Client.new end
create_expected(record)
click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 205 def create_expected(record) attrs = {} @expected.map do |key, op, val| attrs[key] = {:comparison_operator => op} if val if val.kind_of?(Proc) record_val = val.call(record) unless record_val log.warn("Expected value does not exist in the record: #{record.inspect}") return nil end attrs[key][:attribute_value_list] = [record_val] else attrs[key][:attribute_value_list] = [val] end end end return attrs end
delete_record(record)
click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 142 def delete_record(record) key = {@hash_key => record[@hash_key]} key[@range_key] = record[@range_key] if @range_key item = { :table_name => @table_name, :key => key } begin if @expected expected = create_expected(record) return unless expected item[:expected] = expected item[:conditional_operator] = @conditional_operator if expected.length > 1 end @client.delete_item(item) rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException, Aws::DynamoDB::Errors::ValidationException => e log.warn("#{e.message}: #{item.inspect}") end end
parse_expected(expected)
click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 184 def parse_expected(expected) expected.split(',').map do |expr| key, op, val = expr.strip.split(/\s+/) if val if val =~ /\A\$\{(.+)\}\z/ record_key = $1.inspect val = eval("proc {|record| record[#{record_key}] }") else begin val = JSON.parse("[#{val}]").first if val rescue JSON::ParserError => e raise "Cannot parse the expected expression (#{expr}): #{e.message}" end end end [key, op, val] end end
put_record(record)
click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 119 def put_record(record) convert_binary!(record) convert_set!(record) item = { :table_name => @table_name, :item => record } begin if @expected expected = create_expected(record) return unless expected item[:expected] = expected item[:conditional_operator] = @conditional_operator if expected.length > 1 end @client.put_item(item) rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException, Aws::DynamoDB::Errors::ValidationException => e log.warn("#{e.message}: #{item.inspect}") end end
validate_record(record)
click to toggle source
# File lib/fluent/plugin/out_dynamodb_alt.rb, line 165 def validate_record(record) if not record[@hash_key] log.warn("Hash Key '#{@hash_key}' does not exist in the record: #{record.inspect}") return false end if @range_key and not record[@range_key] log.warn("Range Key '#{@range_key}' does not exist in the record: #{record.inspect}") return false end if not record[@timestamp_key] log.warn("Timestamp Key '#{@timestamp_key}' does not exist in the record: #{record.inspect}") return false end return true end