module CronoTrigger::Worker

Constants

EXECUTOR_SHUTDOWN_TIMELIMIT
HEARTBEAT_INTERVAL
OTHER_THREAD_SHUTDOWN_TIMELIMIT
SIGNAL_FETCH_INTERVAL

Attributes

polling_threads[R]

Public Class Methods

new() click to toggle source
# File lib/crono_trigger/worker.rb, line 13
# Builds the worker's shared state: the worker id, the stop flags, the queue
# of model names to poll, the task executor, the execution counter and the
# logger (which is also installed as ActiveRecord's logger).
def initialize
  config = CronoTrigger.config
  @crono_trigger_worker_id = config.worker_id

  # Separate flags for the polling, heartbeat and signal-fetch loops.
  @stop_flag = ServerEngine::BlockingFlag.new
  @heartbeat_stop_flag = ServerEngine::BlockingFlag.new
  @signal_fetch_stop_flag = ServerEngine::BlockingFlag.new

  # Fall back to every model that included Schedulable when none are configured.
  @model_queue = Queue.new
  @model_names = config.model_names || CronoTrigger::Schedulable.included_by
  @model_names.each { |name| @model_queue.push(name) }

  # The queue may hold twice as many tasks as there are executor threads.
  thread_limit = config.executor_thread
  @executor = Concurrent::ThreadPoolExecutor.new(
    min_threads: 1,
    max_threads: thread_limit,
    max_queue: thread_limit * 2
  )
  @execution_counter = Concurrent::AtomicFixnum.new

  # Keep a logger that was already assigned before this initializer ran.
  @logger ||= Logger.new(STDOUT)
  ActiveRecord::Base.logger = @logger
end

Public Instance Methods

quiet?() click to toggle source
# File lib/crono_trigger/worker.rb, line 71
# Reports whether every polling thread is in quiet mode.
#
# @return [Boolean, nil] nil while @polling_threads is still unset (it is
#   assigned in #run); otherwise whether all polling threads answer quiet?
def quiet?
  threads = @polling_threads
  return if threads.nil?

  threads.all? { |thread| thread.quiet? }
end
run() click to toggle source
# File lib/crono_trigger/worker.rb, line 33
# Runs the worker and blocks until every polling thread has finished.
#
# Steps, in order:
# 1. start the heartbeat and signal-fetch threads;
# 2. create and start the polling threads (count taken from
#    CronoTrigger.config.polling_thread, else the smaller of the number of
#    models and the processor count);
# 3. install a TSTP handler that switches the polling threads to quiet mode
#    and sends a heartbeat;
# 4. join the polling threads;
# 5. shut the executor down, join the helper threads (each with a time
#    limit) and remove the worker record from the database.
def run
  @heartbeat_thread = run_heartbeat_thread
  @signal_fetch_thread = run_signal_fetch_thread

  polling_thread_count = CronoTrigger.config.polling_thread || [@model_names.size, Concurrent.processor_count].min
  # Assign local variable for Signal handling: the TSTP handler below closes
  # over this local rather than reading the instance variable.
  polling_threads = polling_thread_count.times.map do
    PollingThread.new(@model_queue, @stop_flag, @logger, @executor, @execution_counter)
  end
  @polling_threads = polling_threads
  @polling_threads.each(&:run)

  ServerEngine::SignalThread.new do |st|
    st.trap(:TSTP) do
      @logger.info("[worker_id:#{@crono_trigger_worker_id}] Transit to quiet mode")
      polling_threads.each(&:quiet)
      heartbeat
    end
  end

  @polling_threads.each(&:join)

  @executor.shutdown
  @executor.wait_for_termination(EXECUTOR_SHUTDOWN_TIMELIMIT)
  @heartbeat_thread.join(OTHER_THREAD_SHUTDOWN_TIMELIMIT)
  @signal_fetch_thread.join(OTHER_THREAD_SHUTDOWN_TIMELIMIT)

  unregister
end
stop() click to toggle source
# File lib/crono_trigger/worker.rb, line 61
# Requests shutdown by setting the polling, heartbeat and signal-fetch stop
# flags, in that order.
#
# @return the result of the final set! call (on @signal_fetch_stop_flag)
def stop
  flags = [@stop_flag, @heartbeat_stop_flag, @signal_fetch_stop_flag]
  # map rather than each, so the last set! result is still the return value.
  flags.map(&:set!).last
end
stopped?() click to toggle source
# File lib/crono_trigger/worker.rb, line 67
# Reports whether a stop has been requested, i.e. whether @stop_flag is set.
# The flag is set by #stop.
def stopped?
  @stop_flag.set?
end

Private Instance Methods

