class Franz::Output::Kafka
Public Class Methods
new(opts={})
click to toggle source
Start a new output in the background. We’ll consume from the input queue and ship events to STDOUT.
@param [Hash] opts options for the output @option opts [Queue] :input ([]) “input” queue @option opts [Queue] :output ([]) “output” configuration
# File lib/franz/output/kafka.rb, line 22 def initialize opts={} opts = { logger: Logger.new(STDOUT), tags: [], input: [], output: { topic: 'franz', flush_interval: 10, flush_size: 500, client_id: @@host, type: 'sync', compression_codec: 'snappy', metadata_refresh_interval_ms: 600000, max_send_retries: 3, retry_backoff_ms: 100, required_acks: 0, ack_timeout_ms: 1500, socket_timeout_ms: 10000 } }.deep_merge!(opts) @statz = opts[:statz] || Franz::Stats.new @statz.create :num_output, 0 @logger = opts[:logger] @stop = false @foreground = opts[:foreground] @flush_size = opts[:output].delete :flush_size @flush_interval = opts[:output].delete :flush_interval @topic = opts[:output].delete :topic @kafka_brokers = opts[:output].delete(:brokers) || %w[ localhost:9092 ] @kafka_client_id = opts[:output].delete :client_id @kafka_config = opts[:output].map { |k,v| [ k, v.is_a?(String) ? v.to_sym : v ] } kafka_connect @lock = Mutex.new @messages = [] @thread = Thread.new do until @stop @lock.synchronize do num_messages = kafka_send @messages log.debug \ event: 'periodic flush', num_messages: num_messages end sleep @flush_interval end end @thread = Thread.new do until @stop event = opts[:input].shift unless opts[:tags].empty? event['tags'] ||= [] event['tags'] += opts[:tags] end log.debug \ event: 'publish', raw: event payload = JSON::generate(event) @lock.synchronize do @messages << Poseidon::MessageToSend.new(@topic, payload) if @messages.size >= @flush_size num_messages = kafka_send @messages log.debug \ event: 'flush', num_messages: num_messages end end end end log.info event: 'output started' @thread.join if @foreground end
Public Instance Methods
join()
click to toggle source
Join the Output
thread. Effectively only once.
# File lib/franz/output/kafka.rb, line 117 def join return if @foreground @foreground = true @thread.join end
stop()
click to toggle source
Stop the Output
thread. Effectively only once.
# File lib/franz/output/kafka.rb, line 125 def stop return if @foreground @foreground = true @thread.kill log.info event: 'output stopped' end
Private Instance Methods
kafka_connect()
click to toggle source
# File lib/franz/output/kafka.rb, line 136 def kafka_connect @kafka = Poseidon::Producer.new \ @kafka_brokers, @kafka_client_id, Hash[@kafka_config] end
kafka_send(messages)
click to toggle source
# File lib/franz/output/kafka.rb, line 143 def kafka_send messages return 0 if @messages.empty? @kafka.send_messages @messages @statz.inc :num_output, @messages.length size = @messages.size @messages = [] return size rescue Poseidon::Errors::UnableToFetchMetadata log.warn event: 'output dropped' kafka_connect sleep 1 retry end
log()
click to toggle source
# File lib/franz/output/kafka.rb, line 134 def log ; @logger end