class Kafka::Group

Public Class Methods

new(options={}) click to toggle source

Create a Kafka client group

options: :zk_connect => "localhost:2181" - REQUIRED: The connection string for the ZooKeeper connection, in the form host:port. Multiple URLs can be given to allow fail-over.

:zk_connect_timeout => "6000" - (optional) The maximum time, in milliseconds, that the client waits while establishing a connection to ZooKeeper.
:group_id => "group" - REQUIRED: The group id to consume on.
:topic_id => "topic" - REQUIRED: The topic id to consume on.
:reset_beginning => "from-beginning" - (optional) Start with the earliest message present in the log rather than the latest message. Note that when this is set, run first deletes the group's /consumers/<group_id> path in ZooKeeper, discarding the group's stored state (including any previously committed offsets), so consumption restarts from the earliest message every time run is called.

:consumer_restart_on_error => "true" - (optional, default "false") Controls whether consumer threads restart after catching an exception. Caught exceptions are logged.
# File lib/jruby-kafka/group.rb, line 33
# Create a Kafka consumer group from an options hash.
#
# Required keys (checked by validate_required_arguments): :zk_connect,
# :group_id, :topic_id. Every other key is optional. Each tunable is stored
# as a String because createConsumerConfig copies them into a
# java.util.Properties object.
#
# An option falls back to its default only when it is absent (nil). An
# explicit `false` is kept and stored as "false". Previously any falsy value
# was ignored, which made it impossible to disable e.g. :auto_commit_enable.
#
# @param options [Hash] consumer settings keyed by Symbol
# @raise [ArgumentError] if :zk_connect, :group_id or :topic_id is missing/falsy
def initialize(options={})
  validate_required_arguments(options)

  # options[key] converted to a String, or +default+ when the key is absent/nil.
  string_option = lambda do |key, default|
    value = options[key]
    value.nil? ? default : value.to_s
  end

  @zk_connect = options[:zk_connect]
  @group_id = options[:group_id]
  @topic = options[:topic_id]
  @running = false
  # A falsy :consumer_id leaves the id unset so createConsumerConfig skips it.
  @consumer_id = options[:consumer_id] || nil

  # ZooKeeper settings
  @zk_connect_timeout = string_option.call(:zk_connect_timeout, '6000')
  @zk_session_timeout = string_option.call(:zk_session_timeout, '6000')
  @zk_sync_time = string_option.call(:zk_sync_time, '2000')

  # Offset commit and rebalance settings
  @auto_commit_enable = string_option.call(:auto_commit_enable, 'true')
  @auto_commit_interval = string_option.call(:auto_commit_interval, '1000')
  @rebalance_max_retries = string_option.call(:rebalance_max_retries, '4')
  @rebalance_backoff_ms = string_option.call(:rebalance_backoff_ms, '2000')

  # Socket and fetch settings
  @socket_timeout_ms = string_option.call(:socket_timeout_ms, (30 * 1000).to_s)
  @socket_receive_buffer_bytes = string_option.call(:socket_receive_buffer_bytes, (64 * 1024).to_s)
  @fetch_message_max_bytes = string_option.call(:fetch_message_max_bytes, (1024 * 1024).to_s)
  @queued_max_message_chunks = string_option.call(:queued_max_message_chunks, '10')
  @fetch_min_bytes = string_option.call(:fetch_min_bytes, '1')
  @fetch_wait_max_ms = string_option.call(:fetch_wait_max_ms, '100')
  @refresh_leader_backoff_ms = string_option.call(:refresh_leader_backoff_ms, '200')
  @consumer_timeout_ms = string_option.call(:consumer_timeout_ms, '-1')

  # Settings handed to each Kafka::Consumer worker by #run
  @consumer_restart_on_error = string_option.call(:consumer_restart_on_error, 'false')
  @consumer_restart_sleep_ms = string_option.call(:consumer_restart_sleep_ms, '0')

  # Only the exact value 'from-beginning' selects 'smallest'; anything else
  # (including no value) keeps 'largest'.
  @auto_offset_reset = options[:reset_beginning] == 'from-beginning' ? 'smallest' : 'largest'
end

Public Instance Methods

