class Kafka::Consumer

Public Class Methods

new(a_stream, a_threadNumber, a_queue, a_bool_restart_on_exception, a_sleep_ms)
# File lib/jruby-kafka/consumer.rb, line 17
# Builds a consumer by capturing its collaborators and settings in instance
# variables. Nothing is read from the stream at construction time.
#
# @param a_stream stored as @m_stream (#run later calls #iterator on it)
# @param a_threadNumber stored as @m_threadNumber
# @param a_queue stored as @m_queue (#run appends to it with <<)
# @param a_bool_restart_on_exception stored as @m_restart_on_exception
# @param a_sleep_ms any value Kernel#Float accepts; it is scaled by 1/1000
#   before being stored (milliseconds to seconds, going by the name)
# @raise [ArgumentError, TypeError] when Float() cannot convert a_sleep_ms
def initialize(a_stream, a_threadNumber, a_queue, a_bool_restart_on_exception, a_sleep_ms)
  @m_stream, @m_queue = a_stream, a_queue
  @m_threadNumber = a_threadNumber
  @m_restart_on_exception = a_bool_restart_on_exception

  # Conversion happens last so a bad a_sleep_ms raises only after the other
  # fields have been assigned, exactly as before.
  ms_to_seconds = 1.0 / 1000.0
  @m_sleep_ms = ms_to_seconds * Float(a_sleep_ms)
end

Public Instance Methods

run()
# File lib/jruby-kafka/consumer.rb, line 25
# Drains the stream's iterator, appending each item's #message to @m_queue
# until #hasNext returns false.
#
# If anything raises while draining, the exception's class name (and its
# message, unless that is the empty string) is written with puts. Then:
#
# * when @m_restart_on_exception is truthy, the method sleeps for @m_sleep_ms
#   and resumes draining the same iterator object;
# * otherwise the exception is re-raised to the caller.
#
# SystemExit, SignalException (including Interrupt) and NoMemoryError are
# always re-raised, even in restart mode, so that a shutdown request is never
# swallowed by the retry loop.
#
# @return [nil] once the iterator reports no more items
# @raise [Exception] whatever the iterator or queue raised, when restarting
#   is disabled or the exception is one of the shutdown classes above
def run
  iterator = @m_stream.iterator
  begin
    @m_queue << iterator.next.message while iterator.hasNext
  rescue Exception => e
    # NOTE(review): the rescue is left as broad as it was; presumably this is
    # needed so errors raised by the underlying stream under JRuby are still
    # caught. Confirm before narrowing it to StandardError.
    puts("#{self.class.name} caught exception: #{e.class.name}")
    puts(e.message) if e.message != ''

    # Retrying on these would make the consumer impossible to stop.
    shutdown_request = [SystemExit, SignalException, NoMemoryError].any? { |klass| e.is_a?(klass) }
    raise if shutdown_request || !@m_restart_on_exception

    sleep(@m_sleep_ms)
    retry
  end
end