class Aggregator
Constants
- VERSION
Attributes
logger[RW]
max_batch_size[RW]
max_wait_time[RW]
Public Class Methods
drain()
click to toggle source
# File lib/aggregator.rb, line 26 def self.drain self.instance.drain end
logger=(logger)
click to toggle source
# File lib/aggregator.rb, line 22 def self.logger=(logger) self.instance.logger = logger end
max_batch_size=(value)
click to toggle source
# File lib/aggregator.rb, line 14 def self.max_batch_size=(value) self.instance.max_batch_size = value end
max_wait_time=(value)
click to toggle source
# File lib/aggregator.rb, line 18 def self.max_wait_time=(value) self.instance.max_wait_time = value end
new()
click to toggle source
# File lib/aggregator.rb, line 30 def initialize @queue = Queue.new @mutex = Mutex.new @thread = nil at_exit { stop } end
push(data)
click to toggle source
# File lib/aggregator.rb, line 10 def self.push(data) self.instance.push(data) end
Public Instance Methods
drain()
click to toggle source
# File lib/aggregator.rb, line 43 def drain if running? if ! @queue.empty? log :info, "joining thread #{@thread.inspect} (queue length = #{@queue.length})" @drain = true @thread.join if running? end log :info, "stopping thread #{@thread.inspect} (queue length = #{@queue.length})" @thread = nil elsif ! @queue.empty? start and drain end true end
push(data)
click to toggle source
# File lib/aggregator.rb, line 38 def push(data) @queue.push(data) start unless running? end
Private Instance Methods
finish(collection)
click to toggle source
# File lib/aggregator.rb, line 75 def finish(collection) raise NoMethodError, "#{self.class.name}#finish(collection) must be implemented" end
log(level, message)
click to toggle source
# File lib/aggregator.rb, line 88 def log(level, message) logger.send(level, "[#{self.class.name}] #{message}") end
process(collection, item)
click to toggle source
# File lib/aggregator.rb, line 70 def process(collection, item) raise NoMethodError, "#{self.class.name}#process(collection, item) must be implemented" end
process_queue()
click to toggle source
# File lib/aggregator.rb, line 92 def process_queue raise StopIteration if @queue.empty? && @drain processed_items = 0 start_time = Time.now while processed_items < max_batch_size && (Time.now - start_time) < max_wait_time raise StopIteration if @queue.empty? && @drain if @queue.empty? sleep 0.1 else collection = process(collection, @queue.pop(true)) processed_items += 1 end end ensure finish(collection) if collection end
running?()
click to toggle source
# File lib/aggregator.rb, line 80 def running? @thread && @thread.alive? end
start()
click to toggle source
# File lib/aggregator.rb, line 111 def start @mutex.synchronize do return false if running? @drain = false @thread = Thread.new do begin log :info, "starting thread #{Thread.current}" loop do process_queue end rescue Exception => e log :warn, "thread crashed with exception: #{e.inspect}" end end @thread.priority = 2 @thread end end
stop()
click to toggle source
# File lib/aggregator.rb, line 135 def stop if running? drain else log :info, "thread not running - nothing to stop" return false end end