class Tennis::Backend::Redis
Public Class Methods
new(logger:, url:, namespace: "tennis")
click to toggle source
Calls superclass method
# File lib/tennis/backend/redis.rb, line 15
# Sets up a Redis-backed backend.
#
# logger::    forwarded as-is to the superclass initializer.
# url::       Redis connection URL; kept for the lazily built client.
# namespace:: prefix stored for the Redis keys of this backend
#             (defaults to "tennis").
def initialize(logger:, url:, namespace: "tennis")
  super(logger: logger)
  @redis_namespace = namespace
  @redis_url = url
end
Public Instance Methods
ack(_)
click to toggle source
# File lib/tennis/backend/redis.rb, line 36
# Acknowledges a task. This backend has nothing to do on acknowledgement,
# so the argument is ignored and nil is returned.
def ack(_)
  nil
end
enqueue(job:, method:, args:, delay: nil)
click to toggle source
Delayed jobs are not yet supported with the Redis backend; the delay argument is accepted but ignored.
# File lib/tennis/backend/redis.rb, line 22
# Wraps the job call in a new Task and pushes it onto the job's queue.
#
# job::    the job instance; its class selects the destination queue.
# method:: method identifier stored in the task.
# args::   arguments stored in the task.
# delay::  accepted for interface compatibility but not used here.
#
# The task metadata records the enqueue time as integer epoch seconds
# under the "enqueued_at" key.
def enqueue(job:, method:, args:, delay: nil)
  enqueued_at = Time.now.to_i
  task = Task.new(
    self,
    generate_task_id,
    job,
    method,
    args,
    { "enqueued_at" => enqueued_at }
  )
  client_push(task)
end
receive(job_classes:, timeout: 1)
click to toggle source
# File lib/tennis/backend/redis.rb, line 28
# Pops one task from the queues of the given job classes.
#
# job_classes:: classes whose queues are polled; the queue order is
#               shuffled on every call.
# timeout::     when it is at least 1, a blocking pop with that timeout
#               is used; otherwise (nil or below 1) a non-blocking pop.
#
# Returns the deserialized task, or the falsy pop result when nothing
# was available.
def receive(job_classes:, timeout: 1)
  candidates = queues(job_classes).shuffle
  payload =
    if timeout && timeout >= 1
      client_pop_with_timeout(candidates, timeout)
    else
      client_pop(candidates)
    end
  payload && deserialize_task(payload)
end
requeue(task)
click to toggle source
# File lib/tennis/backend/redis.rb, line 40
# Pushes a task back onto its queue, appending the current time (integer
# epoch seconds) to the "requeued_at" list in the task metadata. The list
# is created on the first requeue.
def requeue(task)
  meta = task.meta
  history = (meta["requeued_at"] ||= [])
  history << Time.now.to_i
  client_push(task)
end
Private Instance Methods
client()
click to toggle source
# File lib/tennis/backend/redis.rb, line 47
# Returns the memoized namespaced Redis client, building it on first use
# from the stored URL and namespace.
def client
  return @client if @client

  connection = ::Redis.new(url: @redis_url)
  @client = ::Redis::Namespace.new(@redis_namespace, redis: connection)
end
client_pop(unordered_queues)
click to toggle source
# File lib/tennis/backend/redis.rb, line 65
# Non-blocking pop: tries RPOP on each queue, in the given order, inside a
# single Lua script and returns the first element found (nil when every
# queue is empty).
#
# Keys are prefixed by hand with the namespace because they are embedded
# in the script text rather than passed as separate key arguments.
#
# Inside a Redis script a nil reply (RPOP on an empty list) is converted
# to Lua +false+, not +nil+. The emptiness test therefore checks
# truthiness; comparing against nil would return on the very first queue
# even when it is empty and never look at the remaining ones.
def client_pop(unordered_queues)
  script_lines = ["local element = nil"]
  unordered_queues.each do |queue|
    key = "#{@redis_namespace}:#{queue}"
    script_lines << %(element = redis.call("RPOP", "#{key}"))
    script_lines << "if element then return element end"
  end
  client.eval(script_lines.join("\n"))
end
client_pop_with_timeout(unordered_queues, timeout)
click to toggle source
# File lib/tennis/backend/redis.rb, line 59
# Blocking pop across the given queues.
#
# unordered_queues:: queue names to wait on; the array is left untouched.
# timeout::          converted with +to_i+ and passed as the trailing
#                    argument to BRPOP.
#
# Returns the popped serialized task, or nil when BRPOP yields nothing.
def client_pop_with_timeout(unordered_queues, timeout)
  seconds = timeout.to_i
  # Pass the timeout as a trailing argument rather than appending it to
  # the caller's array, so the argument is never mutated.
  _, serialized_task = client.brpop(*unordered_queues, seconds)
  serialized_task
end
client_push(task)
click to toggle source
# File lib/tennis/backend/redis.rb, line 54
# Serializes the task and LPUSHes it onto the queue named after the class
# of the task's job.
def client_push(task)
  payload = serialize(task)
  redis = client
  destination = queue_name(task.job.class)
  redis.lpush(destination, payload)
end
deserialize_task(serialized_task)
click to toggle source
# File lib/tennis/backend/redis.rb, line 88
# Rebuilds a Task bound to this backend from its serialized form. The
# loaded structure is read through the "id", "job", "method", "args" and
# "meta" keys.
def deserialize_task(serialized_task)
  attributes = Serializer.new.load(serialized_task)
  Task.new(
    self,
    attributes["id"],
    attributes["job"],
    attributes["method"],
    attributes["args"],
    attributes["meta"]
  )
end
generate_task_id()
click to toggle source
# File lib/tennis/backend/redis.rb, line 93
# Returns a fresh random task id: 10 random bytes rendered as a
# 20-character hexadecimal string.
def generate_task_id
  random_bytes = 10
  SecureRandom.hex(random_bytes)
end
queue_name(klass)
click to toggle source
# File lib/tennis/backend/redis.rb, line 102
# Returns the queue name for a job class: "queue:" followed by the class
# name with "::" replaced by "-" and lower-cased. Results are memoized
# per class.
def queue_name(klass)
  @queue_names ||= {}
  @queue_names.fetch(klass) do
    flattened = klass.name.gsub("::", "-").downcase
    @queue_names[klass] = "queue:#{flattened}"
  end
end
queues(job_classes)
click to toggle source
# File lib/tennis/backend/redis.rb, line 97
# Maps a list of job classes to their queue names, memoizing the result
# per list of classes.
def queues(job_classes)
  @queues ||= {}
  @queues.fetch(job_classes) do
    @queues[job_classes] = job_classes.map { |job_class| queue_name(job_class) }
  end
end
serialize(task)
click to toggle source
# File lib/tennis/backend/redis.rb, line 78
# Dumps a task through Serializer as a hash with the "id", "job",
# "method", "args" and "meta" keys.
def serialize(task)
  serializer = Serializer.new
  payload = {
    "id" => task.task_id,
    "job" => task.job,
    "method" => task.method,
    "args" => task.args,
    "meta" => task.meta,
  }
  serializer.dump(payload)
end