executor_status() click to toggle source
# File lib/crono_trigger/worker.rb, line 113
# Describes the executor's state as a string; #heartbeat stores it on the
# worker record.
#
# Checks are made in this order: shutdown, shutting down, running. A running
# executor is reported as "quiet" when #quiet? is truthy.
#
# @return [String, nil] "shutdown", "shuttingdown", "quiet" or "running";
#   nil when the executor reports none of the three states
def executor_status
  return "shutdown" if @executor.shutdown?
  return "shuttingdown" if @executor.shuttingdown?
  return unless @executor.running?

  quiet? ? "quiet" : "running"
end
handle_signal_from_rdb() click to toggle source
# File lib/crono_trigger/worker.rb, line 137
# Fetches at most one signal addressed to this worker from the database,
# logs it and asks the signal record to deliver it via kill_me.
#
# Every signal except "TSTP" is passed with to_supervisor: true.
# Any StandardError is handed to the global exception handler instead of
# propagating.
#
# @return the signal record that was handled, nil when none was pending, or
#   the exception handler's result when an error was rescued
def handle_signal_from_rdb
  CronoTrigger::Models::Signal.connection_pool.with_connection do
    received = CronoTrigger::Models::Signal.sent_to_me.take(1).first
    unless received.nil?
      @logger.info("[worker_id:#{@crono_trigger_worker_id}] Receive Signal #{received.signal} from database")
      escalate = received.signal != "TSTP"
      received.kill_me(to_supervisor: escalate)
    end
    received
  end
rescue => ex
  CronoTrigger::GlobalExceptionHandler.handle_global_exception(ex)
end
heartbeat() click to toggle source
# File lib/crono_trigger/worker.rb, line 94
# Writes this worker's current status to its database record, creating the
# record if it does not exist yet.
#
# If anything inside the update raises a StandardError, the error is passed
# to the global exception handler and the worker is asked to stop.
#
# @return the result of save! on success, or of #stop after a failure
def heartbeat
  worker_model = CronoTrigger::Models::Worker
  worker_model.connection_pool.with_connection do
    begin
      record = worker_model.find_or_initialize_by(worker_id: @crono_trigger_worker_id)
      record.max_thread_size = @executor.max_length
      # NOTE(review): concurrent-ruby documents scheduled_task_count as the
      # total scheduled since the pool was created — confirm that is the
      # intended value for current_executing_size.
      record.current_executing_size = @executor.scheduled_task_count
      record.current_queue_size = @execution_counter.value
      record.executor_status = executor_status
      record.polling_model_names = @model_names
      record.last_heartbeated_at = Time.current
      @logger.info("[worker_id:#{@crono_trigger_worker_id}] Send heartbeat to database")
      record.save!
    rescue => ex
      CronoTrigger::GlobalExceptionHandler.handle_global_exception(ex)
      stop
    end
  end
end
run_heartbeat_thread() click to toggle source
# File lib/crono_trigger/worker.rb, line 77
# Sends one heartbeat immediately on the calling thread, then starts a
# background thread that sends another each time the wait on
# @heartbeat_stop_flag (HEARTBEAT_INTERVAL) returns a falsy value. The loop
# ends as soon as that wait returns a truthy value.
#
# @return [Thread] the background heartbeat thread
def run_heartbeat_thread
  heartbeat
  Thread.new do
    # Modifier-until checks the flag first, so no heartbeat is sent once it is set.
    heartbeat until @heartbeat_stop_flag.wait_for_set(HEARTBEAT_INTERVAL)
  end
end
run_signal_fetch_thread() click to toggle source
# File lib/crono_trigger/worker.rb, line 86
# Starts a background thread that calls #handle_signal_from_rdb each time
# the wait on @signal_fetch_stop_flag (SIGNAL_FETCH_INTERVAL) returns a
# falsy value. The loop ends as soon as that wait returns a truthy value.
#
# @return [Thread] the background signal-fetch thread
def run_signal_fetch_thread
  Thread.new do
    # Modifier-until checks the flag first, so nothing is fetched once it is set.
    handle_signal_from_rdb until @signal_fetch_stop_flag.wait_for_set(SIGNAL_FETCH_INTERVAL)
  end
end
unregister() click to toggle source
# File lib/crono_trigger/worker.rb, line 128
# Deletes this worker's record from the database, if one exists, after
# logging the attempt. Any StandardError is passed to the global exception
# handler instead of propagating.
#
# @return the result of destroy, nil when no record was found, or the
#   exception handler's result when an error was rescued
def unregister
  @logger.info("[worker_id:#{@crono_trigger_worker_id}] Unregister worker from database")
  CronoTrigger::Models::Worker.connection_pool.with_connection do
    record = CronoTrigger::Models::Worker.find_by(worker_id: @crono_trigger_worker_id)
    record.destroy unless record.nil?
  end
rescue => ex
  CronoTrigger::GlobalExceptionHandler.handle_global_exception(ex)
end