class CC::Kafka::Consumer

Constants

MESSAGE_OFFSET_KEY

Public Class Methods

new(client_id, seed_brokers, topic, partition)
# File lib/cc/kafka/consumer.rb, line 8
# Builds a consumer bound to a single topic partition.
#
# Finds or creates the offset record for the topic/partition through
# Kafka.offset_model, logs the position consumption will begin from, and
# opens a Poseidon partition consumer at that position. When the record has
# no stored position, :earliest_offset is used (see #current_offset).
# The consumer starts out unpaused.
#
# client_id    - forwarded to Poseidon::PartitionConsumer.consumer_for_partition
# seed_brokers - forwarded to Poseidon::PartitionConsumer.consumer_for_partition
# topic        - topic used to look up the offset record
# partition    - partition used to look up the offset record
def initialize(client_id, seed_brokers, topic, partition)
  @paused = false
  @offset = Kafka.offset_model.find_or_create!(topic: topic, partition: partition)

  # Resolve the start position once so the log line and the consumer agree.
  starting_at = current_offset(@offset)
  Kafka.logger.debug("offset: #{@offset.topic}/#{@offset.partition} #{starting_at}")

  @consumer = Poseidon::PartitionConsumer.consumer_for_partition(
    client_id,
    seed_brokers,
    @offset.topic,
    @offset.partition,
    starting_at
  )
end

Public Instance Methods

close()
# File lib/cc/kafka/consumer.rb, line 82
# Closes the underlying partition consumer by delegating to @consumer.close.
# Also invoked from the ensure clause of #start when the consume loop exits.
def close
  @consumer.close
end
fetch()
# File lib/cc/kafka/consumer.rb, line 71
# Fetches the next batch from the underlying partition consumer.
#
# Returns whatever @consumer.fetch returns. When Poseidon raises
# UnknownTopicOrPartition, the condition is logged at debug level and an
# empty Array is returned instead of propagating the error.
def fetch
  @consumer.fetch
rescue Poseidon::Errors::UnknownTopicOrPartition
  missing_topic = @offset.topic.inspect
  Kafka.logger.debug("topic #{missing_topic} not created yet")
  []
end
on_message(&block)
# File lib/cc/kafka/consumer.rb, line 34
# Registers the callback invoked by #fetch_messages for every decoded
# message. A later call replaces the previously registered callback;
# calling without a block clears it.
def on_message(&handler)
  @on_message = handler
end
on_start(&block)
# File lib/cc/kafka/consumer.rb, line 26
# Registers the callback that #start invokes, with the offset record, before
# entering the consume loop. A later call replaces the previous callback;
# calling without a block clears it.
def on_start(&handler)
  @on_start = handler
end
on_stop(&block)
# File lib/cc/kafka/consumer.rb, line 30
# Registers the callback that #start invokes, with the offset record, from
# its ensure clause when the consume loop exits. A later call replaces the
# previous callback; calling without a block clears it.
def on_stop(&handler)
  @on_stop = handler
end
pause()
# File lib/cc/kafka/consumer.rb, line 59
# Sets the pause flag. While it is set, the loop in #start does not call
# #fetch_messages; the loop itself keeps running until #stop is called.
def pause
  @paused = true
end
paused?()
# File lib/cc/kafka/consumer.rb, line 67
# Returns the current pause flag: false after construction, true after
# #pause, false again after #unpause.
def paused?
  @paused
end
set_offset(current_offset)
# File lib/cc/kafka/consumer.rb, line 78
# Records current_offset as the `current` value of the offset record by
# delegating to @offset.set.
#
# Note: inside this method the parameter name shadows the private
# #current_offset helper, so `current_offset` here is always the argument.
def set_offset(current_offset)
  @offset.set(current: current_offset)
end
start()
# File lib/cc/kafka/consumer.rb, line 38
# Runs the consume loop until #stop is called. A TERM signal handler that
# calls #stop is installed on entry.
#
# The on_start callback, if registered, is invoked with the offset record
# before the loop begins. While running, each iteration fetches and
# dispatches messages; while paused, nothing is fetched and the loop sleeps
# briefly instead of spinning at full CPU.
#
# On the way out, whether the loop ended normally or an exception escaped,
# the on_stop callback, if registered, is invoked with the offset record and
# the consumer is closed. The consumer is closed even if on_stop itself
# raises; that exception still propagates to the caller.
def start
  trap(:TERM) { stop }

  @running = true
  @on_start.call(@offset) if @on_start

  while @running
    if @paused
      # Avoid a busy-wait while paused; short enough that #unpause and #stop
      # are still picked up promptly.
      sleep(0.1)
    else
      fetch_messages
    end
  end

  Kafka.logger.info("shutting down due to TERM signal")
ensure
  begin
    @on_stop.call(@offset) if @on_stop
  ensure
    # Must run even when the on_stop callback raises, otherwise the
    # underlying consumer is never closed.
    close
  end
end
stop()
# File lib/cc/kafka/consumer.rb, line 55
# Clears the running flag so the loop in #start exits once its current
# iteration finishes. #start installs this as the TERM signal handler.
def stop
  @running = false
end
unpause()
# File lib/cc/kafka/consumer.rb, line 63
# Clears the pause flag so the loop in #start calls #fetch_messages again.
def unpause
  @paused = false
end

Private Instance Methods

current_offset(offset)
# File lib/cc/kafka/consumer.rb, line 88
# Resolves the position to consume from for the given offset record.
#
# Returns offset.current when it is set (truthy); otherwise falls back to
# the :earliest_offset symbol.
def current_offset(offset)
  stored = offset.current
  return :earliest_offset unless stored

  stored
end
fetch_messages()
# File lib/cc/kafka/consumer.rb, line 92
# Fetches one batch and hands every message to the on_message callback.
#
# Per message:
#   1. increments the "messages.received" counter;
#   2. inside the "messages.processing" timer, calls #set_offset with
#      message.offset + 1, BSON-decodes the message value, tags the decoded
#      data under MESSAGE_OFFSET_KEY with "<topic>-<partition>-<offset>",
#      and calls the on_message callback with it;
#   3. increments the "messages.processed" counter.
#
# Note that #set_offset runs before the callback, so it has already been
# called for a message whose decoding or callback subsequently raises.
def fetch_messages
  fetch.each do |msg|
    Kafka.statsd.increment("messages.received")

    Kafka.statsd.time("messages.processing") do
      set_offset(msg.offset + 1)

      payload = BSON.deserialize(msg.value)
      offset_tag = [@offset.topic, @offset.partition, msg.offset].join("-")
      payload[MESSAGE_OFFSET_KEY] = offset_tag

      @on_message.call(payload)
    end

    Kafka.statsd.increment("messages.processed")
  end
end