class Kafka::Protocol::OffsetFetchResponse

Attributes

topics[R]

Public Class Methods

decode(decoder) click to toggle source
# File lib/kafka/protocol/offset_fetch_response.rb, line 33
# Decodes an offset fetch response by reading fields from +decoder+.
#
# Read order: an array of topics, each consisting of a string (the topic
# name) followed by an array of partitions; every partition is an int32
# (the partition id), an int64 (offset), a string (metadata) and an int16
# (error code), in exactly that order.
#
# @param decoder [#array, #string, #int32, #int64, #int16] source of the
#   encoded fields; +array+ is expected to yield once per element and
#   return the collected block results
# @return [OffsetFetchResponse] instance whose +topics+ maps each topic name
#   to a Hash of partition id => PartitionOffsetInfo
def self.decode(decoder)
  topic_pairs = decoder.array do
    topic_name = decoder.string

    partition_pairs = decoder.array do
      partition_id = decoder.int32

      # The three reads below must stay in wire order.
      offset = decoder.int64
      metadata = decoder.string
      error_code = decoder.int16

      [
        partition_id,
        PartitionOffsetInfo.new(offset: offset, metadata: metadata, error_code: error_code),
      ]
    end

    [topic_name, Hash[partition_pairs]]
  end

  new(topics: Hash[topic_pairs])
end
new(topics:) click to toggle source
# File lib/kafka/protocol/offset_fetch_response.rb, line 18
# Builds a response around already-decoded offset data.
#
# @param topics [Object] stored as-is in @topics without copying or
#   validation; .decode passes a Hash of topic name => Hash of
#   partition id => PartitionOffsetInfo
def initialize(topics:)
  @topics = topics
end

Public Instance Methods

offset_for(topic, partition) click to toggle source
# File lib/kafka/protocol/offset_fetch_response.rb, line 22
# Returns the offset recorded for +partition+ of +topic+.
#
# The topic lookup uses +fetch+ with no default, so an unknown topic raises
# (KeyError when +topics+ is a Hash). An unknown partition within a known
# topic does not raise; it yields -1 instead.
#
# @param topic [Object] key into +topics+
# @param partition [Object] key into the per-topic partition map
# @return [Object] the stored offset, or -1 when the partition has no entry
# @raise whatever Protocol.handle_error raises for the entry's error code
#   (presumably a protocol error for non-zero codes -- confirm against
#   Protocol.handle_error)
def offset_for(topic, partition)
  partition_info = topics.fetch(topic).fetch(partition, nil)
  return -1 unless partition_info

  # Surface any broker-side error before handing back the offset.
  Protocol.handle_error(partition_info.error_code)
  partition_info.offset
end