class Unrestful::AsyncJob

Constants

ALLOCATED
CHANNEL_TIMEOUT
FAILED
KEY_LENGTH
KEY_TIMEOUT
RUNNING
SUCCESS

Attributes

job_id[R]

Public Class Methods

new(job_id: nil) click to toggle source
# File lib/unrestful/async_job.rb, line 27
# Builds a job handle around +job_id+.
#
# When +job_id+ is nil a new identifier is generated with
# SecureRandom.hex(KEY_LENGTH); any other value is stored unchanged.
def initialize(job_id: nil)
  @job_id = job_id.nil? ? SecureRandom.hex(KEY_LENGTH) : job_id
end

Public Instance Methods

attributes() click to toggle source
# File lib/unrestful/async_job.rb, line 18
# Collects the job's current details into a hash keyed by :job_id, :state,
# :last_message and :ttl. Each value is whatever the method of the same
# name returns at the time of the call.
def attributes
  %i[job_id state last_message ttl].each_with_object({}) do |field, snapshot|
    snapshot[field] = send(field)
  end
end
close() click to toggle source
# File lib/unrestful/async_job.rb, line 90
# Releases this job's Redis connection.
#
# Unsubscribes from the job channel when the client reports an active
# subscription, then quits the connection. The quit runs from +ensure+, so
# it still happens when the unsubscribe step raises.
def close
  redis.unsubscribe(job_channel) if redis.subscribed?
ensure
  # @redis is still nil when creating the client itself failed. Guarding the
  # call keeps that original error from being replaced by a NoMethodError
  # raised inside this ensure clause.
  @redis.quit if @redis
end
delete() click to toggle source
# File lib/unrestful/async_job.rb, line 59
# Removes the job's state key and then its message key from Redis, using
# one DEL call per key. Returns the result of the second (message key) call.
def delete
  outcome = nil
  [job_key, job_message].each { |key| outcome = redis.del(key) }
  outcome
end
last_message() click to toggle source
# File lib/unrestful/async_job.rb, line 55
# Reads the value currently stored under the job's message key.
def last_message
  message_key = job_message
  redis.get(message_key)
end
publish(message) click to toggle source
# File lib/unrestful/async_job.rb, line 70
# Publishes +message+ on the job's channel and returns the client's result.
#
# Raises AsyncError when valid? reports that the job does not exist.
def publish(message)
  return redis.publish(job_channel, message) if valid?

  raise AsyncError, "job #{job_key} doesn't exist"
end
redis() click to toggle source
# File lib/unrestful/async_job.rb, line 86
# Returns the Redis client for this job, creating it on first use from
# Unrestful.configuration.redis_address and reusing it afterwards.
def redis
  return @redis if @redis

  @redis = Redis.new(url: Unrestful.configuration.redis_address)
end
state() click to toggle source
# File lib/unrestful/async_job.rb, line 51
# Reads the value currently stored under the job's state key.
def state
  state_key = job_key
  redis.get(state_key)
end
subscribe(timeout: CHANNEL_TIMEOUT, &block) click to toggle source
# File lib/unrestful/async_job.rb, line 64
# Subscribes to the job's channel through the client's
# subscribe_with_timeout, forwarding +timeout+ (CHANNEL_TIMEOUT by default)
# and the given block, and returns the client's result.
#
# Raises AsyncError when valid? reports that the job does not exist.
def subscribe(timeout: CHANNEL_TIMEOUT, &block)
  return redis.subscribe_with_timeout(timeout, job_channel, &block) if valid?

  raise AsyncError, "job #{job_key} doesn't exist"
end
ttl() click to toggle source
# File lib/unrestful/async_job.rb, line 47
# Asks Redis for the TTL of the job's state key and returns the answer as-is.
def ttl
  state_key = job_key
  redis.ttl(state_key)
end
unsubscribe() click to toggle source
# File lib/unrestful/async_job.rb, line 80
# Best-effort unsubscribe from the job's channel.
#
# Returns the client's result, or nil when the attempt raises a
# StandardError; such errors are deliberately ignored.
def unsubscribe
  channel = job_channel
  redis.unsubscribe(channel)
rescue StandardError
  nil
end
update(state, message: '') click to toggle source
# File lib/unrestful/async_job.rb, line 35
# Stores +state+ under the job's state key and, when +message+ is not blank,
# stores +message+ under the job's message key.
#
# When +state+ is ALLOCATED both keys are additionally given an expiry of
# KEY_TIMEOUT, and the result of the last expire call is returned; for any
# other state the method returns nil.
#
# Raises ArgumentError when +state+ is FAILED and +message+ is blank.
#
# NOTE(review): a plain Redis SET discards any TTL already on the key, so an
# update to a non-ALLOCATED state appears to leave the state key without an
# expiry - confirm this is intended.
def update(state, message: '')
  no_message = message.blank?
  raise ArgumentError, 'failed states must have a message' if no_message && state == FAILED

  redis.set(job_key, state)
  redis.set(job_message, message) unless no_message

  return unless state == ALLOCATED

  redis.expire(job_key, KEY_TIMEOUT)
  redis.expire(job_message, KEY_TIMEOUT)
end
valid?() click to toggle source
# File lib/unrestful/async_job.rb, line 76
# Reports whether the job's state key exists in Redis.
#
# Newer redis-rb clients return an Integer count from +exists+, and 0 is
# truthy in Ruby, so a missing job would pass as valid. The boolean
# +exists?+ is therefore used whenever the client provides it; clients that
# lack it fall back to +exists+.
def valid?
  client = redis
  return client.exists?(job_key) if client.respond_to?(:exists?)

  client.exists(job_key)
end

Private Instance Methods

job_channel() click to toggle source
# File lib/unrestful/async_job.rb, line 102
# Name of the Redis pub/sub channel for this job: a fixed prefix followed by
# the job id.
def job_channel
  'unrestful:job:channel:' + @job_id.to_s
end
job_key() click to toggle source
# File lib/unrestful/async_job.rb, line 98
# Redis key under which this job's state is stored: a fixed prefix followed
# by the job id.
def job_key
  'unrestful:job:state:' + @job_id.to_s
end
job_message() click to toggle source
# File lib/unrestful/async_job.rb, line 106
# Redis key under which this job's last message is stored: a fixed prefix
# followed by the job id.
def job_message
  'unrestful:job:message:' + @job_id.to_s
end