module Resque::StuckQueue

Constants

HEARTBEAT_INTERVAL

defaults

HEARTBEAT_KEY
LOGGER
RECOVERED_HANDLER
TRIGGERED_HANDLER

Handler constants must be named by convention: TYPE_HANDLER (for example TRIGGERED_HANDLER), because trigger_handler looks them up by name.

TRIGGERED_KEY
TRIGGER_TIMEOUT
VERSION
WATCHER_INTERVAL

Attributes

config[RW]

Public Class Methods

abort_on_exception() click to toggle source
# File lib/resque_stuck_queue.rb, line 56
# Whether worker threads should abort the process when they raise.
# Uses config[:abort_on_exception] whenever it is set (an explicit false is
# honoured); falls back to true when the key is nil.
def abort_on_exception
  override = config[:abort_on_exception]
  return true if override.nil? # default

  override
end
force_stop!() click to toggle source
# File lib/resque_stuck_queue.rb, line 108
# Kills every worker thread immediately and resets the module state.
# Unlike stop, this does not wait for the threads to finish their loops.
# Returns whatever reset! returns.
def force_stop!
  logger.info("Force stopping")
  # @threads is only assigned by start; Array() turns a nil (never started)
  # into an empty list so a premature force stop is a no-op instead of a crash.
  Array(@threads).each(&:kill)
  reset!
end
heartbeat_key_for(queue) click to toggle source
# File lib/resque_stuck_queue.rb, line 32
# Builds the redis key that holds the heartbeat timestamp for +queue+:
# "<queue>:<suffix>", where the suffix is config[:heartbeat_key] when set
# and HEARTBEAT_KEY otherwise.
def heartbeat_key_for(queue)
  suffix = config[:heartbeat_key] || HEARTBEAT_KEY
  "#{queue}:#{suffix}"
end
heartbeat_keys() click to toggle source
# File lib/resque_stuck_queue.rb, line 48
# Heartbeat redis keys for all watched queues, in the order given by queues.
def heartbeat_keys
  queues.map do |queue_name|
    heartbeat_key_for(queue_name)
  end
end
log_starting_info() click to toggle source
# File lib/resque_stuck_queue.rb, line 145
# Logs the effective configuration (via #inspect) at info level on startup.
def log_starting_info
  config_dump = config.inspect
  logger.info("Starting StuckQueue with config: #{config_dump}")
end
log_watcher_info(queue_name) click to toggle source
# File lib/resque_stuck_queue.rb, line 149
# Logs two info lines for +queue_name+: its current lag time, and either how
# many seconds ago it was last triggered or that no trigger is recorded.
def log_watcher_info(queue_name)
  logger.info("Lag time for #{queue_name} is #{lag_time(queue_name).inspect} seconds.")

  seconds_since_trigger = last_triggered(queue_name)
  trigger_message =
    if seconds_since_trigger
      "Last triggered for #{queue_name} is #{seconds_since_trigger.inspect} seconds."
    else
      "No last trigger found for #{queue_name}."
    end
  logger.info(trigger_message)
end
logger() click to toggle source
# File lib/resque_stuck_queue.rb, line 24
# Memoized logger: config[:logger] when provided, else StuckQueue::LOGGER.
# The memo is cleared by reset!.
def logger
  return @logger if @logger

  @logger = config[:logger] || StuckQueue::LOGGER
end
queues() click to toggle source
# File lib/resque_stuck_queue.rb, line 52
# Memoized list of queues to watch: config[:queues] when provided, else [:app].
# The memo is cleared by reset!.
def queues
  return @queues if @queues

  @queues = config[:queues] || [:app]
end
redis() click to toggle source
# File lib/resque_stuck_queue.rb, line 28
# Memoized redis connection taken from config[:redis].
# A nil config value is not memoized, so it is looked up again next time.
def redis
  @redis = config[:redis] unless @redis
  @redis
end
reset!() click to toggle source
# File lib/resque_stuck_queue.rb, line 114
# Returns the module to a clean state so it can be stopped and started again
# within the same process.
def reset!
  # A brand-new Config replaces the frozen one used by the previous run.
  @config = Config.new
  # Worker loops poll this flag and exit once it is false.
  @running = false
  # Drop memoized values so they are re-read from the new config.
  @queues = @logger = nil
