class Fluent::KinesisAltOutput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kinesis_alt.rb, line 5 def initialize super require 'aws-sdk' require 'base64' require 'json' require 'logger' end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kinesis_alt.rb, line 29 def configure(conf) super [:aws_key_id, :aws_sec_key, :region, :stream_name].each do |name| unless self.instance_variable_get("@#{name}") raise ConfigError, "'#{name}' is required" end end unless @partition_key or @partition_key_proc raise ConfigError, "'partition_key' or 'partition_key_proc' is required" end if @partition_key_proc @partition_key_proc = eval(@partition_key_proc) end if @explicit_hash_key_proc @explicit_hash_key_proc = eval(@explicit_hash_key_proc) end end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_kinesis_alt.rb, line 62 def format(tag, time, record) record['__tag'] = tag if @include_tag record['__time'] = time if @include_time # XXX: The maximum size of the data blob is 50 kilobytes # http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html data = { :stream_name => @stream_name, :data => encode64(record.to_json), :partition_key => get_key(:partition_key, record) } if @explicit_hash_key or @explicit_hash_key_proc data[:explicit_hash_key] = get_key(:explicit_hash_key, record) end if @sequence_number_for_ordering data[:sequence_number_for_ordering] = @sequence_number_for_ordering end pack_data(data) end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kinesis_alt.rb, line 58 def shutdown super end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kinesis_alt.rb, line 51 def start super configure_aws @client = AWS.kinesis.client @client.describe_stream(:stream_name => @stream_name) end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_kinesis_alt.rb, line 85 def write(chunk) buf = chunk.read while (data = unpack_data(buf)) AWS.kinesis.client.put_record(data) end end
Private Instance Methods
configure_aws()
click to toggle source
# File lib/fluent/plugin/out_kinesis_alt.rb, line 95 def configure_aws options = { :access_key_id => @aws_key_id, :secret_access_key => @aws_sec_key, :region => @region } if @debug options.update( :logger => Logger.new($log.out), :log_level => :debug, #:http_wire_trace => true ) end AWS.config(options) end
encode64(str)
click to toggle source
# File lib/fluent/plugin/out_kinesis_alt.rb, line 141 def encode64(str) Base64.encode64(str).delete("\n") end
get_key(name, record)
click to toggle source
# File lib/fluent/plugin/out_kinesis_alt.rb, line 113 def get_key(name, record) key = self.instance_variable_get("@#{name}") key_proc = self.instance_variable_get("@#{name}_proc") value = key ? record[key] : record if key_proc value = key_proc.arity.zero? ? key_proc.call : key_proc.call(value) end value.to_s end
pack_data(data)
click to toggle source
# File lib/fluent/plugin/out_kinesis_alt.rb, line 126 def pack_data(data) data = data.to_msgpack(data) data.force_encoding('ascii-8bit') [data.length].pack('L') + data end
unpack_data(buf)
click to toggle source
# File lib/fluent/plugin/out_kinesis_alt.rb, line 132 def unpack_data(buf) return nil if buf.empty? buf.force_encoding('ascii-8bit') length = buf.slice!(0, 4).unpack('L').first data = buf.slice!(0, length) MessagePack.unpack(data) end