class Kafka::Producer
Public Class Methods
new(options={})
click to toggle source
options: :broker_list => “localhost:9092” - REQUIRED: a seed list of kafka brokers
# File lib/jruby-kafka/producer.rb, line 15 def initialize(options={}) validate_required_arguments(options) @metadata_broker_list = options[:broker_list] @serializer_class = nil @partitioner_class = nil @request_required_acks = nil @compression_codec = nil @compressed_topics = nil @request_timeout_ms = nil @producer_type = nil @key_serializer_class = nil @message_send_max_retries = nil @retry_backoff_ms = nil @topic_metadata_refresh_interval_ms = nil @queue_buffering_max_ms = nil @queue_buffering_max_messages = nil @queue_enqueue_timeout_ms = nil @batch_num_messages = nil @send_buffer_bytes = nil @client_id = nil if options[:partitioner_class] @partitioner_class = "#{options[:partitioner_class]}" end if options[:request_required_acks] valid_acks = %w{ 0 1 -1 } if not valid_acks.include? "#{options[:request_required_acks]}" raise(ArgumentError, "#{options[:request_required_acks]} is not a valid request_required_acks value: #{valid_acks}") end @request_required_acks = "#{options[:request_required_acks]}" end if options[:compression_codec] required_codecs = ["#{Java::kafka::message::NoCompressionCodec.name}", "#{Java::kafka::message::GZIPCompressionCodec.name}", "#{Java::kafka::message::SnappyCompressionCodec.name}"] if not required_codecs.include? "#{options[:compression_codec]}" raise(ArgumentError, "#{options[:compression_codec]} is not one of required codecs: #{required_codecs}") end @compression_codec = "#{options[:compression_codec]}" end if options[:compressed_topics] if @compression_codec != 'none' @compressed_topics = "#{options[:compressed_topics]}" end end if options[:request_timeout_ms] @request_timeout_ms = "#{options[:request_timeout_ms]}" end if options[:producer_type] valid_producer_types = %w{ sync async } if not valid_producer_types.include? "#{options[:producer_type]}" raise(ArgumentError, "#{options[:producer_type]} is not a valid producer type: #{valid_producer_types}") end @producer_type = "#{options[:producer_type]}" end if options[:serializer_class] @serializer_class = "#{options[:serializer_class]}" end if options[:key_serializer_class] @key_serializer_class = "#{options[:key_serializer_class]}" end if options[:message_send_max_retries] @message_send_max_retries = "#{options[:message_send_max_retries]}" end if options[:retry_backoff_ms] @retry_backoff_ms = "#{options[:retry_backoff_ms]}" end if options[:topic_metadata_refresh_interval_ms] @topic_metadata_refresh_interval_ms = "#{options[:topic_metadata_refresh_interval_ms]}" end if options[:queue_buffering_max_ms] @queue_buffering_max_ms = "#{options[:queue_buffering_max_ms]}" end if options[:queue_buffering_max_messages] @queue_buffering_max_messages = "#{options[:queue_buffering_max_messages]}" end if options[:queue_enqueue_timeout_ms] @queue_enqueue_timeout_ms = "#{options[:queue_enqueue_timeout_ms]}" end if options[:batch_num_messages] @batch_num_messages = "#{options[:batch_num_messages]}" end if options[:send_buffer_bytes] @send_buffer_bytes = "#{options[:send_buffer_bytes]}" end if options[:client_id] @client_id = "#{options[:client_id]}" end end
Public Instance Methods
connect()
click to toggle source
# File lib/jruby-kafka/producer.rb, line 130 def connect() @producer = Java::kafka::producer::Producer.new(createProducerConfig) end
createProducerConfig()
click to toggle source
# File lib/jruby-kafka/producer.rb, line 148 def createProducerConfig() # TODO lots more options avaiable here: http://kafka.apache.org/documentation.html#producerconfigs properties = java.util.Properties.new() properties.put("metadata.broker.list", @metadata_broker_list) unless @request_required_acks.nil? properties.put("request.required.acks", @request_required_acks) end unless @partitioner_class.nil? properties.put("partitioner.class", @partitioner_class) end unless @key_serializer_class.nil? properties.put("key.serializer.class", @key_serializer_class) end unless @request_timeout_ms.nil? properties.put("request.timeout.ms", @request_timeout_ms) end unless @producer_type.nil? properties.put('producer.type', @producer_type) end unless @serializer_class.nil? properties.put("serializer.class", @serializer_class) end unless @compression_codec.nil? properties.put("compression.codec", @compression_codec) end unless @compressed_topics.nil? properties.put("compressed.topics", @compressed_topics) end unless @message_send_max_retries.nil? properties.put("message.send.max.retries", @message_send_max_retries) end unless @retry_backoff_ms.nil? properties.put('retry.backoff.ms', @retry_backoff_ms) end unless @topic_metadata_refresh_interval_ms.nil? properties.put('topic.metadata.refresh.interval.ms', @topic_metadata_refresh_interval_ms) end unless @queue_buffering_max_ms.nil? properties.put('queue.buffering.max.ms', @queue_buffering_max_ms) end unless @queue_buffering_max_messages.nil? properties.put('queue.buffering.max.messages', @queue_buffering_max_messages) end unless @queue_enqueue_timeout_ms.nil? properties.put('queue.enqueue.timeout.ms', @queue_enqueue_timeout_ms) end unless @batch_num_messages.nil? properties.put('batch.num.messages', @batch_num_messages) end unless @send_buffer_bytes.nil? properties.put('send.buffer.bytes', @send_buffer_bytes) end unless @client_id.nil? properties.put('client.id', @client_id) end return Java::kafka::producer::ProducerConfig.new(properties) end
sendMsg(topic, key, msg)
click to toggle source
# File lib/jruby-kafka/producer.rb, line 135 def sendMsg(topic, key, msg) m = Java::kafka::producer::KeyedMessage.new(topic=topic, key=key, message=msg) #the send message for a producer is scala varargs, which doesn't seem to play nice w/ jruby # this is the best I could come up with ms = Java::scala::collection::immutable::Vector.new(0,0,0) ms = ms.append_front(m) begin @producer.send(ms) rescue FailedToSendMessageException => e raise KafkaError.new(e), "Got FailedToSendMessageException: #{e}" end end
Private Instance Methods
validate_required_arguments(options={})
click to toggle source
# File lib/jruby-kafka/producer.rb, line 123 def validate_required_arguments(options={}) [:broker_list].each do |opt| raise(ArgumentError, "#{opt} is required.") unless options[opt] end end