class Franz::Output::Device

STDOUT output for Franz.

Public Class Methods

new(opts={}) click to toggle source

Start a new output in a background thread. It consumes events from the input queue and writes each one as a line of JSON to the output device (STDOUT by default).

@param [Hash] opts options for the output
@option opts [Queue] :input ([]) “input” queue

# File lib/franz/output/device.rb, line 17
# Start a new output in a background thread. The thread takes events from
# the input queue, appends any configured tags, and writes each event to the
# output device as one line of JSON.
#
# @param [Hash] opts options for the output
# @option opts [Queue] :input ([]) "input" queue
# @option opts [String] :output ('/dev/stdout') path of the device to write to
# @option opts [Array] :tags ([]) tags appended to each event's 'tags' list
# @option opts [Logger] :logger (Logger.new(STDOUT)) logger
# @option opts [Franz::Stats] :statz (Franz::Stats.new) stats collector
# @option opts [Boolean] :foreground (nil) join the thread before returning
def initialize opts={}
  opts = {
    logger: Logger.new(STDOUT),
    tags: [],
    input: [],
    output: '/dev/stdout'
  }.deep_merge!(opts)

  @statz = opts[:statz] || Franz::Stats.new
  @statz.create :num_output, 0

  @device = File.open(opts[:output], 'w')
  @logger = opts[:logger]

  @stop = false
  @foreground = opts[:foreground]

  input = opts[:input]
  tags  = opts[:tags]

  @thread = Thread.new do
    until @stop
      event = input.shift

      # shift yields nil for an empty Array and for a closed, drained Queue.
      # There is nothing to publish in either case: a nil must not be
      # dereferenced below, nor emitted and counted as a "null" event.
      if event.nil?
        break if input.respond_to?(:closed?) && input.closed?
        Thread.pass # let producers run instead of spinning on an empty input
        next
      end

      unless tags.empty?
        event['tags'] ||= []
        event['tags']  += tags
      end

      log.debug \
        event: 'publish',
        raw: event

      @device.puts JSON::generate(event)

      @statz.inc :num_output
    end
  end

  log.info event: 'output started'

  # In foreground mode the constructor blocks until the output thread ends.
  @thread.join if @foreground
end

Public Instance Methods

join() click to toggle source

Join the Output thread. Effectively only once.

# File lib/franz/output/device.rb, line 59
# Wait for the output thread to finish. Only the first call joins; once the
# output is marked as foreground, later calls return nil immediately.
#
# @return [Thread, nil] the joined thread, or nil if already in the foreground
def join
  unless @foreground
    @foreground = true
    @thread.join
  end
end
stop() click to toggle source

Stop the Output thread. Effectively only once.

# File lib/franz/output/device.rb, line 66
# Stop the output thread and close the output device. Only the first call
# has any effect; later calls return immediately.
#
# The guard is the @stop flag rather than @foreground, so an output that has
# been joined (or was started in the foreground) can still be stopped.
def stop
  return if @stop
  @stop = true       # also ends the `until @stop` loop in the output thread
  @foreground = true # a later #join must not wait on the killed thread
  @thread.kill
  @device.close
  log.info event: 'output stopped'
end

Private Instance Methods

log() click to toggle source
# File lib/franz/output/device.rb, line 75
# Shorthand accessor for the logger held in @logger.
def log
  @logger
end