class Vx::Common::AMQP::Supervisor::Threaded

Constants

POOL_INTERVAL
Task

Public Class Methods

build(tasks) click to toggle source
# File lib/vx/common/amqp/supervisor/threaded.rb, line 37
# Builds a supervisor pre-populated with subscriber tasks.
#
# @param tasks [#each_pair] maps a consumer object to the number of
#   subscriber tasks to register for it; each count is coerced with +to_i+
# @return [Object] the new supervisor, with one +:subscribe+ task added per
#   (consumer, index) pair, where index runs from 0 to count - 1
def build(tasks)
  new.tap do |instance|
    tasks.each_pair do |consumer, count|
      count.to_i.times { |index| instance.add(consumer, :subscribe, index) }
    end
  end
end
new() click to toggle source
# File lib/vx/common/amqp/supervisor/threaded.rb, line 61
# Creates an empty supervisor.
#
# Calls the class-level +resume+ first, so constructing a supervisor also
# resets the class-wide shutdown flag, then starts with no registered tasks.
def initialize
  self.class.resume
  @tasks = []
end
resume() click to toggle source
# File lib/vx/common/amqp/supervisor/threaded.rb, line 47
# Clears the class-wide shutdown flag.
#
# The flag lives in a class variable, so it is shared by every supervisor
# instance rather than being per-instance state.
#
# @return [false]
def resume
  @@shutdown = false
end
shutdown() click to toggle source
# File lib/vx/common/amqp/supervisor/threaded.rb, line 55
# Raises the class-wide shutdown flag.
#
# The flag lives in a class variable, so setting it affects every
# supervisor instance that consults +shutdown?+.
#
# @return [true]
def shutdown
  @@shutdown = true
end
shutdown?() click to toggle source
# File lib/vx/common/amqp/supervisor/threaded.rb, line 51
# Reports the current value of the class-wide shutdown flag.
#
# NOTE(review): reads @@shutdown directly; this presumably relies on
# +resume+ or +shutdown+ having assigned it first - confirm against the
# part of the class not shown here.
#
# @return [Boolean] the value last stored by +resume+ or +shutdown+
def shutdown?
  @@shutdown
end

Public Instance Methods

add(object, method, id) click to toggle source
# File lib/vx/common/amqp/supervisor/threaded.rb, line 66
# Registers a new, not-yet-started task with this supervisor.
#
# @param object [Object] receiver the task will invoke
# @param method [Symbol] name of the method to call on +object+
# @param id [Object] identifier stored on the task
# @return [Array] the internal task list, with the frozen task appended
def add(object, method, id)
  entry = Task.new(object, method, id)
  @tasks << entry.freeze
end
run() click to toggle source
# File lib/vx/common/amqp/supervisor/threaded.rb, line 86
# Starts every registered task and then supervises them until the task
# list is empty.
#
# Each pass takes the task at the head of the list and:
# * during shutdown, reports its thread's outcome and drops it;
# * if it is still alive, puts it back at the tail of the list;
# * otherwise hands it to +process_fail+, which may respawn it or raise.
#
# Between passes the loop sleeps for POOL_INTERVAL unless a shutdown is in
# progress, so the list drains without delay once shutdown is requested.
#
# @return [nil]
def run
  start_all_threads

  loop do
    current = @tasks.shift
    break unless current

    if shutdown?
      log_thread_error(current)
    elsif current.alive?
      @tasks.push(current)
    else
      process_fail(current)
    end

    sleep(POOL_INTERVAL) unless shutdown?
  end
end
run_async() click to toggle source
# File lib/vx/common/amqp/supervisor/threaded.rb, line 82
# Runs the supervision loop (+run+) on a new background thread.
#
# The thread is flagged with +abort_on_exception+, so an exception that
# escapes +run+ is not confined to the background thread.
#
# @return [Thread] the thread executing +run+
def run_async
  supervisor_thread = Thread.new { run }
  supervisor_thread.abort_on_exception = true
  supervisor_thread
end
shutdown() click to toggle source
# File lib/vx/common/amqp/supervisor/threaded.rb, line 78
# Requests shutdown by delegating to the class-level +shutdown+.
#
# @return [Object] whatever the class-level +shutdown+ returns
def shutdown
  owner = self.class
  owner.shutdown
end
shutdown?() click to toggle source
# File lib/vx/common/amqp/supervisor/threaded.rb, line 74
# Reports whether shutdown has been requested, by delegating to the
# class-level +shutdown?+.
#
# @return [Object] whatever the class-level +shutdown?+ returns
def shutdown?
  owner = self.class
  owner.shutdown?
end
size() click to toggle source
# File lib/vx/common/amqp/supervisor/threaded.rb, line 70
# Number of tasks currently held in the supervisor's task list.
#
# @return [Integer]
def size
  @tasks.length
