class SuckerPunch::Queue
Constants
- DEFAULT_EXECUTOR_OPTIONS
- DEFAULT_MAX_QUEUE_SIZE
- PAUSE_TIME
- QUEUES
Attributes
name[R]
Public Class Methods
all()
click to toggle source
# File lib/sucker_punch/queue.rb, line 33
# Wraps every pool currently registered in QUEUES in a Queue instance.
#
# @return [Concurrent::Array] one Queue per (name, pool) pair in QUEUES
def self.all
  wrapped = Concurrent::Array.new
  QUEUES.each_pair { |queue_name, executor| wrapped << new(queue_name, executor) }
  wrapped
end
clear()
click to toggle source
# File lib/sucker_punch/queue.rb, line 41
# Empties the QUEUES registry, resets the Busy/Processed/Failed counters and
# kills every queue that was registered at the time of the call.
#
# @return [Concurrent::Array] the queues that were registered before clearing
def self.clear
  # susceptible to race conditions--only use in testing
  previous = all # snapshot first, so the pools can still be killed after the registry is emptied
  QUEUES.clear

  [
    SuckerPunch::Counter::Busy,
    SuckerPunch::Counter::Processed,
    SuckerPunch::Counter::Failed
  ].each(&:clear)

  previous.each(&:kill)
end
find_or_create(name, num_workers = 2, num_jobs_max = nil)
click to toggle source
# File lib/sucker_punch/queue.rb, line 19
# Returns a Queue wrapping the pool registered under +name+, creating and
# registering a new thread pool first when none exists yet.
#
# The lookup-or-insert uses +compute_if_absent+ so that it is a single atomic
# step. The previous +fetch_or_store+ is a separate fetch followed by a put,
# so two threads racing on a new name could each build an executor and one of
# them would be orphaned with its threads still alive.
# NOTE(review): this assumes QUEUES is a Concurrent::Map -- confirm.
#
# @param name [Object] key identifying the queue in QUEUES
# @param num_workers [Integer] used for both min_threads and max_threads of a
#   newly created pool; ignored when the pool already exists
# @param num_jobs_max [Integer, nil] max_queue for a newly created pool;
#   DEFAULT_MAX_QUEUE_SIZE is used when nil
# @return [SuckerPunch::Queue] a new wrapper around the registered pool
def self.find_or_create(name, num_workers = 2, num_jobs_max = nil)
  pool = QUEUES.compute_if_absent(name) do
    options = DEFAULT_EXECUTOR_OPTIONS.merge(
      min_threads: num_workers,
      max_threads: num_workers,
      max_queue: num_jobs_max || DEFAULT_MAX_QUEUE_SIZE
    )
    Concurrent::ThreadPoolExecutor.new(**options)
  end

  new(name, pool)
end
new(name, pool)
click to toggle source
Calls superclass method
# File lib/sucker_punch/queue.rb, line 134
# Creates a wrapper around an existing pool. The wrapper starts out in the
# running state.
#
# @param name [Object] the queue's name, exposed through the +name+ reader
# @param pool [Object] the executor that jobs are posted to
def initialize(name, pool)
  super() # explicit empty parens: the superclass initializer gets no arguments
  @name = name
  @pool = pool
  @running = true
end
shutdown_all()
click to toggle source
# File lib/sucker_punch/queue.rb, line 74
# Shuts down every registered queue, giving in-flight work up to
# SuckerPunch.shutdown_timeout seconds to finish before killing the pools.
#
# Only the first caller to flip SuckerPunch::RUNNING to false does the work;
# later calls return immediately.
#
# The deadline is tracked on the monotonic clock, so an adjustment of the
# system clock during shutdown cannot shorten or extend the timeout.
#
# @return [nil, Concurrent::Array] nil when shutdown was already in progress
#   or all queues went idle in time; the list of queues when they were killed
def self.shutdown_all
  now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  deadline = now.call + SuckerPunch.shutdown_timeout

  return unless SuckerPunch::RUNNING.make_false

  # If a job is enqueued right before the script exits
  # (command line, rake task, etc.), the system needs an
  # interval to allow the enqueue jobs to make it in to the system
  # otherwise the queue will look idle
  sleep PAUSE_TIME

  queues = all
  all_idle = -> { queues.all?(&:idle?) }

  # Issue shutdown to each queue and let them wrap up their work. This
  # prevents new jobs from being enqueued and lets the pool clean up
  # after itself
  queues.each(&:shutdown)

  # return if every queue is empty and workers in every queue are idle
  return if all_idle.call

  SuckerPunch.logger.info("Pausing to allow workers to finish...")

  # Continue to loop through each queue and test if it's idle, while
  # respecting the shutdown timeout
  while deadline - now.call > PAUSE_TIME
    return if all_idle.call
    sleep PAUSE_TIME
  end

  # Work may have drained during the final sleep; re-check so finished
  # queues are not reported as timed out.
  return if all_idle.call

  # Queues haven't finished work. Aggressively kill them.
  SuckerPunch.logger.warn("Queued jobs didn't finish before shutdown_timeout...killing remaining jobs")
  queues.each(&:kill)