end
reset_keys() click to toggle source
# File lib/resque_stuck_queue.rb, line 122
# Deletes the heartbeat key and the triggered key of every watched queue
# from redis.
def reset_keys
  queues.each do |queue_name|
    stale_keys = [heartbeat_key_for(queue_name), triggered_key_for(queue_name)]
    stale_keys.each { |key| redis.del(key) }
  end
end
start() click to toggle source

Call this after setting config. Once started, you shouldn't be allowed to modify it.

# File lib/resque_stuck_queue.rb, line 72
# Runs the monitor in the calling thread: prepares state, spawns the worker
# threads and then blocks until every one of them has finished.
# Use start_in_background to run this without blocking the caller.
def start
  # Flags polled by the worker loops (@running) and by stop (@stopped).
  @running = true
  @stopped = false
  @threads = []
  # NOTE(review): presumably raises when a required key is missing — confirm in Config.
  config.validate_required_keys!
  # Freeze so the config cannot be modified while running; reset! swaps in a fresh one.
  config.freeze

  log_starting_info

  # Remove heartbeat/triggered keys from redis before the threads start.
  reset_keys

  # Only set RedisClassy's connection when none has been assigned yet.
  RedisClassy.redis = redis if RedisClassy.redis.nil?

  pretty_process_name

  setup_heartbeat_thread
  setup_watcher_thread

  # Only spawns a thread when config[:warn_interval] is set.
  setup_warn_thread

  # Block here until all worker threads exit; their loops run while @running is true.
  @threads.map(&:join)

  logger.info("threads stopped")
  # Lets a concurrent stop call return.
  @stopped = true
end
start_in_background() click to toggle source
# File lib/resque_stuck_queue.rb, line 64
# Runs start inside a new thread so the caller is not blocked.
# The thread's abort_on_exception flag follows the abort_on_exception setting.
# Returns the new Thread.
def start_in_background
  Thread.new do
    runner = Thread.current
    runner.abort_on_exception = abort_on_exception
    start
  end
end
stop() click to toggle source
# File lib/resque_stuck_queue.rb, line 99
# Requests a clean shutdown: resets state (which clears @running) and then
# polls once per second until start has marked itself as stopped.
def stop
  reset!
  # Only an explicit false means "start is still running"; a nil @stopped
  # does not wait.
  sleep 1 while @stopped == false
  logger.info("Stopped")
end
stopped?() click to toggle source
# File lib/resque_stuck_queue.rb, line 129
# Reports whether start has run to completion.
# Returns the raw @stopped flag: start sets it to false on entry and to true
# after its threads have been joined; it is nil if it has not been assigned yet.
def stopped?
  @stopped
end
trigger_handler(queue_name, type) click to toggle source
# File lib/resque_stuck_queue.rb, line 133
# Invokes the :recovered or :triggered handler for +queue_name+.
#
# The handler comes from config[:"<type>_handler"], falling back to the
# constant of the same upcased name. It is called with the queue name and the
# queue's current lag time; afterwards the trigger state is updated through
# manual_refresh.
#
# Any StandardError raised in here (including the RuntimeError for an invalid
# +type+) is logged with its backtrace and then re-raised.
def trigger_handler(queue_name, type)
  unless %i[recovered triggered].include?(type)
    raise 'Must trigger either the recovered or triggered handler!'
  end

  handler_key = "#{type}_handler".to_sym
  logger.info("Triggering #{type} handler for #{queue_name} at #{Time.now}.")
  handler = config[handler_key] || const_get(handler_key.upcase)
  handler.call(queue_name, lag_time(queue_name))
  manual_refresh(queue_name, type)
rescue => error
  logger.info("handler #{type} for #{queue_name} crashed: #{error.inspect}")
  logger.info("\n#{error.backtrace.join("\n")}")
  raise
end
triggered_key_for(queue) click to toggle source
# File lib/resque_stuck_queue.rb, line 40
# Builds the redis key that records when +queue+ was last triggered:
# "<queue>:<suffix>", where the suffix is config[:triggered_key] when set
# and TRIGGERED_KEY otherwise.
def triggered_key_for(queue)
  suffix = config[:triggered_key] || TRIGGERED_KEY
  "#{queue}:#{suffix}"
end

Private Class Methods

