class Awful::Streams

Public Instance Methods

dump(arn) click to toggle source
# File lib/awful/dynamodb_streams.rb, line 38
# Prints the description of a single stream as YAML.
#
# The description is fetched with +describe_stream+, converted to a hash,
# passed through +stringify_keys+ and written to stdout via +puts+.
#
# NOTE(review): +output+ and +stringify_keys+ are defined outside this
# method; +output+ is assumed to yield its receiver to the block -- confirm.
#
# @param arn [String] ARN of the stream to describe
# @return whatever +output+ returns
def dump(arn)
  description = streams.describe_stream(stream_arn: arn).stream_description
  description.output { |desc| puts YAML.dump(stringify_keys(desc.to_hash)) }
end
get_records(arn, shard_id) click to toggle source
# File lib/awful/dynamodb_streams.rb, line 52
# Prints a table of the records returned by one +get_records+ call on a shard.
#
# A shard iterator of type 'TRIM_HORIZON' is requested first and then used
# for a single +get_records+ request; only that one batch is shown. Each row
# holds the event id, event name, sequence number and size in bytes.
#
# NOTE(review): +output+ and +print_table+ are defined outside this method;
# +output+ is assumed to yield its receiver to the block -- confirm.
#
# @param arn [String] ARN of the stream
# @param shard_id [String] id of the shard to read from
# @return whatever +output+ returns
def get_records(arn, shard_id)
  iterator_response = streams.get_shard_iterator(
    stream_arn: arn,
    shard_id: shard_id,
    shard_iterator_type: 'TRIM_HORIZON'
  )
  batch = streams.get_records(shard_iterator: iterator_response.shard_iterator).records

  batch.output do |records|
    rows = records.map do |record|
      [
        record.event_id,
        record.event_name,
        record.dynamodb.sequence_number,
        record.dynamodb.size_bytes
      ]
    end
    print_table rows
  end
end
ls(table_name = nil, exclusive_start_stream_arn = nil) click to toggle source
# File lib/awful/dynamodb_streams.rb, line 17
# Lists streams, printing each page of results as it is fetched.
#
# With +options[:long]+ set, every page is printed as a table of
# [table_name, stream_arn] rows sorted within that page; otherwise only the
# stream ARNs are printed, one per line. Further pages are requested while
# the response carries a +last_evaluated_stream_arn+.
#
# @param table_name [String, nil] passed through to +list_streams+
# @param exclusive_start_stream_arn [String, nil] ARN to start listing after
# @return [Array] the stream entries from every page, in fetch order
def ls(table_name = nil, exclusive_start_stream_arn = nil)
  collected = nil
  start_arn = exclusive_start_stream_arn

  loop do
    page = streams.list_streams(
      table_name: table_name,
      exclusive_start_stream_arn: start_arn
    )
    batch = page.streams

    # Output happens per page, before the next page is requested.
    if options[:long]
      print_table batch.map { |stream| [stream.table_name, stream.stream_arn] }.sort
    else
      puts batch.map(&:stream_arn)
    end

    collected = collected ? collected + batch : batch

    # A nil marker means there is no more data to fetch.
    start_arn = page.last_evaluated_stream_arn
    break unless start_arn
  end

  collected
end
shards(arn) click to toggle source
# File lib/awful/dynamodb_streams.rb, line 45
# Prints the shard ids of a stream, one per line.
#
# The shard list is taken from the stream description returned by
# +describe_stream+.
#
# NOTE(review): +output+ is defined outside this method and is assumed to
# yield its receiver to the block -- confirm.
#
# @param arn [String] ARN of the stream whose shards are listed
# @return whatever +output+ returns
def shards(arn)
  description = streams.describe_stream(stream_arn: arn).stream_description
  description.shards.output { |shard_list| puts shard_list.map(&:shard_id) }
end
streams() click to toggle source
# File lib/awful/dynamodb_streams.rb, line 10
# Returns the DynamoDB Streams client, creating it on first use.
#
# The client is built with no arguments and cached in +@streams+, so later
# calls return the same object.
#
# @return [Aws::DynamoDBStreams::Client]
def streams
  return @streams if @streams

  @streams = Aws::DynamoDBStreams::Client.new
end