class Kafka::Broker

Public Class Methods

new(connection_builder:, host:, port:, node_id: nil, logger:) click to toggle source
# File lib/kafka/broker.rb, line 9
def initialize(connection_builder:, host:, port:, node_id: nil, logger:)
  @connection_builder = connection_builder
  @connection = nil
  @host = host
  @port = port
  @node_id = node_id
  @logger = TaggedLogger.new(logger)
end

Public Instance Methods

add_offsets_to_txn(**options) click to toggle source
# File lib/kafka/broker.rb, line 185
def add_offsets_to_txn(**options)
  request = Protocol::AddOffsetsToTxnRequest.new(**options)

  send_request(request)
end
add_partitions_to_txn(**options) click to toggle source
# File lib/kafka/broker.rb, line 173
def add_partitions_to_txn(**options)
  request = Protocol::AddPartitionsToTxnRequest.new(**options)

  send_request(request)
end
address_match?(host, port) click to toggle source
# File lib/kafka/broker.rb, line 18
def address_match?(host, port)
  host == @host && port == @port
end
alter_configs(**options) click to toggle source
# File lib/kafka/broker.rb, line 137
def alter_configs(**options)
  request = Protocol::AlterConfigsRequest.new(**options)

  send_request(request)
end
api_versions() click to toggle source
# File lib/kafka/broker.rb, line 155
def api_versions
  request = Protocol::ApiVersionsRequest.new

  send_request(request)
end
commit_offsets(**options) click to toggle source
# File lib/kafka/broker.rb, line 83
def commit_offsets(**options)
  request = Protocol::OffsetCommitRequest.new(**options)

  send_request(request)
end
connected?() click to toggle source

@return [Boolean]

# File lib/kafka/broker.rb, line 33
def connected?
  !@connection.nil?
end
create_partitions(**options) click to toggle source
# File lib/kafka/broker.rb, line 143
def create_partitions(**options)
  request = Protocol::CreatePartitionsRequest.new(**options)

  send_request(request)
end
create_topics(**options) click to toggle source
# File lib/kafka/broker.rb, line 119
def create_topics(**options)
  request = Protocol::CreateTopicsRequest.new(**options)

  send_request(request)
end
delete_topics(**options) click to toggle source
# File lib/kafka/broker.rb, line 125
def delete_topics(**options)
  request = Protocol::DeleteTopicsRequest.new(**options)

  send_request(request)
end
describe_configs(**options) click to toggle source
# File lib/kafka/broker.rb, line 131
def describe_configs(**options)
  request = Protocol::DescribeConfigsRequest.new(**options)

  send_request(request)
end
describe_groups(**options) click to toggle source
# File lib/kafka/broker.rb, line 161
def describe_groups(**options)
  request = Protocol::DescribeGroupsRequest.new(**options)

  send_request(request)
end
disconnect() click to toggle source

@return [nil]

# File lib/kafka/broker.rb, line 28
def disconnect
  connection.close if connected?
end
end_txn(**options) click to toggle source
# File lib/kafka/broker.rb, line 179
def end_txn(**options)
  request = Protocol::EndTxnRequest.new(**options)

  send_request(request)
end
fetch_messages(**options) click to toggle source

Fetches messages from a specified topic and partition.

@param (see Kafka::Protocol::FetchRequest#initialize) @return [Kafka::Protocol::FetchResponse]

# File lib/kafka/broker.rb, line 51
def fetch_messages(**options)
  request = Protocol::FetchRequest.new(**options)

  send_request(request)
end
fetch_metadata(**options) click to toggle source

Fetches cluster metadata from the broker.

@param (see Kafka::Protocol::MetadataRequest#initialize) @return [Kafka::Protocol::MetadataResponse]

# File lib/kafka/broker.rb, line 41
def fetch_metadata(**options)
  request = Protocol::MetadataRequest.new(**options)

  send_request(request)
end
fetch_offsets(**options) click to toggle source
# File lib/kafka/broker.rb, line 77
def fetch_offsets(**options)
  request = Protocol::OffsetFetchRequest.new(**options)

  send_request(request)
end
find_coordinator(**options) click to toggle source
# File lib/kafka/broker.rb, line 107
def find_coordinator(**options)
  request = Protocol::FindCoordinatorRequest.new(**options)

  send_request(request)
end
heartbeat(**options) click to toggle source
# File lib/kafka/broker.rb, line 113
def heartbeat(**options)
  request = Protocol::HeartbeatRequest.new(**options)

  send_request(request)
end
init_producer_id(**options) click to toggle source
# File lib/kafka/broker.rb, line 167
def init_producer_id(**options)
  request = Protocol::InitProducerIDRequest.new(**options)

  send_request(request)
end
join_group(**options) click to toggle source
# File lib/kafka/broker.rb, line 89
def join_group(**options)
  request = Protocol::JoinGroupRequest.new(**options)

  send_request(request)
end
leave_group(**options) click to toggle source
# File lib/kafka/broker.rb, line 101
def leave_group(**options)
  request = Protocol::LeaveGroupRequest.new(**options)

  send_request(request)
end
list_groups() click to toggle source
# File lib/kafka/broker.rb, line 149
def list_groups
  request = Protocol::ListGroupsRequest.new

  send_request(request)
end
list_offsets(**options) click to toggle source

Lists the offset of the specified topics and partitions.

@param (see Kafka::Protocol::ListOffsetRequest#initialize) @return [Kafka::Protocol::ListOffsetResponse]

# File lib/kafka/broker.rb, line 61
def list_offsets(**options)
  request = Protocol::ListOffsetRequest.new(**options)

  send_request(request)
end
produce(**options) click to toggle source

Produces a set of messages to the broker.

@param (see Kafka::Protocol::ProduceRequest#initialize) @return [Kafka::Protocol::ProduceResponse]

# File lib/kafka/broker.rb, line 71
def produce(**options)
  request = Protocol::ProduceRequest.new(**options)

  send_request(request)
end
sync_group(**options) click to toggle source
# File lib/kafka/broker.rb, line 95
def sync_group(**options)
  request = Protocol::SyncGroupRequest.new(**options)

  send_request(request)
end
to_s() click to toggle source

@return [String]

# File lib/kafka/broker.rb, line 23
def to_s
  "#{@host}:#{@port} (node_id=#{@node_id.inspect})"
end
txn_offset_commit(**options) click to toggle source
# File lib/kafka/broker.rb, line 191
def txn_offset_commit(**options)
  request = Protocol::TxnOffsetCommitRequest.new(**options)

  send_request(request)
end

Private Instance Methods

connection() click to toggle source
# File lib/kafka/broker.rb, line 213
def connection
  @connection ||= @connection_builder.build_connection(@host, @port)
end
send_request(request) click to toggle source
# File lib/kafka/broker.rb, line 199
def send_request(request)
  connection.send_request(request)
rescue IdleConnection
  @logger.warn "Connection has been unused for too long, re-connecting..."
  @connection.close rescue nil
  @connection = nil
  retry
rescue ConnectionError
  @connection.close rescue nil
  @connection = nil

  raise
end