class Fluent::KinesisAltOutput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kinesis_alt.rb, line 5
def initialize
  super
  require 'aws-sdk'
  require 'base64'
  require 'json'
  require 'logger'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kinesis_alt.rb, line 29
def configure(conf)
  super

  [:aws_key_id, :aws_sec_key, :region, :stream_name].each do |name|
    unless self.instance_variable_get("@#{name}")
      raise ConfigError, "'#{name}' is required"
    end
  end

  unless @partition_key or @partition_key_proc
    raise ConfigError, "'partition_key' or 'partition_key_proc' is required"
  end

  if @partition_key_proc
    @partition_key_proc = eval(@partition_key_proc)
  end

  if @explicit_hash_key_proc
    @explicit_hash_key_proc = eval(@explicit_hash_key_proc)
  end
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_kinesis_alt.rb, line 62
def format(tag, time, record)
  record['__tag']  = tag  if @include_tag
  record['__time'] = time if @include_time

  # XXX: The maximum size of the data blob is 50 kilobytes
  # http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
  data = {
    :stream_name   => @stream_name,
    :data          => encode64(record.to_json),
    :partition_key => get_key(:partition_key, record)
  }

  if @explicit_hash_key or @explicit_hash_key_proc
    data[:explicit_hash_key] = get_key(:explicit_hash_key, record)
  end

  if @sequence_number_for_ordering
    data[:sequence_number_for_ordering] = @sequence_number_for_ordering
  end

  pack_data(data)
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kinesis_alt.rb, line 58
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kinesis_alt.rb, line 51
def start
  super
  configure_aws
  @client = AWS.kinesis.client
  @client.describe_stream(:stream_name => @stream_name)
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_kinesis_alt.rb, line 85
def write(chunk)
  buf = chunk.read

  while (data = unpack_data(buf))
    AWS.kinesis.client.put_record(data)
  end
end

Private Instance Methods

configure_aws() click to toggle source
# File lib/fluent/plugin/out_kinesis_alt.rb, line 95
def configure_aws
  options = {
    :access_key_id     => @aws_key_id,
    :secret_access_key => @aws_sec_key,
    :region            => @region
  }

  if @debug
    options.update(
      :logger          => Logger.new($log.out),
      :log_level       => :debug,
      #:http_wire_trace => true
    )
  end

  AWS.config(options)
end
encode64(str) click to toggle source
# File lib/fluent/plugin/out_kinesis_alt.rb, line 141
def encode64(str)
  Base64.encode64(str).delete("\n")
end
get_key(name, record) click to toggle source
# File lib/fluent/plugin/out_kinesis_alt.rb, line 113
def get_key(name, record)
  key = self.instance_variable_get("@#{name}")
  key_proc = self.instance_variable_get("@#{name}_proc")

  value = key ? record[key] : record

  if key_proc
    value = key_proc.arity.zero? ? key_proc.call : key_proc.call(value)
  end

  value.to_s
end
pack_data(data) click to toggle source
# File lib/fluent/plugin/out_kinesis_alt.rb, line 126
def pack_data(data)
  data = data.to_msgpack(data)
  data.force_encoding('ascii-8bit')
  [data.length].pack('L') + data
end
unpack_data(buf) click to toggle source
# File lib/fluent/plugin/out_kinesis_alt.rb, line 132
def unpack_data(buf)
  return nil if buf.empty?

  buf.force_encoding('ascii-8bit')
  length = buf.slice!(0, 4).unpack('L').first
  data = buf.slice!(0, length)
  MessagePack.unpack(data)
end