class Unrestful::AsyncJob
Constants
- ALLOCATED
- CHANNEL_TIMEOUT
- FAILED
- KEY_LENGTH
- KEY_TIMEOUT
- RUNNING
- SUCCESS
Attributes
job_id[R]
Public Class Methods
new(job_id: nil)
click to toggle source
# File lib/unrestful/async_job.rb, line 27 def initialize(job_id: nil) if job_id.nil? @job_id = SecureRandom.hex(KEY_LENGTH) else @job_id = job_id end end
Public Instance Methods
attributes()
click to toggle source
# File lib/unrestful/async_job.rb, line 18 def attributes { job_id: job_id, state: state, last_message: last_message, ttl: ttl } end
close()
click to toggle source
# File lib/unrestful/async_job.rb, line 90 def close redis.unsubscribe(job_channel) if redis.subscribed? ensure @redis.quit end
delete()
click to toggle source
# File lib/unrestful/async_job.rb, line 59 def delete redis.del(job_key) redis.del(job_message) end
last_message()
click to toggle source
# File lib/unrestful/async_job.rb, line 55 def last_message redis.get(job_message) end
publish(message)
click to toggle source
# File lib/unrestful/async_job.rb, line 70 def publish(message) raise AsyncError, "job #{job_key} doesn't exist" unless valid? redis.publish(job_channel, message) end
redis()
click to toggle source
# File lib/unrestful/async_job.rb, line 86 def redis @redis ||= Redis.new(url: Unrestful.configuration.redis_address) end
state()
click to toggle source
# File lib/unrestful/async_job.rb, line 51 def state redis.get(job_key) end
subscribe(timeout: CHANNEL_TIMEOUT, &block)
click to toggle source
# File lib/unrestful/async_job.rb, line 64 def subscribe(timeout: CHANNEL_TIMEOUT, &block) raise AsyncError, "job #{job_key} doesn't exist" unless valid? redis.subscribe_with_timeout(timeout, job_channel, &block) end
ttl()
click to toggle source
# File lib/unrestful/async_job.rb, line 47 def ttl redis.ttl(job_key) end
unsubscribe()
click to toggle source
# File lib/unrestful/async_job.rb, line 80 def unsubscribe redis.unsubscribe(job_channel) rescue # ignore unsub errors end
update(state, message: '')
click to toggle source
# File lib/unrestful/async_job.rb, line 35 def update(state, message: '') raise ArgumentError, 'failed states must have a message' if message.blank? && state == FAILED redis.set(job_key, state) redis.set(job_message, message) unless message.blank? if state == ALLOCATED redis.expire(job_key, KEY_TIMEOUT) redis.expire(job_message, KEY_TIMEOUT) end end
valid?()
click to toggle source
# File lib/unrestful/async_job.rb, line 76 def valid? redis.exists(job_key) end
Private Instance Methods
job_channel()
click to toggle source
# File lib/unrestful/async_job.rb, line 102 def job_channel "unrestful:job:channel:#{@job_id}" end
job_key()
click to toggle source
# File lib/unrestful/async_job.rb, line 98 def job_key "unrestful:job:state:#{@job_id}" end
job_message()
click to toggle source
# File lib/unrestful/async_job.rb, line 106 def job_message "unrestful:job:message:#{@job_id}" end