class KafkaETLBase::Base

Public Class Methods

new(zookeeper, kafka_brokers, kafka_topic, opts={}) click to toggle source
# File lib/kafka_etl_base/base.rb, line 10
def initialize(zookeeper, kafka_brokers, kafka_topic, opts={})
  
  @num_threads = opts[:num_threads] ? opts[:num_threads] : 2 
  @max_fetch_bytes = opts[:max_fetch_bytes] ? opts[:max_fetch_bytes] : 5_000_000
  @min_fetch_bytes = opts[:min_fetch_bytes] ? opts[:min_fetch_bytes] : 0
  @kafka_client_id = opts[:kafka_client_id] ? opts[:kafka_client_id] : "my_consumer"
  @kafka_part_num = opts[:kafka_topic_part_num] ? opts[:kafka_topic_part_num] : 2
  
  @zookeeper = zookeeper
  @kafka_brokers = kafka_brokers
  @kafka_topic = kafka_topic
  
  $log = Logger.new(STDOUT) if $log.nil?

  $log.debug("zookeeper: #{@zookeeper}")
  $log.debug("kafka_brokers: #{@kafka_brokers}")
  $log.debug("kafka_topic: #{@kafka_topic}")
  $log.debug("kafka_client_id: #{@kafka_client_id}")
  
  @partition_shuffle = true
  
  @mutex = Mutex.new
  
  @total_procs = 0
end

Public Instance Methods

proccess_thread(zk, part_no) click to toggle source
# File lib/kafka_etl_base/base.rb, line 107
def proccess_thread(zk, part_no)
  zk_part_node = "/part_offset_#{part_no}"
  
  num_cur_part_procs = 0
  
  if ! zk.exists?(zk_part_node)
    zk.create(zk_part_node, "0")
  end
  
  offset = nil
  begin
    value, stat = zk.get(zk_part_node)
    offset = value.to_i
    if offset == 0
      part_offset = :earliest_offset
      #part_offset = :latest_offset
    else
      part_offset = offset
    end
  rescue ZK::Exceptions::NoNode => e
    part_offset = :earliest_offset
    #part_offset = :latest_offset
  end
  $log.debug "part: #{part_no}, offset: #{part_offset}"
  
  cons = Poseidon::PartitionConsumer.consumer_for_partition(@kafka_client_id,
                                                            @kafka_brokers,
                                                            @kafka_topic,
                                                            part_no,
                                                            part_offset,
                                                            :max_wait_ms => 0,
                                                            :max_bytes => @max_fetch_bytes,
                                                            :min_bytes => @min_fetch_bytes
                                                           )
  begin
    num_cur_part_procs += process_messages(cons, part_no)
    
    next_offset = cons.next_offset
    last_offset = cons.highwater_mark
    $log.info "part: #{part_no}, next_offset: #{next_offset}, last_offset: #{last_offset},  proc: #{num_cur_part_procs}"
    @mutex.synchronize do
      @total_procs += num_cur_part_procs
    end
    # set next offset to zookeper
    zk.set(zk_part_node, next_offset.to_s) if next_offset >= offset
    return last_offset - next_offset
  rescue Poseidon::Connection::ConnectionFailedError
    $log.error "kafka connection failed"
    return 0
  rescue BackendError
    $log.debug "backend error"
    return 0
  ensure
    cons.close if ! cons.nil?
  end
rescue Poseidon::Errors::OffsetOutOfRange => e
  zk.set(zk_part_node, "0")
  $log.error e.inspect
  return 0
rescue Poseidon::Errors::NotLeaderForPartition, Poseidon::Errors::UnableToFetchMetadata => e
  $log.error e.inspect
  return 0
end
process() click to toggle source
# File lib/kafka_etl_base/base.rb, line 36
def process()
  
  @total_procs = 0
  
  zk = ZK.new(@zookeeper)
  begin
    #zk.create("/", ignore: :node_exists)
    if zk.exists?("/stop_etl")
      $log.info("zk: /stop_etl exist.")
      $log.info("stop message processing.")
      return
    end
  ensure
    zk.close!
  end
  
  seq = [ * 0 ... @kafka_part_num ]
  seq.shuffle! if @partition_shuffle == true
  
  r = Parallel.each(seq, :in_threads => @num_threads) do |part_no|
    zk_th = nil
    begin
      zk_th = ZK.new(@zookeeper)
      zk_lock = "lock_hdfs_part_#{part_no}"
      locker = zk_th.locker(zk_lock)
      begin
        if locker.lock!
          remain = proccess_thread(zk_th, part_no)
        else
          $log.info("part: #{part_no} is already locked skip")
        end
      ensure
        begin
          locker.unlock!
        rescue => e
          $log.error(e.inspect)
          $log.error(e.backtrace)
        end
      end
    rescue ZK::Exceptions::ConnectionLoss => e
      $log.error(e.inspect)
      $log.error(e.backtrace)
    ensure
      begin
        zk_th.close! if ! zk_th.nil?
      rescue
      end
    end
  end
  $log.info "total procs: #{@total_procs}"
end
process_messages(cons, part_no) click to toggle source
# File lib/kafka_etl_base/base.rb, line 171
def process_messages(cons, part_no)
  messages = cons.fetch
  messages.each do |m|
    key = m.key
    val = m.value
    puts "part: #{part_no}, key: #{key}, val: #{val}"
  end
  messages.size
end
process_partition(part_no) click to toggle source
# File lib/kafka_etl_base/base.rb, line 88
def process_partition(part_no)
  zk = ZK.new(@zookeeper)
  begin
    zk_lock = "lock_hdfs_part_#{part_no}"
    locker = zk.locker(zk_lock)
    begin
      if locker.lock!
        remain = proccess_thread(zk, part_no)
      else
        $log.info("part: #{part_no} is already locked skip")
      end
    ensure
      locker.unlock!
    end
  ensure
    zk.close
  end
end