class DruidDB::Writer

Attributes

config[R]
producer[R]
zk[R]

Public Class Methods

new(config, zk) click to toggle source
# File lib/druiddb/writer.rb, line 4
# Builds a writer bound to the given configuration and ZooKeeper handle.
#
# Stores both collaborators, creates the initial Kafka producer, and then
# registers this writer with +zk+ so that +handle_kafka_state_change+ is
# invoked on registry changes.
#
# @param config [Object] configuration object, kept in +@config+
# @param zk [Object] ZooKeeper wrapper, kept in +@zk+; must respond to
#   +register_listener+
def initialize(config, zk)
  @config, @zk = config, zk

  # Producer first, listener second: same ordering as before.
  init_producer
  @zk.register_listener(self, :handle_kafka_state_change)
end

Public Instance Methods

write_point(datasource, datapoint) click to toggle source
# File lib/druiddb/writer.rb, line 11
# Serializes +datapoint+ to JSON and hands it to the Kafka producer, using
# +datasource+ as the topic.
#
# When the producer's buffer is full, waits +config.kafka_overflow_wait+
# seconds and tries again. There is no retry limit, so the call blocks until
# the message is accepted or the producer goes away.
#
# The producer is re-read and nil-checked on every attempt, because
# +handle_kafka_state_change+ may replace it (or leave it nil) while this
# method is sleeping.
#
# @param datasource [String] Kafka topic to publish to
# @param datapoint [#to_json] payload to publish
# @return [Object] whatever the producer's +produce+ returns
# @raise [DruidDB::ConnectionError] if no producer is available, either on
#   entry or after an overflow wait
def write_point(datasource, datapoint)
  # Read once per attempt so the nil check and the call see the same object.
  current = producer
  raise DruidDB::ConnectionError, 'no kafka brokers available' if current.nil?

  current.produce(datapoint.to_json, topic: datasource)
rescue Kafka::BufferOverflow
  sleep config.kafka_overflow_wait
  retry
end

Private Instance Methods

broker_list() click to toggle source
# File lib/druiddb/writer.rb, line 23
# Returns the brokers found under '/brokers/ids' in the ZooKeeper registry
# as one comma-separated string of +broker_name+ entries.
#
# @return [String] e.g. "host1:9092,host2:9092"; empty when the registry
#   entry holds no instances
def broker_list
  instances = zk.registry['/brokers/ids']
  addresses = instances.map { |broker| broker_name(broker) }
  addresses.join(',')
end
broker_name(instance) click to toggle source
# File lib/druiddb/writer.rb, line 27
# Formats one broker registry entry as "host:port".
#
# @param instance [#[]] entry exposing +:host+ and +:port+
# @return [String]
def broker_name(instance)
  host = instance[:host]
  port = instance[:port]
  "#{host}:#{port}"
end
handle_kafka_state_change(service) click to toggle source
# File lib/druiddb/writer.rb, line 31
# Listener callback: rebuilds the Kafka producer when the changed service is
# the configured Kafka broker path. Other services are ignored.
#
# The current producer may be nil, because +init_producer+ leaves it nil when
# no brokers are registered. In that case there is nothing to shut down, and
# the method goes straight to re-initialization, so the writer can recover
# once brokers appear.
#
# @param service [String] path of the service whose state changed
# @return [Object, nil] result of +init_producer+, or nil when ignored
def handle_kafka_state_change(service)
  return unless service == config.kafka_broker_path

  current = producer
  current.shutdown if current
  init_producer
end
init_producer() click to toggle source
# File lib/druiddb/writer.rb, line 37
# (Re)creates the asynchronous Kafka producer from the brokers currently
# listed by +broker_list+ and stores it in +@producer+.
#
# The broker list is read once, so the value that is checked is the value
# passed to Kafka. When it is empty, +@producer+ is set to nil and no Kafka
# client is created.
#
# @return [Object, nil] the new async producer, or nil when no brokers are
#   registered
def init_producer
  brokers = broker_list

  # broker_list always returns a String (the result of a join), so empty?
  # is sufficient here and avoids relying on ActiveSupport's present?.
  return @producer = nil if brokers.empty?

  # Options are passed as real keyword arguments; Ruby 3 does not convert a
  # positional Hash into keywords.
  kafka = Kafka.new(seed_brokers: brokers, client_id: config.client_id)
  async = kafka.async_producer(delivery_threshold: 100, delivery_interval: 10)
  # Kept from the previous implementation: request a delivery right away.
  async.deliver_messages

  @producer = async
end