run(a_numThreads, a_queue) click to toggle source
# File lib/jruby-kafka/group.rb, line 158
# Connect to Kafka and start consuming @topic on a fixed-size thread pool.
#
# When @auto_offset_reset is 'smallest', the group's /consumers/<group_id>
# ZooKeeper path is deleted before connecting. One Kafka::Consumer is
# submitted per message stream, numbered from 0.
#
# @param a_numThreads [Integer] number of streams requested and pool size
# @param a_queue queue handed to every Kafka::Consumer worker
# @raise [KafkaError] when the ZooKeeper cleanup or connector creation raises ZkException
# @return [true]
def run(a_numThreads, a_queue)
  begin
    Java::kafka::utils::ZkUtils.maybeDeletePath(@zk_connect, "/consumers/#{@group_id}") if @auto_offset_reset == 'smallest'
    @consumer = Java::kafka::consumer::Consumer.createJavaConsumerConnector(createConsumerConfig)
  rescue ZkException => e
    raise KafkaError.new(e), "Got ZkException: #{e}"
  end

  # Ask the connector for a_numThreads streams on our single topic.
  streams_per_topic = java.util.HashMap.new
  streams_per_topic.put(@topic, a_numThreads.to_java(Java::int))
  topic_streams = Array.new(@consumer.createMessageStreams(streams_per_topic)[@topic])

  @executor = Executors.newFixedThreadPool(a_numThreads)
  # Bind the Runnable overload of submit explicitly.
  @executor_submit = @executor.java_method(:submit, [Java::JavaLang::Runnable.java_class])

  topic_streams.each_with_index do |stream, index|
    worker = Kafka::Consumer.new(stream, index, a_queue, @consumer_restart_on_error, @consumer_restart_sleep_ms)
    @executor_submit.call(worker)
  end

  @running = true
end
running?() click to toggle source
# File lib/jruby-kafka/group.rb, line 186
# Current value of @running: set to true at the end of #run, and false by
# #initialize and #shutdown.
def running?; @running; end
shutdown() click to toggle source
# File lib/jruby-kafka/group.rb, line 147
# Stop the consumer connector, then the executor pool, skipping whichever has
# not been created yet (i.e. #run was never called). Marks the group as not
# running.
#
# @return [false]
def shutdown
  @consumer.shutdown if @consumer
  @executor.shutdown if @executor
  @running = false
end

Private Instance Methods

createConsumerConfig() click to toggle source
# File lib/jruby-kafka/group.rb, line 191
# Build the Kafka ConsumerConfig from the settings captured in #initialize.
#
# Each entry is copied, in the order listed, into a java.util.Properties.
# "consumer.id" is only included when @consumer_id is non-nil.
#
# @return [Java::kafka::consumer::ConsumerConfig]
def createConsumerConfig
  settings = {
    'zookeeper.connect' => @zk_connect,
    'group.id' => @group_id,
    'zookeeper.connection.timeout.ms' => @zk_connect_timeout,
    'zookeeper.session.timeout.ms' => @zk_session_timeout,
    'zookeeper.sync.time.ms' => @zk_sync_time,
    'auto.commit.interval.ms' => @auto_commit_interval,
    'auto.offset.reset' => @auto_offset_reset,
    'rebalance.max.retries' => @rebalance_max_retries,
    'rebalance.backoff.ms' => @rebalance_backoff_ms,
    'socket.timeout.ms' => @socket_timeout_ms,
    'socket.receive.buffer.bytes' => @socket_receive_buffer_bytes,
    'fetch.message.max.bytes' => @fetch_message_max_bytes,
    'auto.commit.enable' => @auto_commit_enable,
    'queued.max.message.chunks' => @queued_max_message_chunks,
    'fetch.min.bytes' => @fetch_min_bytes,
    'fetch.wait.max.ms' => @fetch_wait_max_ms,
    'refresh.leader.backoff.ms' => @refresh_leader_backoff_ms,
    'consumer.timeout.ms' => @consumer_timeout_ms
  }
  settings['consumer.id'] = @consumer_id unless @consumer_id.nil?

  props = java.util.Properties.new
  settings.each { |name, value| props.put(name, value) }
  Java::kafka::consumer::ConsumerConfig.new(props)
end
validate_required_arguments(options={}) click to toggle source
# File lib/jruby-kafka/group.rb, line 140
# Ensure the mandatory connection options are present (and truthy).
#
# Keys are checked in order: :zk_connect, :group_id, :topic_id. The first
# missing one is reported.
#
# @param options [Hash] the options hash given to #initialize
# @raise [ArgumentError] "<key> is required." for the first missing key
# @return [Array<Symbol>] the list of required keys
def validate_required_arguments(options={})
  [:zk_connect, :group_id, :topic_id].each do |key|
    next if options[key]
    raise ArgumentError, "#{key} is required."
  end
end