class SnowplowTracker::AsyncEmitter

Public Class Methods

new(endpoint, config={}) click to toggle source
Calls superclass method SnowplowTracker::Emitter::new
# File lib/snowplow-tracker/emitters.rb, line 231
def initialize(endpoint, config={})
  @queue = Queue.new()
  # @all_processed_condition and @results_unprocessed are used to emulate Python's Queue.task_done()
  @queue.extend(MonitorMixin)
  @all_processed_condition = @queue.new_cond
  @results_unprocessed = 0
  (config[:thread_count] || 1).times do
    t = Thread.new do
      consume
    end
  end
  super(endpoint, config)
end

Public Instance Methods

consume() click to toggle source
# File lib/snowplow-tracker/emitters.rb, line 245
def consume
  loop do
    work_unit = @queue.pop
    send_requests(work_unit)
    @queue.synchronize do
      @results_unprocessed -= 1
      @all_processed_condition.broadcast
    end
  end
end
flush(async=true) click to toggle source

Flush the buffer

If async is false, block until the queue is empty
# File lib/snowplow-tracker/emitters.rb, line 259
def flush(async=true)
  loop do
    @lock.synchronize do
      @queue.synchronize do
        @results_unprocessed += 1
      end
      @queue << @buffer
      @buffer = []
    end
    if not async
      LOGGER.info('Starting synchronous flush')
      @queue.synchronize do
        @all_processed_condition.wait_while { @results_unprocessed > 0 }
        LOGGER.info('Finished synchronous flush')
      end
    end
    break if @buffer.size < 1
  end
end