class Aws::Xray::Worker
Public Class Methods
new(queue)
click to toggle source
# File lib/aws/xray/worker.rb, line 46
# Builds a worker bound to the given job queue.
#
# @param queue the queue whose items #run pops and sends; stored in @queue
def initialize(queue)
  @queue = queue
end
post(segment)
click to toggle source
@param [Aws::Xray::Segment] segment to send @raise [QueueIsFullError] when a ThreadError is raised while enqueueing the segment
# File lib/aws/xray/worker.rb, line 18
# Enqueues a segment on the shared worker queue.
#
# The fork check and the push both run while holding @post_lock.
#
# @param [Aws::Xray::Segment] segment to send
# @raise [QueueIsFullError] when the queue is already at capacity (any
#   ThreadError raised inside this method is reported this way)
def post(segment)
  Aws::Xray.config.logger.debug("#{Thread.current}: Worker.post received a job")
  @post_lock.synchronize do
    refresh_if_forked
    # Non-blocking push: on a full SizedQueue this raises ThreadError at once
    # instead of parking the caller (and every other poster waiting on
    # @post_lock) until a worker drains the queue. Without it the rescue
    # below could never report a full queue.
    @queue.push(segment, true)
  end
  Aws::Xray.config.logger.debug("#{Thread.current}: Worker.post pushed a job")
rescue ThreadError => e
  raise QueueIsFullError.new(e)
end
reset(config)
click to toggle source
@param [Aws::Xray::Worker::Configuration] config
# File lib/aws/xray/worker.rb, line 30
# Replaces the shared queue and restarts the worker threads.
#
# A new SizedQueue bounded by config.max_queue_size is installed, any
# previously started worker threads are killed, and config.num new workers
# are started on the new queue.
#
# @param [Aws::Xray::Worker::Configuration] config
# @return [Array<Thread>] the newly started worker threads
def reset(config)
  @queue = Thread::SizedQueue.new(config.max_queue_size)

  # Iterating an empty list is a no-op, so only the "ever started" check is needed.
  @workers.each(&:kill) if instance_variable_defined?(:@workers)

  @workers = Array.new(config.num) do
    worker = new(@queue)
    worker.run
  end
end
Private Class Methods
refresh_if_forked()
click to toggle source
# File lib/aws/xray/worker.rb, line 38
# Rebuilds the queue and workers when the recorded pid is not the current
# process id (for example after a fork, or on first use while @pid is unset),
# then records the current pid.
def refresh_if_forked
  current_pid = Process.pid
  return if @pid == current_pid

  reset(Aws::Xray.config.worker)
  @pid = current_pid
end
Public Instance Methods
run()
click to toggle source
# File lib/aws/xray/worker.rb, line 50
# Starts a thread that loops forever, popping items from the queue and
# handing each truthy one to Client.send_; falsy items are logged and skipped.
#
# @return [Thread] the started thread, with abort_on_exception enabled
def run
  worker = Thread.new(@queue) do |jobs|
    loop do
      Aws::Xray.config.logger.debug("#{Thread.current}: Worker#run waits a job")
      job = jobs.pop
      Aws::Xray.config.logger.debug("#{Thread.current}: Worker#run received a job")

      unless job
        Aws::Xray.config.logger.debug("#{Thread.current}: Worker#run received invalid item, ignored it")
        next
      end

      Client.send_(job)
      Aws::Xray.config.logger.debug("#{Thread.current}: Worker#run sent a segment")
    end
  end

  # An unhandled error in the worker thread should not go unnoticed.
  worker.tap { |thread| thread.abort_on_exception = true }
end