end
stats()
click to toggle source
# File lib/sucker_punch/queue.rb, line 51
# Collects worker and job counts for every registered queue.
#
# @return [Hash] keyed by queue name; each value has a "workers" hash
#   ("total", "busy", "idle") and a "jobs" hash ("processed", "failed",
#   "enqueued")
def self.stats
  all.each_with_object({}) do |queue, summary|
    workers = {
      "total" => queue.total_workers,
      "busy" => queue.busy_workers,
      "idle" => queue.idle_workers,
    }
    jobs = {
      "processed" => queue.processed_jobs,
      "failed" => queue.failed_jobs,
      "enqueued" => queue.enqueued_jobs,
    }

    summary[queue.name] = { "workers" => workers, "jobs" => jobs }
  end
end
Public Instance Methods
==(other)
click to toggle source
# File lib/sucker_punch/queue.rb, line 148
# Two queues are equal when they wrap equal pools.
#
# Objects that are not queues compare as unequal instead of raising
# NoMethodError on the missing (or protected) +pool+ reader.
#
# @param other [Object] the object to compare against
# @return [Boolean]
def ==(other)
  other.is_a?(SuckerPunch::Queue) && pool == other.pool
end
busy_workers()
click to toggle source
# File lib/sucker_punch/queue.rb, line 152
# Reads the Busy counter registered under this queue's name.
#
# @return [Object] the counter's current value
def busy_workers
  busy_counter = SuckerPunch::Counter::Busy.new(name)
  busy_counter.value
end
failed_jobs()
click to toggle source
# File lib/sucker_punch/queue.rb, line 164
# Reads the Failed counter registered under this queue's name.
#
# @return [Object] the counter's current value
def failed_jobs
  failed_counter = SuckerPunch::Counter::Failed.new(name)
  failed_counter.value
end
idle?()
click to toggle source
# File lib/sucker_punch/queue.rb, line 144
# A queue is idle when nothing is waiting to run and no worker is busy.
#
# @return [Boolean]
def idle?
  # Check the backlog first; the busy counter is only consulted when it is empty.
  return false unless enqueued_jobs == 0

  busy_workers == 0
end
idle_workers()
click to toggle source
# File lib/sucker_punch/queue.rb, line 156
# Number of workers not currently counted as busy.
#
# @return [Object] total_workers minus busy_workers
def idle_workers
  total = total_workers
  busy = busy_workers
  total - busy
end
kill()
click to toggle source
# File lib/sucker_punch/queue.rb, line 179
# Forwards +kill+ to the underlying pool.
#
# @return [Object] whatever the pool's +kill+ returns
def kill
  pool.kill
end
post(*args, &block)
click to toggle source
# File lib/sucker_punch/queue.rb, line 168
# Submits work to the underlying pool while the queue is still running.
#
# The running flag is checked and the work is posted inside the same
# +synchronize+ block that +shutdown+ uses to clear the flag.
#
# @param args [Array] arguments forwarded to the pool's +post+
# @param block [Proc] the work forwarded to the pool's +post+
# @return [Object, false] the pool's +post+ result, or false once the queue
#   has been shut down
def post(*args, &block)
  synchronize { @running ? @pool.post(*args, &block) : false }
end
processed_jobs()
click to toggle source
# File lib/sucker_punch/queue.rb, line 160
# Reads the Processed counter registered under this queue's name.
#
# @return [Object] the counter's current value
def processed_jobs
  processed_counter = SuckerPunch::Counter::Processed.new(name)
  processed_counter.value
end
running?()
click to toggle source
# File lib/sucker_punch/queue.rb, line 140
# Reports whether this wrapper still accepts jobs; the flag is read inside
# +synchronize+.
#
# @return [Boolean] true until +shutdown+ has been called on this instance
def running?
  synchronize do
    @running
  end
end
shutdown()
click to toggle source
# File lib/sucker_punch/queue.rb, line 183
# Stops this wrapper from accepting jobs, then forwards +shutdown+ to the
# underlying pool.
#
# @return [Object] whatever the pool's +shutdown+ returns
def shutdown
  # Clear the flag inside synchronize so +post+ sees it before the pool is shut down.
  synchronize do
    @running = false
  end

  @pool.shutdown
end
Protected Instance Methods
pool()
click to toggle source
# File lib/sucker_punch/queue.rb, line 190
# Reader for the wrapped pool. Listed under the protected instance methods,
# and used by +==+ to compare against another queue's pool.
#
# @return [Object] the pool passed to the constructor
def pool
  @pool
end