class SnowplowTracker::AsyncEmitter
Public Class Methods
new(endpoint, config={})
click to toggle source
Calls superclass method
SnowplowTracker::Emitter::new
# File lib/snowplow-tracker/emitters.rb, line 231
# Sets up the work queue and its bookkeeping, starts the consumer threads,
# then passes endpoint and config on to the superclass initializer.
#
# endpoint - forwarded unchanged to super.
# config   - Hash of options; :thread_count selects how many consumer
#            threads are started (1 when absent). The whole hash is also
#            forwarded to super.
def initialize(endpoint, config={})
  @queue = Queue.new
  # @all_processed_condition and @results_unprocessed are used to emulate Python's Queue.task_done()
  @queue.extend(MonitorMixin)
  @all_processed_condition = @queue.new_cond
  @results_unprocessed = 0
  # Each consumer thread runs #consume; the Thread objects are not retained.
  (config[:thread_count] || 1).times do
    Thread.new { consume }
  end
  super(endpoint, config)
end
Public Instance Methods
consume()
click to toggle source
# File lib/snowplow-tracker/emitters.rb, line 245
# Worker loop: repeatedly takes one work unit off @queue, passes it to
# send_requests, then records the unit as processed and wakes any thread
# waiting on @all_processed_condition.
#
# Never returns normally; it ends only if an exception escapes
# send_requests or the thread running it is terminated.
def consume
  loop do
    work_unit = @queue.pop
    begin
      send_requests(work_unit)
    ensure
      # Do the bookkeeping even when send_requests raises; otherwise the
      # counter would stay above zero and a thread waiting for it to reach
      # zero would block forever. The exception itself still propagates.
      @queue.synchronize do
        @results_unprocessed -= 1
        @all_processed_condition.broadcast
      end
    end
  end
end
flush(async=true)
click to toggle source
Flush the buffer
If async is false, block until every work unit placed on the queue has been processed
# File lib/snowplow-tracker/emitters.rb, line 259
# Hands the current buffer to the consumer queue as one work unit and
# replaces it with a fresh empty array.
#
# async - when true (the default) return as soon as the buffer has been
#         queued; when false, wait until @results_unprocessed has dropped
#         to zero before continuing.
#
# The hand-off repeats while @buffer is non-empty after a pass, so events
# buffered during a synchronous wait are queued as well.
# NOTE(review): @lock and @buffer are not set in this class's initializer;
# presumably the superclass provides them - confirm.
def flush(async=true)
  loop do
    @lock.synchronize do
      # Count the unit before it becomes visible to the consumers.
      @queue.synchronize { @results_unprocessed += 1 }
      @queue << @buffer
      @buffer = []
    end

    unless async
      LOGGER.info('Starting synchronous flush')
      @queue.synchronize do
        @all_processed_condition.wait_while { @results_unprocessed > 0 }
        LOGGER.info('Finished synchronous flush')
      end
    end

    break if @buffer.empty?
  end
end