end

Private Instance Methods

check_attempt(task) click to toggle source
# File lib/vx/common/amqp/supervisor/threaded.rb, line 165
# Tells whether +task+ may still be respawned.
#
# Both the task's attempt counter and the configured +spawn_attempts+
# limit are coerced with +to_i+, so a nil counter is treated as 0.
#
# @param task [#attempt] task whose attempt counter is checked
# @return [Boolean] true while the counter is less than or equal to the
#   configured limit
def check_attempt(task)
  used  = task.attempt.to_i
  limit = Common::AMQP.config.spawn_attempts.to_i
  used <= limit
end
create_thread(task, attempt) click to toggle source
# File lib/vx/common/amqp/supervisor/threaded.rb, line 127
# Spawns a worker thread for a copy of +task+ and returns that copy.
#
# The original task is left untouched: the method works on a +dup+, fills
# in its thread, attempt counter and start time, then freezes it.
#
# @param task [Task] template task (read for object, method and id)
# @param attempt [Integer] attempt number to record; replaced by 0 when
#   +reset_attempt?+ says the counter should start over
# @return [Task] the frozen copy that owns the newly started thread
def create_thread(task, attempt)
  effective_attempt = reset_attempt?(task) ? 0 : attempt

  spawned = task.dup
  spawned.thread = Thread.new(spawned) do |running|
    # A failure must stay inside this worker; it is collected later by
    # reading the thread's value.
    Thread.current.abort_on_exception = false
    Thread.current[:vx_amqp_consumer_id] = running.id
    running.object.send(running.method)
  end
  spawned.attempt  = effective_attempt
  spawned.start_at = Time.now
  spawned.freeze

  # NOTE(review): the event name spells "ampq" (not "amqp"); it is kept
  # as-is because subscribers presumably match on this exact string.
  instrument("spawn_thread.consumer.ampq", task: spawned.inspect)
  spawned
end
log_thread_error(task) click to toggle source
# File lib/vx/common/amqp/supervisor/threaded.rb, line 143
# Waits for the task's thread to finish and reports the exception that
# terminated it, if any.
#
# On failure the exception and an indented backtrace are written to
# $stderr and the configured +on_error+ callback is invoked with the
# exception and the task.
#
# @param task [#thread] task whose thread outcome is inspected
# @return [Exception, nil] the exception raised by the thread, or nil when
#   the task has no thread or the thread finished without raising
def log_thread_error(task)
  return unless task.thread

  begin
    # Thread#value joins the thread and re-raises whatever killed it.
    task.thread.value
    nil
  rescue Exception => failure
    # Deliberately broad: any exception class may have ended the worker.
    $stderr.puts "#{failure.inspect} in #{task.inspect}"
    indented = failure.backtrace.map { |frame| "    #{frame}" }
    $stderr.puts indented.join("\n")

    handler = Common::AMQP.config.on_error
    handler.call(failure, task: task)
    failure
  end
end
process_fail(task) click to toggle source
# File lib/vx/common/amqp/supervisor/threaded.rb, line 108
# Handles a task whose thread is no longer alive: reports the failure and
# either respawns the task or gives up.
#
# @param task [Task] the task whose thread has stopped
# @return [Array] the internal task list, with the respawned task appended
# @raise [SpawnAttemptsLimitReached] when +check_attempt+ rejects the task
def process_fail(task)
  log_thread_error(task)
  raise SpawnAttemptsLimitReached unless check_attempt(task)

  # +attempt+ is nil for a task that was registered but never started;
  # coerce it the same way check_attempt does so the increment cannot fail
  # with NoMethodError.
  @tasks.push create_thread(task, task.attempt.to_i + 1)
end
reset_attempt?(task) click to toggle source
# File lib/vx/common/amqp/supervisor/threaded.rb, line 158
# Decides whether a task's attempt counter should start over from zero.
#
# @param task [#start_at] task whose last start time is examined
# @return [Boolean] true when the task has no start time, or when more
#   than 60 seconds have passed since it was started
def reset_attempt?(task)
  started = task.start_at
  return true unless started

  grace_period = 60
  Time.now > (started + grace_period)
end
start_all_threads() click to toggle source
# File lib/vx/common/amqp/supervisor/threaded.rb, line 117
# Replaces every registered task with a started copy.
#
# Tasks are taken off the list one at a time and started with attempt 0;
# the started copies are collected separately and put back afterwards in
# the same order, so the list never holds a mix while threads are spawned.
#
# @return [nil]
def start_all_threads
  running = []

  while (pending = @tasks.shift)
    running << create_thread(pending, 0)
  end

  while (ready = running.shift)
    @tasks << ready
  end

  nil
end