class Yabeda::Datadog::Worker
Perform actions async¶ ↑
Constants
- REGISTER
- SEND
Attributes
logger[R]
queue[R]
threads[R]
Public Class Methods
new(queue)
click to toggle source
# File lib/yabeda/datadog/worker.rb, line 21 def initialize(queue) @queue = queue @threads = [] end
start(config)
click to toggle source
# File lib/yabeda/datadog/worker.rb, line 10 def self.start(config) queue_size = config.queue_size num_threads = config.num_threads Logging.instance.info("start worker; queue size: #{queue_size}; threads #{num_threads} ") queue = SizedQueue.new(queue_size) instance = new(queue) instance.spawn_threads(num_threads) instance end
Public Instance Methods
enqueue(action, payload)
click to toggle source
# File lib/yabeda/datadog/worker.rb, line 26 def enqueue(action, payload) queue.push([action, payload]) end
spawn_threads(num_threads)
click to toggle source
# File lib/yabeda/datadog/worker.rb, line 30 def spawn_threads(num_threads) num_threads.times do threads << Thread.new do grouped_actions = Hash.new { |hash, key| hash[key] = [] } while running? || actions_left? batch_size = 0 # wait for actions, blocks the current thread action_key, action_payload = wait_for_action if action_key grouped_actions[action_key].push(action_payload) batch_size += 1 end # group a batch of actions while batch_size < Yabeda::Datadog.config.batch_size begin action_key, action_payload = dequeue_action grouped_actions[action_key].push(action_payload) batch_size += 1 rescue ThreadError break # exit batch loop if we drain the queue end end # invoke actions in batches grouped_actions.each_pair do |group_key, group_payload| self.class.const_get(group_key, false).call(group_payload) end grouped_actions.clear end end end true end
spawned_threads_count()
click to toggle source
# File lib/yabeda/datadog/worker.rb, line 68 def spawned_threads_count threads.size end
stop()
click to toggle source
# File lib/yabeda/datadog/worker.rb, line 72 def stop Logging.instance.info("stop worker") queue.close threads.each(&:exit) threads.clear true end
Private Instance Methods
actions_left?()
click to toggle source
# File lib/yabeda/datadog/worker.rb, line 84 def actions_left? !queue.empty? end
dequeue_action()
click to toggle source
# File lib/yabeda/datadog/worker.rb, line 92 def dequeue_action queue.pop(true) end
no_acitons?()
click to toggle source
# File lib/yabeda/datadog/worker.rb, line 88 def no_acitons? queue.empty? end
running?()
click to toggle source
# File lib/yabeda/datadog/worker.rb, line 100 def running? !queue.closed? end
wait_for_action()
click to toggle source
# File lib/yabeda/datadog/worker.rb, line 96 def wait_for_action queue.pop(false) end