module Que::CommandLineInterface
Constants
- RAILS_ENVIRONMENT_FILE
Have a sensible default require file for
Rails
.
Public Class Methods
parse( args:, output:, default_require_file: RAILS_ENVIRONMENT_FILE )
click to toggle source
Need to rely on dependency injection a bit to make this method cleanly testable :/
# File lib/que/command_line_interface.rb, line 15 def parse( args:, output:, default_require_file: RAILS_ENVIRONMENT_FILE ) options = {} queues = [] log_level = 'info' log_internals = false poll_interval = 5 connection_url = nil worker_count = nil worker_priorities = nil parser = OptionParser.new do |opts| opts.banner = 'usage: que [options] [file/to/require] ...' opts.on( '-h', '--help', "Show this help text.", ) do output.puts opts.help return 0 end opts.on( '-i', '--poll-interval [INTERVAL]', Float, "Set maximum interval between polls for available jobs, " \ "in seconds (default: 5)", ) do |i| poll_interval = i end opts.on( '--listen [LISTEN]', String, "Set to false to disable listen mode (default: true)" ) do |listen| options[:listen] = listen != "false" end opts.on( '-l', '--log-level [LEVEL]', String, "Set level at which to log to STDOUT " \ "(debug, info, warn, error, fatal) (default: info)", ) do |l| log_level = l end opts.on( '-p', '--worker-priorities [LIST]', Array, "List of priorities to assign to workers (default: 10,30,50,any,any,any)", ) do |priority_array| worker_priorities = priority_array.map do |p| case p when /\Aany\z/i nil when /\A\d+\z/ Integer(p) else output.puts "Invalid priority option: '#{p}'. Please use an integer or the word 'any'." return 1 end end end opts.on( '-q', '--queue-name [NAME]', String, "Set a queue name to work jobs from. " \ "Can be passed multiple times. " \ "(default: the default queue only)", ) do |queue_name| queues << queue_name end opts.on( '-w', '--worker-count [COUNT]', Integer, "Set number of workers in process (default: 6)", ) do |w| worker_count = w end opts.on( '-v', '--version', "Print Que version and exit.", ) do require 'que' output.puts "Que version #{Que::VERSION}" return 0 end opts.on( '--connection-url [URL]', String, "Set a custom database url to connect to for locking purposes.", ) do |url| options[:connection_url] = url end opts.on( '--log-internals', "Log verbosely about Que's internal state. " \ "Only recommended for debugging issues", ) do |l| log_internals = true end opts.on( '--maximum-buffer-size [SIZE]', Integer, "Set maximum number of jobs to be locked and held in this " \ "process awaiting a worker (default: 8)", ) do |s| options[:maximum_buffer_size] = s end opts.on( '--minimum-buffer-size [SIZE]', Integer, "Unused (deprecated)", ) do |s| warn "The --minimum-buffer-size SIZE option has been deprecated and will be removed in v2.0 (it's actually been unused since v1.0.0.beta4)" end opts.on( '--wait-period [PERIOD]', Float, "Set maximum interval between checks of the in-memory job queue, " \ "in milliseconds (default: 50)", ) do |p| options[:wait_period] = p end opts.on( '--pidfile [PATH]', String, "Write the PID of this process to the specified file.", ) do |p| options[:pidfile] = File.expand_path(p) end end parser.parse!(args) options[:worker_priorities] = if worker_count && worker_priorities worker_priorities.values_at(0...worker_count) elsif worker_priorities worker_priorities elsif worker_count Array.new(worker_count) { nil } else [10, 30, 50, nil, nil, nil] end if args.length.zero? if File.exist?(default_require_file) args << default_require_file else output.puts <<-OUTPUT You didn't include any Ruby files to require! Que needs to be able to load your application before it can process jobs. (Or use `que -h` for a list of options) OUTPUT return 1 end end args.each do |file| begin require file rescue LoadError => e output.puts "Could not load file '#{file}': #{e}" return 1 end end Que.logger ||= Logger.new(STDOUT) if log_internals Que.internal_logger = Que.logger end begin Que.get_logger.level = Logger.const_get(log_level.upcase) rescue NameError output.puts "Unsupported logging level: #{log_level} (try debug, info, warn, error, or fatal)" return 1 end if queues.any? queues_hash = {} queues.each do |queue| name, interval = queue.split('=') p = interval ? Float(interval) : poll_interval Que.assert(p > 0.01) { "Poll intervals can't be less than 0.01 seconds!" } queues_hash[name] = p end options[:queues] = queues_hash end options[:poll_interval] = poll_interval locker = begin Que::Locker.new(**options) rescue => e output.puts(e.message) return 1 end # It's a bit sloppy to use a global for this when a local variable would # do, but we want to stop the locker from the CLI specs, so... $stop_que_executable = false %w[INT TERM].each { |signal| trap(signal) { $stop_que_executable = true } } output.puts( <<~STARTUP Que #{Que::VERSION} started worker process with: Worker threads: #{locker.workers.length} (priorities: #{locker.workers.map { |w| w.priority || 'any' }.join(', ')}) Buffer size: #{locker.job_buffer.maximum_size} Queues: #{locker.queues.map { |queue, interval| " - #{queue} (poll interval: #{interval}s)" }.join("\n")} Que waiting for jobs... STARTUP ) loop do sleep 0.01 break if $stop_que_executable || locker.stopping? end output.puts "\nFinishing Que's current jobs before exiting..." locker.stop! output.puts "Que's jobs finished, exiting..." return 0 end