class KafkaReplicator::OffsetsSync
Attributes
consumer_group[R]
destination_consumer[R]
destination_kafka[R]
logger[R]
source_kafka[R]
topics[R]
Public Class Methods
new(source_brokers:, destination_brokers:, consumer_group:)
click to toggle source
# File lib/kafka_replicator/offsets_sync.rb, line 10 def initialize(source_brokers:, destination_brokers:, consumer_group:) @source_brokers = source_brokers @destination_brokers = destination_brokers @consumer_group = consumer_group @topics = Hash.new { |h, k| h[k] = {} } @logger = Logger.new(STDOUT) end
Public Instance Methods
calculate_destination_consumer_offsets()
click to toggle source
# File lib/kafka_replicator/offsets_sync.rb, line 80 def calculate_destination_consumer_offsets logger.info "calculate_destination_consumer_offsets" @topics.each do |topic, partitions| partitions.each do |partition, info| delta = info[:source_producer_offset] - info[:destination_producer_offset] info[:destination_consumer_offsets] = info[:source_consumer_offset] - delta end end end
load_destination_producer_offsets()
click to toggle source
# File lib/kafka_replicator/offsets_sync.rb, line 70 def load_destination_producer_offsets logger.info "load_source_producer_offsets" destination_kafka.last_offsets_for(*@topics.keys).each do |topic, partitions| partitions.each do |partition, offset| @topics[topic][partition][:destination_producer_offset] = offset end end end
load_source_consumer_offsets()
click to toggle source
# File lib/kafka_replicator/offsets_sync.rb, line 50 def load_source_consumer_offsets logger.info "load_source_consumer_offsets" source_consumer_offsets.topics.each do |topic, partitions| partitions.map do |partition, info| @topics[topic][partition] = { source_consumer_offset: info.offset } end end end
load_source_producer_offsets()
click to toggle source
# File lib/kafka_replicator/offsets_sync.rb, line 60 def load_source_producer_offsets logger.info "load_destination_producer_offsets" source_kafka.last_offsets_for(*@topics.keys).each do |topic, partitions| partitions.each do |partition, offset| @topics[topic][partition][:source_producer_offset] = offset end end end
set_destination_consumer_offsets()
click to toggle source
# File lib/kafka_replicator/offsets_sync.rb, line 91 def set_destination_consumer_offsets logger.info "set_destination_consumer_offsets" @topics.each do |topic, partitions| destination_consumer.subscribe(topic) partitions.each do |partition, info| offset = info[:destination_consumer_offsets] logger.info "Seeking consumer offset for: #{m.topic}/#{m.partition} to #{offset}" destination_consumer.seek(topic, partition, offset) end end opts = { automatically_mark_as_processed: false } destination_consumer.each_message(opts) do |m| logger.info "Setting consumer offset for: #{m.topic}/#{m.partition}" break end end
source_consumer_offsets()
click to toggle source
# File lib/kafka_replicator/offsets_sync.rb, line 45 def source_consumer_offsets Kafka::Protocol::OffsetFetchRequest.send(:define_method, "api_version") { 2 } source_group_cordinator.fetch_offsets(group_id: consumer_group, topics: nil) end
source_group_cordinator()
click to toggle source
# File lib/kafka_replicator/offsets_sync.rb, line 38 def source_group_cordinator source_kafka.instance_variable_get('@cluster').send( :get_group_coordinator, group_id: consumer_group ) end
sync()
click to toggle source
# File lib/kafka_replicator/offsets_sync.rb, line 110 def sync load_source_consumer_offsets load_destination_producer_offsets load_source_producer_offsets calculate_destination_consumer_offsets set_destination_consumer_offsets end