class Rudder::Analytics::Worker
Public Class Methods
new(queue, data_plane_url, write_key, options = {})
public: Creates a new worker
The worker continuously takes messages off the queue and uploads them in batches to the RudderStack data plane API
queue          - Queue synchronized between client and worker
data_plane_url - String of the RudderStack data plane URL
write_key      - String of the project's Write key
options        - Hash of worker options
  batch_size - Fixnum of how many items to send in a batch
  on_error   - Proc of what to do on an error
# File lib/rudder/analytics/worker.rb, line 24
def initialize(queue, data_plane_url, write_key, options = {})
  symbolize_keys! options
  @queue = queue
  @data_plane_url = data_plane_url
  @write_key = write_key
  @on_error = options[:on_error] || proc { |status, error| }
  batch_size = options[:batch_size] || Defaults::MessageBatch::MAX_SIZE
  @batch = MessageBatch.new(batch_size)
  @lock = Mutex.new
end
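A minimal construction sketch, assuming the gem's top-level require path and placeholder credentials; in typical use the client builds its own worker, so constructing one directly is mainly useful for testing:

require 'rudder/analytics'   # assumed require path for the rudder-sdk-ruby gem

queue = Queue.new

# Placeholder data plane URL and write key; batch_size and on_error are optional.
worker = Rudder::Analytics::Worker.new(
  queue,
  'https://your-data-plane.example.com',
  'YOUR_WRITE_KEY',
  batch_size: 50,
  on_error: proc { |status, msg| warn "upload failed (#{status}): #{msg}" }
)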
Public Instance Methods
is_requesting?()
public: Check whether we have outstanding requests.
# File lib/rudder/analytics/worker.rb, line 54
def is_requesting?
  @lock.synchronize { !@batch.empty? }
end
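A hedged sketch of how a flush-style caller might combine this with the queue, polling until all pending work is done (queue and worker are the illustrative objects from the constructor example above):

# Wait until every queued message has been consumed and no batch is mid-flight.
sleep(0.1) while !queue.empty? || worker.is_requesting?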
run()
public: Continuously runs the loop to check for new events and upload them in batches
# File lib/rudder/analytics/worker.rb, line 37
def run
  until Thread.current[:should_exit]
    return if @queue.empty?

    @lock.synchronize do
      consume_message_from_queue! until @batch.full? || @queue.empty?
    end

    res = Request.new(data_plane_url: @data_plane_url).post @write_key, @batch
    @on_error.call(res.status, res.error) unless res.status == 200

    @lock.synchronize { @batch.clear }
  end
end
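The loop keeps running until the thread-local flag :should_exit is set on the worker's thread, and it returns early whenever the queue is empty. A minimal sketch of driving it from a background thread (the thread management shown is illustrative, not the client's exact code):

worker_thread = Thread.new { worker.run }

# ... messages are pushed onto `queue` elsewhere ...

# Ask the loop to stop after its current iteration, then wait for it to finish.
worker_thread[:should_exit] = true
worker_thread.join

Because run returns as soon as the queue is empty, the enclosing client is expected to restart (or re-create) the worker thread when new messages are enqueued.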
Private Instance Methods
consume_message_from_queue!()
# File lib/rudder/analytics/worker.rb, line 60
def consume_message_from_queue!
  @batch << @queue.pop
rescue MessageBatch::JSONGenerationError => e
  @on_error.call(-1, e.to_s)
end
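When a message cannot be serialized, the failure is reported through the same on_error callback but with a sentinel status of -1 rather than an HTTP status. A hedged sketch of an on_error proc that distinguishes the two cases:

on_error = proc do |status, error|
  if status == -1
    warn "message dropped, failed to serialize to JSON: #{error}"
  else
    warn "batch upload failed with HTTP status #{status}: #{error}"
  end
end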