class Rudder::Analytics::Worker

Public Class Methods

new(queue, data_plane_url, write_key, options = {}) click to toggle source

public: Creates a new worker

The worker continuously takes messages off the queue and makes requests to the segment.io api

queue - Queue synchronized between client and worker write_key - String of the project's Write key options - Hash of worker options

batch_size - Fixnum of how many items to send in a batch
on_error   - Proc of what to do on an error
# File lib/rudder/analytics/worker.rb, line 24
def initialize(queue, data_plane_url, write_key, options = {})
  symbolize_keys! options
  @queue = queue
  @data_plane_url = data_plane_url
  @write_key = write_key
  @on_error = options[:on_error] || proc { |status, error| }
  batch_size = options[:batch_size] || Defaults::MessageBatch::MAX_SIZE
  @batch = MessageBatch.new(batch_size)
  @lock = Mutex.new
end

Public Instance Methods

is_requesting?() click to toggle source

public: Check whether we have outstanding requests.

# File lib/rudder/analytics/worker.rb, line 54
def is_requesting?
  @lock.synchronize { !@batch.empty? }
end
run() click to toggle source

public: Continuously runs the loop to check for new events

# File lib/rudder/analytics/worker.rb, line 37
def run
  until Thread.current[:should_exit]
    return if @queue.empty?

    @lock.synchronize do
      consume_message_from_queue! until @batch.full? || @queue.empty?
    end

    res = Request.new(data_plane_url: @data_plane_url).post @write_key, @batch
    @on_error.call(res.status, res.error) unless res.status == 200

    @lock.synchronize { @batch.clear }
  end
end

Private Instance Methods

consume_message_from_queue!() click to toggle source
# File lib/rudder/analytics/worker.rb, line 60
def consume_message_from_queue!
  @batch << @queue.pop
rescue MessageBatch::JSONGenerationError => e
  @on_error.call(-1, e.to_s)
end