class Yabeda::Datadog::Worker

Perform actions async

Constants

REGISTER
SEND

Attributes

logger[R]
queue[R]
threads[R]

Public Class Methods

new(queue) click to toggle source
# File lib/yabeda/datadog/worker.rb, line 21
def initialize(queue)
  @queue = queue
  @threads = []
end
start(config) click to toggle source
# File lib/yabeda/datadog/worker.rb, line 10
def self.start(config)
  queue_size = config.queue_size
  num_threads = config.num_threads
  Logging.instance.info("start worker; queue size: #{queue_size}; threads #{num_threads} ")

  queue = SizedQueue.new(queue_size)
  instance = new(queue)
  instance.spawn_threads(num_threads)
  instance
end

Public Instance Methods

enqueue(action, payload) click to toggle source
# File lib/yabeda/datadog/worker.rb, line 26
def enqueue(action, payload)
  queue.push([action, payload])
end
spawn_threads(num_threads) click to toggle source
# File lib/yabeda/datadog/worker.rb, line 30
def spawn_threads(num_threads)
  num_threads.times do
    threads << Thread.new do
      grouped_actions = Hash.new { |hash, key| hash[key] = [] }

      while running? || actions_left?
        batch_size = 0
        # wait for actions, blocks the current thread
        action_key, action_payload = wait_for_action
        if action_key
          grouped_actions[action_key].push(action_payload)
          batch_size += 1
        end

        # group a batch of actions
        while batch_size < Yabeda::Datadog.config.batch_size
          begin
            action_key, action_payload = dequeue_action
            grouped_actions[action_key].push(action_payload)
            batch_size += 1
          rescue ThreadError
            break # exit batch loop if we drain the queue
          end
        end

        # invoke actions in batches
        grouped_actions.each_pair do |group_key, group_payload|
          self.class.const_get(group_key, false).call(group_payload)
        end

        grouped_actions.clear
      end
    end
  end

  true
end
spawned_threads_count() click to toggle source
# File lib/yabeda/datadog/worker.rb, line 68
def spawned_threads_count
  threads.size
end
stop() click to toggle source
# File lib/yabeda/datadog/worker.rb, line 72
def stop
  Logging.instance.info("stop worker")
  queue.close
  threads.each(&:exit)
  threads.clear
  true
end

Private Instance Methods

actions_left?() click to toggle source
# File lib/yabeda/datadog/worker.rb, line 84
def actions_left?
  !queue.empty?
end
dequeue_action() click to toggle source
# File lib/yabeda/datadog/worker.rb, line 92
def dequeue_action
  queue.pop(true)
end
no_acitons?() click to toggle source
# File lib/yabeda/datadog/worker.rb, line 88
def no_acitons?
  queue.empty?
end
running?() click to toggle source
# File lib/yabeda/datadog/worker.rb, line 100
def running?
  !queue.closed?
end
wait_for_action() click to toggle source
# File lib/yabeda/datadog/worker.rb, line 96
def wait_for_action
  queue.pop(false)
end