class Fluent::Plugin::DynamoOutput
Constants
- BATCHWRITE_CONTENT_SIZE_LIMIT
- BATCHWRITE_ITEM_LIMIT
- DEFAULT_BUFFER_TYPE
Public Instance Methods
batch_put_records(records)
click to toggle source
# File lib/fluent/plugin/out_dynamo.rb, line 135 def batch_put_records(records) @dynamo_db.batch_write_item(request_items: { @dynamo_db_table => records }) end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dynamo.rb, line 34 def configure(conf) compat_parameters_convert(conf, :buffer) super @timef = Fluent::TimeFormatter.new(@time_format, @localtime) end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_dynamo.rb, line 88 def format(tag, time, record) if !record.key?(@hash_key.attribute_name) record[@hash_key.attribute_name] = UUIDTools::UUID.timestamp_create.to_s end match_type!(@hash_key, record) formatted_time = @timef.format(time) if @range_key if !record.key?(@range_key.attribute_name) record[@range_key.attribute_name] = formatted_time end match_type!(@range_key, record) end record['time'] = formatted_time if @add_time_attribute record.to_msgpack end
formatted_to_msgpack_binary?()
click to toggle source
# File lib/fluent/plugin/out_dynamo.rb, line 106 def formatted_to_msgpack_binary? true end
match_type!(key, record)
click to toggle source
# File lib/fluent/plugin/out_dynamo.rb, line 78 def match_type!(key, record) if key.key_type == "NUMBER" potential_value = record[key.attribute_name].to_i if potential_value == 0 log.fatal "Failed attempt to cast hash_key to Integer." end record[key.attribute_name] = potential_value end end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_dynamo.rb, line 110 def multi_workers_ready? true end
restart_session(options)
click to toggle source
# File lib/fluent/plugin/out_dynamo.rb, line 65 def restart_session(options) @dynamo_db = Aws::DynamoDB::Client.new(options) @resource = Aws::DynamoDB::Resource.new(client: @dynamo_db) end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dynamo.rb, line 41 def start options = {} if @aws_key_id && @aws_sec_key options[:access_key_id] = @aws_key_id options[:secret_access_key] = @aws_sec_key end options[:region] = @dynamo_db_region if @dynamo_db_region options[:endpoint] = @dynamo_db_endpoint options[:proxy_uri] = @proxy_uri if @proxy_uri super begin restart_session(options) valid_table(@dynamo_db_table) rescue Fluent::ConfigError => e log.fatal "ConfigError: Please check your configuration, then restart fluentd. '#{e}'" exit! rescue Exception => e log.fatal "UnknownError: '#{e}'" exit! end end
valid_table(table_name)
click to toggle source
# File lib/fluent/plugin/out_dynamo.rb, line 71 def valid_table(table_name) table = @resource.table(table_name) @hash_key = table.key_schema.select{|e| e.key_type == "HASH" }.first range_key_candidate = table.key_schema.select{|e| e.key_type == "RANGE" } @range_key = range_key_candidate.first if range_key_candidate end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_dynamo.rb, line 114 def write(chunk) batch_size = 0 batch_records = [] chunk.msgpack_each {|record| batch_records << { put_request: { item: record } } batch_size += record.to_json.length # FIXME: heuristic if batch_records.size >= BATCHWRITE_ITEM_LIMIT || batch_size >= BATCHWRITE_CONTENT_SIZE_LIMIT batch_put_records(batch_records) batch_records.clear batch_size = 0 end } unless batch_records.empty? batch_put_records(batch_records) end end