class Aws::Xray::Worker

Public Class Methods

new(queue) click to toggle source
# File lib/aws/xray/worker.rb, line 46
# Builds a worker bound to the queue it will later consume in #run.
#
# @param queue the queue object this worker pops jobs from
def initialize(queue)
  @queue = queue
end
post(segment) click to toggle source

@param [Aws::Xray::Segment] segment the segment to send

# File lib/aws/xray/worker.rb, line 18
# Hands a segment to the background workers by pushing it onto the shared queue.
#
# The push happens under @post_lock, after refresh_if_forked has had a chance
# to rebuild the queue and workers. It is non-blocking: when the queue is full
# the caller gets QueueIsFullError right away instead of stalling while the
# lock is held.
#
# @param [Aws::Xray::Segment] segment to send
# @raise [QueueIsFullError] when a ThreadError is raised while enqueuing
#   (which is what a non-blocking push onto a full queue raises)
def post(segment)
  Aws::Xray.config.logger.debug("#{Thread.current}: Worker.post received a job")
  @post_lock.synchronize do
    refresh_if_forked
    # non_block = true: a full SizedQueue raises ThreadError rather than
    # blocking, which is the condition the rescue below translates.
    @queue.push(segment, true)
  end
  Aws::Xray.config.logger.debug("#{Thread.current}: Worker.post pushed a job")
rescue ThreadError => e
  raise QueueIsFullError.new(e)
end
reset(config) click to toggle source

@param [Aws::Xray::Worker::Configuration] config

# File lib/aws/xray/worker.rb, line 30
# Rebuilds the shared queue and the pool of worker threads.
#
# A new Thread::SizedQueue bounded by config.max_queue_size replaces the
# current queue, every previously started worker is killed, and config.num
# fresh workers are started on the new queue.
#
# @param [Aws::Xray::Worker::Configuration] config
# @return [Array] the newly started workers (the values returned by #run)
def reset(config)
  @queue = Thread::SizedQueue.new(config.max_queue_size)

  # @workers does not exist before the first reset, so check before touching it.
  if defined?(@workers)
    @workers.each { |worker| worker.kill } unless @workers.empty?
  end

  @workers = Array.new(config.num) do
    new(@queue).run
  end
end

Private Class Methods

refresh_if_forked() click to toggle source
# File lib/aws/xray/worker.rb, line 38
# Rebuilds the queue and workers when the current process id differs from the
# one recorded in @pid (for example on first use, or after the process forked).
#
# @return [Integer, nil] the newly recorded pid, or nil when nothing changed
def refresh_if_forked
  current_pid = Process.pid
  return if @pid == current_pid

  reset(Aws::Xray.config.worker)
  # Remember which process owns the freshly built workers.
  @pid = Process.pid
end

Public Instance Methods

run() click to toggle source
# File lib/aws/xray/worker.rb, line 50
# Starts a thread that consumes this worker's queue forever.
#
# Each popped item is handed to Client.send_ when it is truthy; nil/false
# items are logged and skipped. The thread has abort_on_exception enabled, so
# an error escaping the loop is not silently swallowed.
#
# @return [Thread] the started consumer thread
def run
  consumer = Thread.new(@queue) do |jobs|
    loop do
      Aws::Xray.config.logger.debug("#{Thread.current}: Worker#run waits a job")
      job = jobs.pop
      Aws::Xray.config.logger.debug("#{Thread.current}: Worker#run received a job")

      unless job
        Aws::Xray.config.logger.debug("#{Thread.current}: Worker#run received invalid item, ignored it")
        next
      end

      Client.send_(job)
      Aws::Xray.config.logger.debug("#{Thread.current}: Worker#run sent a segment")
    end
  end

  consumer.tap { |thread| thread.abort_on_exception = true }
end