class DruidDB::Writer
Attributes
config[R]
producer[R]
zk[R]
Public Class Methods
new(config, zk)
click to toggle source
# File lib/druiddb/writer.rb, line 4 def initialize(config, zk) @config = config @zk = zk init_producer zk.register_listener(self, :handle_kafka_state_change) end
Public Instance Methods
write_point(datasource, datapoint)
click to toggle source
# File lib/druiddb/writer.rb, line 11 def write_point(datasource, datapoint) raise DruidDB::ConnectionError, 'no kafka brokers available' if producer.nil? begin producer.produce(datapoint.to_json, topic: datasource) rescue Kafka::BufferOverflow sleep config.kafka_overflow_wait retry end end
Private Instance Methods
broker_list()
click to toggle source
# File lib/druiddb/writer.rb, line 23 def broker_list zk.registry['/brokers/ids'].map { |instance| broker_name(instance) }.join(',') end
broker_name(instance)
click to toggle source
# File lib/druiddb/writer.rb, line 27 def broker_name(instance) "#{instance[:host]}:#{instance[:port]}" end
handle_kafka_state_change(service)
click to toggle source
# File lib/druiddb/writer.rb, line 31 def handle_kafka_state_change(service) return unless service == config.kafka_broker_path producer.shutdown init_producer end
init_producer()
click to toggle source
# File lib/druiddb/writer.rb, line 37 def init_producer producer_options = { seed_brokers: broker_list, client_id: config.client_id } if broker_list.present? kafka = Kafka.new(producer_options) producer = kafka.async_producer(delivery_threshold: 100, delivery_interval: 10) producer.deliver_messages else producer = nil end @producer = producer end