require 'multi_json' require 'kafka_replicator'

namespace :kafka_replicator do

desc 'Start topics replicator'
task :replicate_topics do |task, _|
  source_brokers = ENV['KAFKA_REPLICATOR_SOURCE_BROKERS'] && MultiJson.load(ENV['KAFKA_REPLICATOR_SOURCE_BROKERS'])
  raise "KAFKA_REPLICATOR_SOURCE_BROKERS environment variable is not set" unless source_brokers

  destination_brokers = ENV['KAFKA_REPLICATOR_DESTINATION_BROKERS'] && MultiJson.load(ENV['KAFKA_REPLICATOR_DESTINATION_BROKERS'])
  raise "KAFKA_REPLICATOR_DESTINATION_BROKERS environment variable is not set" unless destination_brokers

  skip_topics = (ENV['KAFKA_REPLICATOR_SKIP_TOPICS'] && MultiJson.load(ENV['KAFKA_REPLICATOR_SKIP_TOPICS'])) || []

  puts "Replicating from #{source_brokers} to #{destination_brokers}"
  puts "Skipping topics: #{(KafkaReplicator::TopicsReplicator::SKIP_TOPICS | skip_topics).sort}"

  replicator = KafkaReplicator::TopicsReplicator.new(
    source_brokers: source_brokers, 
    destination_brokers: destination_brokers,
    skip_topics: skip_topics
  )

  trap("TERM") { replicator.stop }
  trap("INT") { replicator.stop }

  replicator.start
end

end