module Cumulus::Kinesis

Public Class Methods

describe_stream(stream_name) click to toggle source

Public - Load the entire stream description with all shards

Returns an Aws::Kinesis::Types::StreamDescription with all shards loaded

# File lib/kinesis/Kinesis.rb, line 30
# Public - Load the description of a stream, requesting further pages until
# no more shards are reported.
#
# stream_name - The name of the stream to describe
#
# Returns the stream_description of the first response, with the shards of
# every following page appended to its shards list
def describe_stream(stream_name)
  first_request = { stream_name: stream_name }
  description = @@client.describe_stream(first_request).stream_description

  while description.has_more_shards
    # Resume the listing right after the last shard collected so far
    follow_up_request = {
      stream_name: stream_name,
      exclusive_start_shard_id: description.shards.last.shard_id
    }
    next_page = @@client.describe_stream(follow_up_request).stream_description

    description.shards.concat(next_page.shards)
    # Carry the flag over so the loop stops once a page reports no more shards
    description.has_more_shards = next_page.has_more_shards
  end

  description
end
named_streams() click to toggle source

Public - Returns a Hash of stream name to Aws::Kinesis::Types::StreamDescription with all shards loaded

# File lib/kinesis/Kinesis.rb, line 13
# Public - Map every stream name to the result of describe_stream for it.
# The Hash is built on first use and cached in @named_streams afterwards.
#
# Returns a Hash of stream name to stream description
def named_streams
  @named_streams ||= stream_names.each_with_object({}) do |name, by_name|
    by_name[name] = describe_stream(name)
  end
end
stream_names() click to toggle source

Public - Returns an array of all the stream names

# File lib/kinesis/Kinesis.rb, line 18
# Public - The stream names produced by init_stream_names. The result is
# cached in @stream_names after the first call.
#
# Returns whatever init_stream_names returned (an Array of names)
def stream_names
  @stream_names = init_stream_names unless @stream_names
  @stream_names
end
stream_tags() click to toggle source

Public - Returns a Hash of stream name to tags

# File lib/kinesis/Kinesis.rb, line 23
# Public - Map every stream name to the result of init_tags for it.
# The Hash is built on first use and cached in @stream_tags afterwards.
#
# Returns a Hash of stream name to tags
def stream_tags
  @stream_tags ||= stream_names.each_with_object({}) do |name, tags_by_stream|
    tags_by_stream[name] = init_tags(name)
  end
end

Private Class Methods

init_stream_names() click to toggle source

Internal - Load the list of stream names

Returns the stream names as an Array

# File lib/kinesis/Kinesis.rb, line 75
# Internal - Collect stream names by calling list_streams repeatedly until a
# response reports that there are no more streams.
#
# Returns the collected stream names as an Array
def init_stream_names
  names = []
  exhausted = false

  until exhausted
    # names.last is nil on the first pass; later passes continue after the
    # last name collected so far
    page = @@client.list_streams({ exclusive_start_stream_name: names.last })

    names.concat(page.stream_names)
    exhausted = !page.has_more_streams
  end

  names
end
init_tags(stream_name) click to toggle source

Internal - Load the tags for a stream

Returns a Hash containing the tags as key/value pairs

# File lib/kinesis/Kinesis.rb, line 53
# Internal - Collect the tags of a stream by calling list_tags_for_stream
# until a response reports that there are no more tags.
#
# stream_name - The name of the stream whose tags are loaded
#
# Returns a Hash of tag key to tag value
def init_tags(stream_name)
  page = @@client.list_tags_for_stream({ stream_name: stream_name })

  # Later pages are appended to the first page's tag list
  collected = page.tags

  while page.has_more_tags
    # Continue after the key of the last tag collected so far
    page = @@client.list_tags_for_stream({
      stream_name: stream_name,
      exclusive_start_tag_key: collected.last.key
    })

    collected.concat(page.tags)
  end

  collected.each_with_object({}) { |tag, by_key| by_key[tag.key] = tag.value }
end