class Anschel::Input

Public Class Methods

new(config, qsize, stats, log, leftovers=[]) click to toggle source
# File lib/anschel/input.rb, line 7
# Builds the input stage: one bounded queue fed by every configured input.
#
# config    - Enumerable of Hashes, one per input. Each Hash must carry a
#             :kind key ('kafka' or 'rabbitmq'); that key is removed from the
#             Hash before the remainder is passed to the input's constructor.
# qsize     - capacity of the internal SizedQueue (2000 when nil/false).
# stats     - stats collector; only #create and #get are called here.
# log       - logger responding to #info and #warn with a Hash payload.
# leftovers - events pushed onto the queue ahead of new input (nil is treated
#             as empty); presumably events drained by an earlier #stop.
#
# Raises RuntimeError when an input's :kind is not recognised.
def initialize(config, qsize, stats, log, leftovers = [])
  log.info event: 'input-loading'
  log.info event: 'input-config', config: config, qsize: qsize

  @queue = SizedQueue.new(qsize || 2000)

  # Re-enqueue on a separate thread: pushing onto a full SizedQueue blocks,
  # so doing this inline could stall construction when there are more
  # leftovers than the queue can hold.
  Thread.new do
    leftovers ||= []
    log.warn event: 'input-leftovers', leftovers_size: leftovers.size
    leftovers.each { |l| @queue << l }
  end

  @inputs = []

  stats.create 'input'
  stats.get 'input'

  config.each do |input|
    case input.delete(:kind)
    when 'kafka'
      @inputs << Input::Kafka.new(@queue, input, stats, log)
      log.info event: 'input-loaded', kind: 'kafka'
    when 'rabbitmq'
      @inputs << Input::RabbitMQ.new(@queue, input, stats, log)
      log.info event: 'input-loaded', kind: 'rabbitmq'
    else
      raise 'Unknown input type'
    end
  end

  log.info event: 'input-fully-loaded'
end

Public Instance Methods

leftovers() click to toggle source
# File lib/anschel/input.rb, line 49
# Returns the value stored in @leftovers, or nil when that instance
# variable has not been assigned yet.
def leftovers
  defined?(@leftovers) ? @leftovers : nil
end
shift() click to toggle source
# File lib/anschel/input.rb, line 54
# Removes the next item from the queue and returns it as text.
#
# A String item is returned unchanged; for anything else the result is
# item.message.to_s.
def shift
  item = @queue.shift
  return item if String === item

  item.message.to_s
end
stop() click to toggle source
# File lib/anschel/input.rb, line 41
# Stops every input and drains whatever is still queued.
#
# The first call invokes #stop on each input, then shifts the remaining
# queue contents into @leftovers. Later calls do nothing further and hand
# back the same array.
#
# Returns the Array of leftover events on every call.
def stop
  return @leftovers if defined? @leftovers

  @inputs.each(&:stop)
  @leftovers = []
  @leftovers << shift until @queue.empty?
  # The `until` modifier loop evaluates to nil, so return the array
  # explicitly to keep the first call consistent with later ones.
  @leftovers
end