class CC::Kafka::Consumer
Constants
- MESSAGE_OFFSET_KEY
Public Class Methods
new(client_id, seed_brokers, topic, partition)
click to toggle source
# File lib/cc/kafka/consumer.rb, line 8 def initialize(client_id, seed_brokers, topic, partition) @offset = Kafka.offset_model.find_or_create!( topic: topic, partition: partition, ) Kafka.logger.debug("offset: #{@offset.topic}/#{@offset.partition} #{current_offset(@offset)}") @consumer = Poseidon::PartitionConsumer.consumer_for_partition( client_id, seed_brokers, @offset.topic, @offset.partition, current_offset(@offset) ) @paused = false end
Public Instance Methods
close()
click to toggle source
# File lib/cc/kafka/consumer.rb, line 82 def close @consumer.close end
fetch()
click to toggle source
# File lib/cc/kafka/consumer.rb, line 71 def fetch @consumer.fetch rescue Poseidon::Errors::UnknownTopicOrPartition Kafka.logger.debug("topic #{@offset.topic.inspect} not created yet") [] end
on_message(&block)
click to toggle source
# File lib/cc/kafka/consumer.rb, line 34 def on_message(&block) @on_message = block end
on_start(&block)
click to toggle source
# File lib/cc/kafka/consumer.rb, line 26 def on_start(&block) @on_start = block end
on_stop(&block)
click to toggle source
# File lib/cc/kafka/consumer.rb, line 30 def on_stop(&block) @on_stop = block end
pause()
click to toggle source
# File lib/cc/kafka/consumer.rb, line 59 def pause @paused = true end
paused?()
click to toggle source
# File lib/cc/kafka/consumer.rb, line 67 def paused? @paused end
set_offset(current_offset)
click to toggle source
# File lib/cc/kafka/consumer.rb, line 78 def set_offset(current_offset) @offset.set(current: current_offset) end
start()
click to toggle source
# File lib/cc/kafka/consumer.rb, line 38 def start trap(:TERM) { stop } @running = true @on_start.call(@offset) if @on_start while @running do fetch_messages unless @paused end Kafka.logger.info("shutting down due to TERM signal") ensure @on_stop.call(@offset) if @on_stop close end
stop()
click to toggle source
# File lib/cc/kafka/consumer.rb, line 55 def stop @running = false end
unpause()
click to toggle source
# File lib/cc/kafka/consumer.rb, line 63 def unpause @paused = false end
Private Instance Methods
current_offset(offset)
click to toggle source
# File lib/cc/kafka/consumer.rb, line 88 def current_offset(offset) offset.current || :earliest_offset end
fetch_messages()
click to toggle source
# File lib/cc/kafka/consumer.rb, line 92 def fetch_messages fetch.each do |message| Kafka.statsd.increment("messages.received") Kafka.statsd.time("messages.processing") do set_offset(message.offset + 1) data = BSON.deserialize(message.value) data[MESSAGE_OFFSET_KEY] = [ @offset.topic, @offset.partition, message.offset, ].join("-") @on_message.call(data) end Kafka.statsd.increment("messages.processed") end end