class Wukong::Load::KafkaLoader

Loads data into Kafka.

Uses the `kafka-rb` gem to create a `Kafka::MultiProducer` that writes to Kafka.

Loads records into a given topic and partition. A record may include `_topic` and `_partition` fields, which override the configured topic and partition for that record only.

The names of these fields within each record (`_topic` and `_partition`) can be customized.
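The per-record override described above can be sketched as follows. This is a minimal illustration, not the loader's actual code: the class name `RecordRouter`, its keyword options (`topic`, `partition`, `topic_field`, `partition_field`), string-keyed records, and the integer coercion of partitions are all assumptions. Only the default field names `_topic` and `_partition` come from this page.

```ruby
# Hypothetical sketch of per-record topic/partition routing.
# Not the real Wukong::Load::KafkaLoader implementation.
class RecordRouter
  def initialize(topic:, partition: 0, topic_field: '_topic', partition_field: '_partition')
    @topic           = topic
    @partition       = partition
    @topic_field     = topic_field
    @partition_field = partition_field
  end

  # Use the record's own topic field if present, else the configured topic.
  def topic_for(record)
    record[@topic_field] || @topic
  end

  # Use the record's own partition field if present, else the configured one.
  # Assumption: partitions arriving as strings (e.g. from JSON) become integers.
  def partition_for(record)
    value = record[@partition_field]
    value.nil? ? @partition : Integer(value)
  end
end

router = RecordRouter.new(topic: 'events', partition: 0)
router.topic_for({ 'id' => 1 })                           # => "events"
router.topic_for({ 'id' => 2, '_topic' => 'audit' })      # => "audit"
router.partition_for({ 'id' => 3, '_partition' => '4' })  # => 4
```

Keeping the field names configurable lets upstream data use names other than `_topic` and `_partition` without being rewritten first.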

Attributes

producer[RW]

The Kafka producer used to send messages to Kafka.

Public Instance Methods

load(record)

Load a single record into Kafka.

@param [Hash] record

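```ruby
# File lib/wukong-load/loaders/kafka.rb, line 65
def load(record)
  topic     = topic_for(record)      # per-record `_topic`, else the configured topic
  partition = partition_for(record)  # per-record `_partition`, else the configured partition
  bytes     = producer.send(topic, messages_for(record), :partition => partition)
  log.info("Wrote #{bytes} bytes to #{topic}/#{partition}")
rescue => e
  # Any StandardError raised while routing or sending goes to the error handler.
  handle_error(record, e)
end
```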
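To follow the control flow of `load` without a running broker, the sketch below swaps in a stub producer. `StubProducer`, `SketchLoader`, the JSON encoding in `messages_for`, and the error list kept by `handle_error` are hypothetical stand-ins; the real helpers are not shown on this page. The stub only mimics the `send(topic, messages, :partition => n)` call shape that `load` uses.

```ruby
require 'json'

# Hypothetical stand-in for Kafka::MultiProducer: records each call instead of
# writing to a broker, and returns a byte count as `load` expects.
class StubProducer
  attr_reader :sent

  def initialize
    @sent = []
  end

  def send(topic, messages, options = {})
    raise ArgumentError, 'empty topic' if topic.to_s.empty?
    @sent << [topic, messages, options[:partition]]
    messages.map(&:bytesize).sum
  end
end

# Hypothetical loader that mirrors the shape of KafkaLoader#load.
class SketchLoader
  attr_accessor :producer
  attr_reader :errors

  def initialize(producer, topic: 'events', partition: 0)
    @producer  = producer
    @topic     = topic
    @partition = partition
    @errors    = []
  end

  def load(record)
    topic     = topic_for(record)
    partition = partition_for(record)
    bytes     = producer.send(topic, messages_for(record), :partition => partition)
    puts "Wrote #{bytes} bytes to #{topic}/#{partition}"
  rescue => e
    handle_error(record, e)
  end

  private

  def topic_for(record)
    record['_topic'] || @topic
  end

  def partition_for(record)
    record['_partition'] || @partition
  end

  # Assumption: one JSON-encoded message per record.
  def messages_for(record)
    [JSON.generate(record)]
  end

  # Assumption: collect failures instead of logging them.
  def handle_error(record, error)
    @errors << [record, error.message]
  end
end

loader = SketchLoader.new(StubProducer.new)
loader.load({ 'id' => 1 })                  # sent to events/0
loader.load({ 'id' => 2, '_topic' => '' })  # stub raises; the error is captured, not re-raised
```

Because `load` rescues errors per call, one bad record (here, the one with an empty `_topic`) does not prevent the other records from being sent.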

setup()

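Requires the `kafka-rb` gem and creates the producer for the broker at `host`:`port`. Raises an `Error` if the gem cannot be loaded or the producer cannot be created.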

```ruby
# File lib/wukong-load/loaders/kafka.rb, line 48
def setup
  # Load kafka-rb only at setup time, and point at the Gemfile if it is missing.
  begin
    require 'kafka'
  rescue LoadError
    raise Error.new("Please ensure that the 'kafka-rb' gem is installed and available (in your Gemfile)")
  end
  log.debug("Connecting to Kafka broker at #{host}:#{port}...")
  # Wrap any failure while creating the producer in the loader's own Error class.
  begin
    self.producer = Kafka::MultiProducer.new(:host => host, :port => port)
  rescue => e
    raise Error.new(e.message)
  end
end
```
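The first guard in `setup` follows a common Ruby pattern: turn a `LoadError` from a dependency into an error that names the gem to install. A minimal, self-contained version is sketched below. `SetupError` and `require_optional` are hypothetical names chosen for illustration, and `'no_such_lib_for_demo'` is a deliberately missing library so the failure path can be exercised.

```ruby
# Hypothetical error class standing in for the loader's own Error.
class SetupError < StandardError; end

# Require a library, converting LoadError into a message that names the gem
# the user should install. Returns true on success.
def require_optional(lib, gem_name)
  require lib
  true
rescue LoadError
  raise SetupError, "Please ensure that the '#{gem_name}' gem is installed and available (in your Gemfile)"
end

require_optional('json', 'json')  # part of the standard library, so this succeeds

begin
  require_optional('no_such_lib_for_demo', 'no-such-gem')
rescue SetupError => e
  puts e.message
end
```

Naming `LoadError` explicitly matters: `LoadError` is a subclass of `ScriptError`, not `StandardError`, so a bare `rescue` (like the one around the producer) would not catch it.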