class Kafka::Consumer
Public Class Methods
new(a_stream, a_threadNumber, a_queue, a_bool_restart_on_exception, a_sleep_ms)
click to toggle source
# File lib/jruby-kafka/consumer.rb, line 17 def initialize(a_stream, a_threadNumber, a_queue, a_bool_restart_on_exception, a_sleep_ms) @m_threadNumber = a_threadNumber @m_stream = a_stream @m_queue = a_queue @m_restart_on_exception = a_bool_restart_on_exception @m_sleep_ms = 1.0 / 1000.0 * Float(a_sleep_ms) end
Public Instance Methods
run()
click to toggle source
# File lib/jruby-kafka/consumer.rb, line 25 def run it = @m_stream.iterator() begin while it.hasNext() begin @m_queue << it.next().message() end end rescue Exception => e puts("#{self.class.name} caught exception: #{e.class.name}") puts(e.message) if e.message != '' if @m_restart_on_exception sleep(@m_sleep_ms) retry else raise e end end end