enqueue_jobs() click to toggle source
# File lib/resque_stuck_queue.rb, line 224
# Enqueues one heartbeat job per watched queue.
#
# When config[:heartbeat_job] is set it is called instead (once, regardless
# of how many queues are watched) and its result is returned.
def enqueue_jobs
  custom_job = config[:heartbeat_job]
  # FIXME config[:heartbeat_job] with multiple queues is bad semantics
  return custom_job.call if custom_job

  queues.each do |queue_name|
    # Redis::Namespace.new support as well as Redis.new
    namespace = redis.respond_to?(:namespace) ? redis.namespace : nil
    job_args = [
      heartbeat_key_for(queue_name),
      redis.client.host,
      redis.client.port,
      namespace,
      Time.now.to_i
    ]
    Resque.enqueue_to(queue_name, HeartbeatJob, *job_args)
  end
end
lag_time(queue_name) click to toggle source
# File lib/resque_stuck_queue.rb, line 262
# Seconds elapsed since the last successful heartbeat of +queue_name+.
#
# The heartbeat is read before the current time is sampled. When no heartbeat
# exists yet, last_successful_heartbeat stamps a fresh Time.now; sampling
# "now" first would compare an earlier timestamp against that later one and
# could report a negative lag.
def lag_time(queue_name)
  last_beat = last_successful_heartbeat(queue_name)
  Time.now.to_i - last_beat
end
last_successful_heartbeat(queue_name) click to toggle source
# File lib/resque_stuck_queue.rb, line 237
# Integer timestamp of the last heartbeat recorded for +queue_name+.
# When redis holds no value yet, the heartbeat key is initialised through
# manual_refresh(:first_time) and that timestamp is returned instead.
def last_successful_heartbeat(queue_name)
  recorded = read_from_redis(heartbeat_key_for(queue_name))
  return recorded.to_i if recorded

  logger.info("manually refreshing #{queue_name} for :first_time")
  manual_refresh(queue_name, :first_time).to_i
end
last_triggered(queue_name) click to toggle source
# File lib/resque_stuck_queue.rb, line 266
# Seconds since +queue_name+ was last triggered, or nil when redis holds no
# triggered timestamp for it.
def last_triggered(queue_name)
  triggered_at = read_from_redis(triggered_key_for(queue_name))
  return if triggered_at.nil?

  Time.now.to_i - triggered_at.to_i
end
log_starting_thread(type) click to toggle source
# File lib/resque_stuck_queue.rb, line 160
# Logs that the +type+ thread is starting, together with the interval found
# under config[:"<type>_interval"] (blank in the message when unset).
def log_starting_thread(type)
  interval = config[:"#{type}_interval"]
  logger.info("Starting #{type} thread with interval of #{interval} seconds")
end
manual_refresh(queue_name, type) click to toggle source
# File lib/resque_stuck_queue.rb, line 247
# Updates the redis bookkeeping for +queue_name+ according to +type+:
#
#   :triggered  - stores the current time under the triggered key, returns it
#   :recovered  - deletes the triggered key, returns nil
#   :first_time - stores the current time under the heartbeat key, returns it
#
# Any other +type+ does nothing and returns nil.
def manual_refresh(queue_name, type)
  case type
  when :triggered
    Time.now.to_i.tap { |stamp| redis.set(triggered_key_for(queue_name), stamp) }
  when :recovered
    redis.del(triggered_key_for(queue_name))
    nil
  when :first_time
    Time.now.to_i.tap { |stamp| redis.set(heartbeat_key_for(queue_name), stamp) }
  end
end
max_wait_time() click to toggle source
# File lib/resque_stuck_queue.rb, line 309
# Lag threshold used by should_trigger? and should_recover?:
# config[:trigger_timeout] when set, TRIGGER_TIMEOUT otherwise.
def max_wait_time
  configured_timeout = config[:trigger_timeout]
  configured_timeout || TRIGGER_TIMEOUT
end
pretty_process_name() click to toggle source
# File lib/resque_stuck_queue.rb, line 313
# Sets the process title ($0) to a string containing the inspected redis
# connection and the comma-separated list of watched queues.
def pretty_process_name
  queue_list = queues.join(",")
  $0 = "rake --trace resque:stuck_queue #{redis.inspect} QUEUES=#{queue_list}"
end
read_from_redis(keyname) click to toggle source
# File lib/resque_stuck_queue.rb, line 165
# Fetches the value stored under +keyname+ through the configured redis
# connection and returns whatever redis.get returns. Callers in this module
# treat a nil result as "key not set".
def read_from_redis(keyname)
  redis.get(keyname)
