class Anschel::Input
Public Class Methods
new(config, qsize, stats, log, leftovers=[])
click to toggle source
# File lib/anschel/input.rb, line 7 def initialize config, qsize, stats, log, leftovers=[] log.info event: 'output-loading' log.info event: 'output-config', config: config, qsize: qsize @queue = SizedQueue.new(qsize || 2000) Thread.new do leftovers ||= [] log.warn event: 'input-leftovers', leftovers_size: leftovers.size leftovers.each { |l| @queue << l } end @inputs = [] stats.create 'input' stats.get 'input' config.each do |input| case input.delete(:kind) when 'kafka' @inputs << Input::Kafka.new(@queue, input, stats, log) log.info event: 'input-loaded', kind: 'kafka' when 'rabbitmq' @inputs << Input::RabbitMQ.new(@queue, input, stats, log) log.info event: 'input-loaded', kind: 'rabbitmq' else raise 'Uknown input type' end end log.info event: 'input-fully-loaded' end
Public Instance Methods
leftovers()
click to toggle source
# File lib/anschel/input.rb, line 49 def leftovers return @leftovers if defined? @leftovers end
shift()
click to toggle source
# File lib/anschel/input.rb, line 54 def shift event = @queue.shift case event when String event else event.message.to_s end end
stop()
click to toggle source
# File lib/anschel/input.rb, line 41 def stop return @leftovers if defined? @leftovers @inputs.map &:stop @leftovers = [] @leftovers << shift until @queue.empty? end