class Toiler::Actor::Supervisor

Actor that starts and supervises Toiler's processor pool and fetcher actors.

Attributes

client[RW]

Public Class Methods

new()
# File lib/toiler/actor/supervisor.rb, line 12
# Builds the supervisor: creates the SQS client, then spawns the processor
# pools followed by the fetchers.
#
# The client is assigned before anything is spawned because spawn_fetchers
# passes +client+ to every fetcher it starts.
def initialize
  @client = ::Aws::SQS::Client.new
  spawn_processors
  spawn_fetchers
end

Public Instance Methods

on_message(_msg)
# File lib/toiler/actor/supervisor.rb, line 18
# Message handler for the supervisor. The incoming message is not inspected
# (hence the underscore-prefixed parameter); the method only calls +pass+.
#
# NOTE(review): +pass+ is presumably the actor framework's way of handing the
# message on to the next behaviour in the chain - confirm against the
# concurrent-ruby actor documentation.
def on_message(_msg)
  pass
end
spawn_fetchers()
# File lib/toiler/actor/supervisor.rb, line 22
# Starts one supervised Fetcher actor for every queue found in Toiler's active
# worker class registry and records it through Toiler.set_fetcher.
#
# Each fetcher is named :fetcher_<queue> and is given the queue, the shared
# client and the worker class's concurrency as its arguments. A StandardError
# raised while spawning or registering a fetcher is logged via +error+ and the
# loop carries on with the remaining queues.
def spawn_fetchers
  Toiler.active_worker_class_registry.each do |queue_name, worker_class|
    # Read outside the begin block: a failure here is deliberately not rescued.
    concurrency = worker_class.concurrency
    begin
      fetcher_name = :"fetcher_#{queue_name}"
      spawned = Actor::Fetcher.spawn!(name: fetcher_name,
                                      supervise: true,
                                      args: [queue_name, client, concurrency])
      Toiler.set_fetcher(queue_name, spawned)
    rescue StandardError => e
      trace = e.backtrace.join("\n")
      error "Failed to start Fetcher for queue #{queue_name}: #{e.message}\n#{trace}"
    end
  end
end
spawn_processors()
# File lib/toiler/actor/supervisor.rb, line 36
# Starts one processor pool for every queue found in Toiler's active worker
# class registry and records it through Toiler.set_processor_pool.
#
# The pool is named :processor_pool_<queue> and is sized by the worker class's
# concurrency. The block handed to Pool.spawn! spawns a supervised Processor
# named :processor_<queue>_<index> with the queue as its only argument. A
# StandardError raised while spawning or registering a pool is logged via
# +error+ and the loop carries on with the remaining queues.
def spawn_processors
  Toiler.active_worker_class_registry.each do |queue_name, worker_class|
    # Both values are computed outside the begin block, so errors raised here
    # are deliberately not rescued.
    pool_name = :"processor_pool_#{queue_name}"
    pool_size = worker_class.concurrency
    begin
      processor_pool = Concurrent::Actor::Utils::Pool.spawn!(pool_name, pool_size) do |worker_index|
        Actor::Processor.spawn(
          name: :"processor_#{queue_name}_#{worker_index}",
          supervise: true,
          args: [queue_name]
        )
      end
      Toiler.set_processor_pool(queue_name, processor_pool)
    rescue StandardError => e
      trace = e.backtrace.join("\n")
      error "Failed to spawn Processor Pool for queue #{queue_name}: #{e.message}\n#{trace}"
    end
  end
end