end
setup_heartbeat_thread() click to toggle source
# File lib/resque_stuck_queue.rb, line 194
# Spawns the heartbeat thread and registers it in @threads.
# While @running is set, the thread waits for the heartbeat interval and then
# enqueues the heartbeat jobs.
def setup_heartbeat_thread
  heartbeat = Thread.new do
    Thread.current.abort_on_exception = abort_on_exception
    log_starting_thread(:heartbeat)
    while @running
      # Heartbeats deliberately go through resque jobs: the point is to verify
      # that jobs actually get executed and the timestamp gets updated.
      wait_for_it(:heartbeat_interval)
      logger.info("Sending heartbeat jobs")
      enqueue_jobs
    end
  end
  @threads << heartbeat
end
setup_warn_thread() click to toggle source
# File lib/resque_stuck_queue.rb, line 208
# Spawns the optional warn thread and registers it in @threads.
# Does nothing (returns nil) unless config[:warn_interval] is set.
# While @running is set, the thread calls the :triggered handler for every
# queue whose forced should_trigger? check passes, then waits for the warn
# interval.
def setup_warn_thread
  return unless config[:warn_interval]

  warner = Thread.new do
    Thread.current.abort_on_exception = abort_on_exception
    log_starting_thread(:warn)
    while @running
      queues.each do |queue_name|
        trigger_handler(queue_name, :triggered) if should_trigger?(queue_name, true)
      end
      wait_for_it(:warn_interval)
    end
  end
  @threads << warner
end
setup_watcher_thread() click to toggle source
# File lib/resque_stuck_queue.rb, line 169
# Spawns the watcher thread and registers it in @threads.
# While @running is set, each pass tries to take the :resque_stuck_queue_lock
# RedisMutex; if the lock is obtained, every queue is logged and its
# :triggered or :recovered handler is called when due. The lock is always
# released, and the thread then waits for the watcher interval.
def setup_watcher_thread
  watcher = Thread.new do
    Thread.current.abort_on_exception = abort_on_exception
    log_starting_thread(:watcher)
    while @running
      lock = RedisMutex.new(:resque_stuck_queue_lock)
      if lock.lock
        begin
          queues.each do |queue_name|
            log_watcher_info(queue_name)
            # Recovery is only considered when the queue should not trigger.
            event =
              if should_trigger?(queue_name)
                :triggered
              elsif should_recover?(queue_name)
                :recovered
              end
            trigger_handler(queue_name, event) if event
          end
        ensure
          lock.unlock
        end
      end
      wait_for_it(:watcher_interval)
    end
  end
  @threads << watcher
end
should_recover?(queue_name) click to toggle source
# File lib/resque_stuck_queue.rb, line 273
# True when +queue_name+ has a recorded trigger and its lag is back under
# max_wait_time. Returns nil when no trigger is recorded.
def should_recover?(queue_name)
  seconds_since_trigger = last_triggered(queue_name)
  return seconds_since_trigger unless seconds_since_trigger

  lag_time(queue_name) < max_wait_time
end
should_trigger?(queue_name, force_trigger = false) click to toggle source
# File lib/resque_stuck_queue.rb, line 278
# Decides whether the :triggered handler should fire for +queue_name+.
#
# Returns nil while the lag is below max_wait_time. Once the lag has reached
# it, returns true when +force_trigger+ is set or when no trigger is recorded
# yet, and false when a trigger is already recorded.
def should_trigger?(queue_name, force_trigger = false)
  return unless lag_time(queue_name) >= max_wait_time

  seconds_since_trigger = last_triggered(queue_name)
  return true if force_trigger

  # A recorded trigger means it already fired; don't fire again. :recovered
  # should clear out last_triggered so the trigger<->recover cycle continues.
  seconds_since_trigger.nil?
end
wait_for_it(type) click to toggle source
# File lib/resque_stuck_queue.rb, line 297
# Sleeps for the interval selected by +type+:
#
#   :heartbeat_interval - config[:heartbeat_interval] or HEARTBEAT_INTERVAL
#   :watcher_interval   - config[:watcher_interval] or WATCHER_INTERVAL
#   :warn_interval      - config[:warn_interval] (no default)
#
# Raises a RuntimeError for any other +type+.
def wait_for_it(type)
  duration =
    case type
    when :heartbeat_interval
      config[:heartbeat_interval] || HEARTBEAT_INTERVAL
    when :watcher_interval
      config[:watcher_interval] || WATCHER_INTERVAL
    when :warn_interval
      config[:warn_interval]
    else
      raise 'Must sleep for :watcher_interval interval or :heartbeat_interval or :warn_interval interval!'
    end
  sleep duration
end