class Zhong::Job

Attributes

at[R]
category[R]
every[R]
id[R]
last_ran[R]
name[R]

Public Class Methods

new(job_name, config = {}, callbacks = {}, &block) click to toggle source
# File lib/zhong/job.rb, line 8
# Builds a job from its name, a config hash and optional lifecycle callbacks.
#
# job_name  - name of the job; its SHA256 hex digest becomes the job id.
# config    - options hash. Keys read here: :category, :logger, :at, :every,
#             :grace (defaults to 15.minutes, only consulted when :at is
#             given), :if and :long_running_timeout.
# callbacks - hash keyed by event name whose values are collections of
#             callables.
# block     - the work to call when the job runs.
#
# Raises RuntimeError when neither :at nor :every is configured.
def initialize(job_name, config = {}, callbacks = {}, &block)
  @name = job_name
  @category = config[:category]
  @logger = config[:logger]
  @config = config
  @callbacks = callbacks

  @at = config[:at] ? At.parse(config[:at], grace: config.fetch(:grace, 15.minutes)) : nil
  @every = config[:every] ? Every.parse(config[:every]) : nil

  # A job with no schedule can never become due, so reject it up front.
  raise "must specify either `at` or `every` for job: #{self}" unless @at || @every

  @block = block

  @if = config[:if]
  @long_running_timeout = config[:long_running_timeout]
  @running = false
  @first_run = true
  @last_ran = nil
  @id = Digest::SHA256.hexdigest(@name)
end

Public Instance Methods

clear() click to toggle source
# File lib/zhong/job.rb, line 124
# Deletes this job's last-ran entry from redis.
#
# Returns whatever the redis client's `del` returns.
def clear
  store = redis
  store.del(last_ran_key)
end
desired_at_key() click to toggle source
# File lib/zhong/job.rb, line 132
# Redis key under which the job's serialized `at` value is stored.
# The job's string form (see #to_s) is the key suffix.
def desired_at_key
  namespace = "zhong:at"
  "#{namespace}:#{self}"
end
disable() click to toggle source
# File lib/zhong/job.rb, line 98
# Flags the job as disabled by writing "true" under its disabled key.
# The :before_disable callbacks fire before the write and :after_disable
# callbacks after it; the before-callbacks' result is not consulted.
#
# Returns the result of firing the :after_disable callbacks.
def disable
  fire_callbacks(:before_disable, self)

  store = redis
  store.set(disabled_key, "true")

  after_result = fire_callbacks(:after_disable, self)
  after_result
end
disabled?() click to toggle source
# File lib/zhong/job.rb, line 110
# True when any value is stored under the job's disabled key in redis.
def disabled?
  flag = redis.get(disabled_key)
  !flag.nil?
end
disabled_key() click to toggle source
# File lib/zhong/job.rb, line 136
# Redis key whose presence marks the job as disabled.
# The job's string form (see #to_s) is the key suffix.
def disabled_key
  namespace = "zhong:disabled"
  "#{namespace}:#{self}"
end
enable() click to toggle source
# File lib/zhong/job.rb, line 104
# Re-enables the job by deleting its disabled key from redis.
# The :before_enable callbacks fire before the delete and :after_enable
# callbacks after it; the before-callbacks' result is not consulted.
#
# Returns the result of firing the :after_enable callbacks.
def enable
  fire_callbacks(:before_enable, self)

  store = redis
  store.del(disabled_key)

  after_result = fire_callbacks(:after_enable, self)
  after_result
end
last_ran_key() click to toggle source
# File lib/zhong/job.rb, line 128
# Redis key under which the job's last-ran timestamp is stored.
# The job's string form (see #to_s) is the key suffix.
def last_ran_key
  namespace = "zhong:last_ran"
  "#{namespace}:#{self}"
end
lock_key() click to toggle source
# File lib/zhong/job.rb, line 140
# Key handed to the lock client for this job's run lock.
# The job's string form (see #to_s) is the key suffix.
def lock_key
  namespace = "zhong:lock"
  "#{namespace}:#{self}"
end
next_at() click to toggle source
# File lib/zhong/job.rb, line 118
# Earliest time the job can next run: the latest of the `every` schedule
# (measured from the last run, when both are known), the `at` schedule, and
# the current time.
#
# Returns a Time. The current time is always a candidate, so the result is
# never earlier than now and never nil.
def next_at
  # Sample the clock once so the `at` lookup and the lower bound agree.
  now = Time.now
  every_time = @every.next_at(@last_ran) if @last_ran && @every
  at_time = @at.next_at(now) if @at
  [every_time, at_time, now].compact.max
end
refresh_last_ran() click to toggle source
# File lib/zhong/job.rb, line 93
# Reloads @last_ran from redis. The stored value is converted with `to_i`
# and read as epoch seconds; @last_ran becomes nil when nothing is stored.
#
# Returns the new @last_ran value.
def refresh_last_ran
  stored = redis.get(last_ran_key)
  @last_ran = (Time.at(stored.to_i) if stored)
