class Kcl::Proxies::KinesisProxy

Attributes

client[R]

Public Class Methods

new(config) click to toggle source
# File lib/kcl/proxies/kinesis_proxy.rb, line 7
def initialize(config)
  @client = Aws::Kinesis::Client.new(
    {
      access_key_id: config.aws_access_key_id,
      secret_access_key: config.aws_secret_access_key,
      region: config.aws_region,
      endpoint: config.kinesis_endpoint,
      ssl_verify_peer: config.use_ssl
    }
  )
  @stream_name = config.kinesis_stream_name
end

Public Instance Methods

get_records(shard_iterator) click to toggle source

@param [String] shard_iterator @return [Hash]

# File lib/kcl/proxies/kinesis_proxy.rb, line 44
def get_records(shard_iterator)
  res = @client.get_records({ shard_iterator: shard_iterator })
  { records: res.records, next_shard_iterator: res.next_shard_iterator }
end
get_shard_iterator(shard_id, shard_iterator_type = nil, sequence_number = nil) click to toggle source

@param [String] shard_id @param [String] shard_iterator_type @return [String]

# File lib/kcl/proxies/kinesis_proxy.rb, line 29
def get_shard_iterator(shard_id, shard_iterator_type = nil, sequence_number = nil)
  params = {
    stream_name: @stream_name,
    shard_id: shard_id,
    shard_iterator_type: shard_iterator_type || Kcl::Checkpoints::Sentinel::LATEST
  }
  if shard_iterator_type == Kcl::Checkpoints::Sentinel::AFTER_SEQUENCE_NUMBER
    params[:starting_sequence_number] = sequence_number
  end
  res = @client.get_shard_iterator(params)
  res.shard_iterator
end
put_record(data) click to toggle source

@param [Hash] data @return [Hash]

# File lib/kcl/proxies/kinesis_proxy.rb, line 51
def put_record(data)
  res = @client.put_record(data)
  { shard_id: res.shard_id, sequence_number: res.sequence_number }
end
shards() click to toggle source

@return [Array]

# File lib/kcl/proxies/kinesis_proxy.rb, line 21
def shards
  res = @client.describe_stream({ stream_name: @stream_name })
  res.stream_description.shards
end