class Aggregator

Constants

VERSION

Attributes

logger[RW]
max_batch_size[RW]
max_wait_time[RW]

Public Class Methods

drain() click to toggle source
# File lib/aggregator.rb, line 26
def self.drain
  self.instance.drain
end
logger=(logger) click to toggle source
# File lib/aggregator.rb, line 22
def self.logger=(logger)
  self.instance.logger = logger
end
max_batch_size=(value) click to toggle source
# File lib/aggregator.rb, line 14
def self.max_batch_size=(value)
  self.instance.max_batch_size = value
end
max_wait_time=(value) click to toggle source
# File lib/aggregator.rb, line 18
def self.max_wait_time=(value)
  self.instance.max_wait_time = value
end
new() click to toggle source
# File lib/aggregator.rb, line 30
def initialize
  @queue = Queue.new
  @mutex = Mutex.new
  @thread = nil

  at_exit { stop }
end
push(data) click to toggle source
# File lib/aggregator.rb, line 10
def self.push(data)
  self.instance.push(data)
end

Public Instance Methods

drain() click to toggle source
# File lib/aggregator.rb, line 43
def drain
  if running?
    if ! @queue.empty?
      log :info, "joining thread #{@thread.inspect} (queue length = #{@queue.length})"
      @drain = true
      @thread.join if running?
    end

    log :info, "stopping thread #{@thread.inspect} (queue length = #{@queue.length})"
    @thread = nil
  elsif ! @queue.empty?
    start and drain
  end

  true
end
push(data) click to toggle source
# File lib/aggregator.rb, line 38
def push(data)
  @queue.push(data)
  start unless running?
end

Private Instance Methods

finish(collection) click to toggle source
# File lib/aggregator.rb, line 75
def finish(collection)
  raise NoMethodError,
    "#{self.class.name}#finish(collection) must be implemented"
end
log(level, message) click to toggle source
# File lib/aggregator.rb, line 88
def log(level, message)
  logger.send(level, "[#{self.class.name}] #{message}")
end
process(collection, item) click to toggle source
# File lib/aggregator.rb, line 70
def process(collection, item)
  raise NoMethodError,
    "#{self.class.name}#process(collection, item) must be implemented"
end
process_queue() click to toggle source
# File lib/aggregator.rb, line 92
def process_queue
  raise StopIteration if @queue.empty? && @drain

  processed_items = 0
  start_time = Time.now

  while processed_items < max_batch_size && (Time.now - start_time) < max_wait_time
    raise StopIteration if @queue.empty? && @drain
    if @queue.empty?
      sleep 0.1
    else
      collection = process(collection, @queue.pop(true))
      processed_items += 1
    end
  end
ensure
  finish(collection) if collection
end
running?() click to toggle source
# File lib/aggregator.rb, line 80
def running?
  @thread && @thread.alive?
end
start() click to toggle source
# File lib/aggregator.rb, line 111
def start
  @mutex.synchronize do
    return false if running?

    @drain = false

    @thread = Thread.new do
      begin
        log :info, "starting thread #{Thread.current}"

        loop do
          process_queue
        end
      rescue Exception => e
        log :warn, "thread crashed with exception: #{e.inspect}"
      end
    end

    @thread.priority = 2

    @thread
  end
end
stop() click to toggle source
# File lib/aggregator.rb, line 135
def stop
  if running?
    drain
  else
    log :info, "thread not running - nothing to stop"
    return false
  end
end