end
run(time = Time.now, error_handler = nil) click to toggle source
# File lib/zhong/job.rb, line 40
# Runs the job if it is due at +time+, executing it inside the job's lock.
#
# time          - the time the run decision is evaluated against.
# error_handler - optional callable invoked as (exception, job) when the job
#                 block raises.
#
# Returns nil when the job is not due before the lock is attempted, true
# when the block was called and finished without raising, and false
# otherwise (lock not acquired, lock client error, job disabled, no longer
# due, no block, or the block raised).
#
# Note that the last-ran time is recorded even when the block raised.
def run(time = Time.now, error_handler = nil)
  return unless run?(time)

  locked = false
  errored = false
  ran = false

  begin
    redis_lock.lock do
      locked = true
      @running = true

      refresh_last_ran

      # we need to check again, as another process might have acquired
      # the lock right before us and obviated our need to do anything
      break unless run?(time)

      if disabled?
        logger.info "not running, disabled: #{self}"
        break
      end

      logger.info "running: #{self}"

      if @block
        begin
          @block.call
          ran = true
        rescue => boom
          logger.error "#{self} failed: #{boom}"
          error_handler.call(boom, self) if error_handler
        end
      end

      ran!(time)
    end
  rescue Suo::LockClientError => boom
    logger.error "unable to run due to client error: #{boom}"
    errored = true
  ensure
    # Reset the flag even when an unexpected error propagates (for example
    # from the error handler), so running? never stays stuck on true.
    @running = false
  end

  logger.info "unable to acquire exclusive run lock: #{self}" if !locked && !errored

  ran
end
run?(time = Time.now) click to toggle source
# File lib/zhong/job.rb, line 30
# Decides whether the job is due at +time+.
#
# On the first call only, this clears the stored last-ran value if the `at`
# setting changed (when an `at` is configured) and loads the last-ran time
# from redis. It then requires the `every`, `at` and `if` checks to pass,
# evaluated in that order and stopping at the first failure.
def run?(time = Time.now)
  first_pass = @first_run

  if first_pass
    @at && clear_last_ran_if_at_changed
    refresh_last_ran
    @first_run = false
  end

  schedule_due = run_every?(time) && run_at?(time)
  schedule_due && run_if?(time)
end
running?() click to toggle source
# File lib/zhong/job.rb, line 89
# Returns the @running flag, which #run sets to true once it holds the lock
# and resets to false when it finishes.
def running?
  @running
end
to_s() click to toggle source
# File lib/zhong/job.rb, line 114
# String form of the job: "category.name", or just the name when no
# category is set. The result is frozen and memoized.
def to_s
  return @to_s if @to_s

  label = [@category, @name].compact.join(".")
  @to_s = label.freeze
end

Private Instance Methods

clear_last_ran_if_at_changed() click to toggle source

If the @at value has changed since the previous run, the stored last_ran is no longer valid, so clear it.

# File lib/zhong/job.rb, line 154
# Compares the `at` value stored in redis with the current @at. When a
# stored value exists and differs, the change is logged and the stored
# last-ran value is cleared. The current @at is always written back.
def clear_last_ran_if_at_changed
  stored_at = redis.get(desired_at_key)

  if stored_at
    previous_at = At.deserialize(stored_at)
    schedule_changed = previous_at != @at

    if schedule_changed
      logger.error "#{self} period changed (from #{previous_at} to #{@at}), clearing last run"
      clear
    end
  end

  redis.set(desired_at_key, @at.serialize)
end
fire_callbacks(event, *args) click to toggle source
# File lib/zhong/job.rb, line 146
# Calls every callback registered for +event+ with +args+.
#
# Every callback is invoked; there is no early exit. Nil results are
# ignored, so the return value is false only when some callback returned
# false, and true otherwise (including when none are registered).
def fire_callbacks(event, *args)
  handlers = @callbacks[event].to_a
  outcomes = handlers.map { |handler| handler.call(*args) }
  outcomes.compact.all? # do not skip on nils
end
ran!(time) click to toggle source
# File lib/zhong/job.rb, line 181
# Records +time+ as the last run, both on the instance and in redis, where
# it is stored as integer seconds.
#
# Returns whatever the redis client's `set` returns.
def ran!(time)
  @last_ran = time

  store = redis
  key = last_ran_key
  store.set(key, @last_ran.to_i)
end
redis_lock() click to toggle source
# File lib/zhong/job.rb, line 186
# Memoized lock client for this job, built from the job's lock key, the
# redis client, and @long_running_timeout as the stale lock expiration.
def redis_lock
  return @lock if @lock

  key = lock_key
  options = { client: redis, stale_lock_expiration: @long_running_timeout }
  @lock = Suo::Client::Redis.new(key, **options)
end
run_at?(time) click to toggle source
# File lib/zhong/job.rb, line 173
# True when no `at` schedule is set, or when the schedule's next time for
# +time+ is not later than +time+.
def run_at?(time)
  return true unless @at

  @at.next_at(time) <= time
end
run_every?(time) click to toggle source
# File lib/zhong/job.rb, line 169
# True when the job has never run, has no `every` schedule, or when the
# schedule's next time after the last run is not later than +time+.
def run_every?(time)
  return true unless @last_ran && @every

  @every.next_at(@last_ran) <= time
end
run_if?(time) click to toggle source
# File lib/zhong/job.rb, line 177
# True when no `if` condition is configured; otherwise the result of
# calling the condition with +time+.
def run_if?(time)
  return true unless @if

  @if.call(time)
end