class Franz::Output::Device
STDOUT output for Franz
Public Class Methods
new(opts={})
Start a new output in the background. We’ll consume from the input queue and ship events, JSON-encoded one per line, to the output device (/dev/stdout by default).
@param [Hash] opts options for the output
@option opts [Queue] :input ([]) “input” queue
# File lib/franz/output/device.rb, line 17
def initialize opts={}
  opts = {
    logger: Logger.new(STDOUT),
    tags: [],
    input: [],
    output: '/dev/stdout'
  }.deep_merge!(opts)

  @statz = opts[:statz] || Franz::Stats.new
  @statz.create :num_output, 0

  @device = File.open(opts[:output], 'w')

  @logger = opts[:logger]

  @stop = false
  @foreground = opts[:foreground]

  @thread = Thread.new do
    until @stop
      event = opts[:input].shift

      unless opts[:tags].empty?
        event['tags'] ||= []
        event['tags'] += opts[:tags]
      end

      log.debug \
        event: 'publish',
        raw: event

      @device.puts JSON::generate(event)
      @statz.inc :num_output
    end
  end

  log.info event: 'output started'

  @thread.join if @foreground
end
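The consume, tag and emit loop in the constructor can be tried in isolation. The following is a minimal sketch using only the Ruby standard library, not Franz's own code: `drain_events` is a hypothetical helper, a `StringIO` stands in for the output device, and a `nil` pushed onto the queue is assumed as an end-of-stream marker (the original loops until it is stopped).

```ruby
require 'json'
require 'stringio'
require 'thread'

# Hypothetical stand-in for the loop inside Device#initialize.
# Pops events from +input+, appends +tags+, and writes one JSON
# document per line to +device+. Returns the number of events written.
def drain_events(input, device, tags: [])
  count = 0
  # Assumption of this sketch: a nil on the queue ends the loop.
  while (event = input.shift)
    unless tags.empty?
      event['tags'] ||= []
      event['tags'] += tags
    end
    device.puts JSON.generate(event)
    count += 1
  end
  count
end

input = Queue.new
input << { 'message' => 'hello' }
input << { 'message' => 'world', 'tags' => ['a'] }
input << nil # end-of-stream marker

device = StringIO.new
worker = Thread.new { drain_events(input, device, tags: ['franz']) }
num_output = worker.value # joins the thread and returns the count
```

Using a real `Queue` here means `shift` blocks while the queue is empty, which avoids spinning on an empty input.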
Public Instance Methods
join()
Join the Output thread. Only the first call has an effect; later calls, and calls on an output started in the foreground, return immediately.
# File lib/franz/output/device.rb, line 59
def join
  return if @foreground
  @foreground = true
  @thread.join
end
stop()
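Stop the Output thread and close the output device. Only the first call has an effect; it is also a no-op once the output has been joined or was started in the foreground.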
# File lib/franz/output/device.rb, line 66
def stop
  return if @foreground
  @foreground = true
  @thread.kill
  @device.close
  log.info event: 'output stopped'
end
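The "only once" behaviour of join and stop comes from the shared @foreground flag acting as a guard. The sketch below is a hypothetical miniature of that pattern, not the Franz class itself: `OnceWorker`, its `stops` counter and the extra `@thread.join` after the kill are assumptions added so the effect can be observed.

```ruby
# Hypothetical miniature of the guard used by Device#join and Device#stop.
class OnceWorker
  attr_reader :stops

  def initialize
    @foreground = false
    @stops = 0
    @thread = Thread.new { sleep } # park until killed
  end

  def stop
    return if @foreground # already stopped (or joined): do nothing
    @foreground = true
    @thread.kill
    @thread.join # assumption: wait until the kill has taken effect
    @stops += 1
  end

  def alive?
    @thread.alive?
  end
end

worker = OnceWorker.new
worker.stop
worker.stop # second call hits the guard and returns immediately
```

Because join and stop share the same flag in the original, calling one disables the other as well.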
Private Instance Methods
log()
# File lib/franz/output/device.rb, line 75
def log ; @logger end