class Tennis::Backend::Rabbit
Public Class Methods
new(logger:, url:, namespace: "tennis")
click to toggle source
Calls superclass method
# File lib/tennis/backend/rabbit.rb, line 15
# Builds a Rabbit backend. No connection is opened here; the Bunny client
# and channel are created lazily by the private +client+ / +channel+ helpers.
#
# logger::    forwarded to the superclass initializer.
# url::       stored and later handed to <tt>Bunny.new</tt>.
# namespace:: used as the topic exchange name and as the queue-name prefix.
def initialize(logger:, url:, namespace: "tennis")
  super(logger: logger)

  # Subscription state: nothing is subscribed until #receive triggers setup.
  @setup = false
  @job_classes = nil

  # Local buffer that subscription callbacks push deliveries into.
  @payload_queue = RabbitQueue.new

  # Connection settings.
  @rabbit_namespace = namespace
  @rabbit_url = url
end
Public Instance Methods
ack(task)
click to toggle source
# File lib/tennis/backend/rabbit.rb, line 38
# Acknowledges the broker delivery that produced +task+.
#
# The delivery info is read from <tt>task.meta["_backend"]["delivery_info"]</tt>,
# which #deserialize_task attaches to every received task.
def ack(task)
  delivery = task.meta["_backend"]["delivery_info"]
  # Second positional argument of Bunny's Channel#acknowledge is the
  # `multiple` flag; false acknowledges only this delivery tag.
  multiple = false
  channel.acknowledge(delivery.delivery_tag, multiple)
end
enqueue(job:, method:, args:, delay: nil)
click to toggle source
Delayed jobs are not yet supported with the Rabbit backend: the +delay+ argument is accepted but ignored.
# File lib/tennis/backend/rabbit.rb, line 25
# Publishes a task for +job+ on the namespace exchange.
#
# job::    the job instance; its class selects the queue and routing key.
# method:: name of the method to run on the job.
# args::   arguments for that method.
# delay::  accepted for interface compatibility but not used here; delayed
#          jobs are not yet supported with the Rabbit backend.
def enqueue(job:, method:, args:, delay: nil)
  job_class = job.class

  # Make sure the destination queue is declared and bound before publishing.
  queue(job_class)

  enqueue_meta = { "enqueued_at" => Time.now.to_i }
  task = Task.new(self, generate_task_id, job, method, args, enqueue_meta)

  exchange.publish(serialize(task), routing_key: routing_key(job_class))
end
receive(job_classes:, timeout: 1.0)
click to toggle source
# File lib/tennis/backend/rabbit.rb, line 32
# Waits up to +timeout+ for a delivery and turns it into a Task.
#
# job_classes:: classes whose queues should be consumed; subscriptions are
#               created on the first call (see #setup).
# timeout::     passed to the payload queue's +pop+.
#
# Returns the deserialized Task, or the falsy value that +pop+ returned
# (presumably nil on timeout -- confirm against RabbitQueue).
def receive(job_classes:, timeout: 1.0)
  setup(job_classes)

  delivery = @payload_queue.pop(timeout)
  # Nothing arrived: hand back the falsy pop result unchanged.
  return delivery unless delivery

  deserialize_task(*delivery)
end
requeue(task)
click to toggle source
# File lib/tennis/backend/rabbit.rb, line 43
# Rejects the broker delivery that produced +task+ and asks for it to be
# put back on its queue.
#
# The delivery info is read from <tt>task.meta["_backend"]["delivery_info"]</tt>,
# which #deserialize_task attaches to every received task.
def requeue(task)
  delivery = task.meta["_backend"]["delivery_info"]
  # Second positional argument of Bunny's Channel#reject is the `requeue` flag.
  requeue_delivery = true
  channel.reject(delivery.delivery_tag, requeue_delivery)
end
reset()
click to toggle source
Reset the backend:
- stop getting messages from the queues
- requeue unacked messages
# File lib/tennis/backend/rabbit.rb, line 51
# Resets the subscription state so #receive can be set up again.
#
# Does nothing (and returns nil) when the backend has not been set up.
# Otherwise cancels every consumer created by #setup and clears the
# remembered job classes.
#
# NOTE(review): only consumer cancellation is visible here; no explicit
# requeue/recover call is issued for unacked deliveries -- confirm that the
# documented "requeue unacked messages" step really happens.
def reset
  if @setup
    @consumers.each { |consumer| consumer.cancel }
    @job_classes = nil
    @setup = false
  end
end
Private Instance Methods
channel()
click to toggle source
# File lib/tennis/backend/rabbit.rb, line 124
# Returns the memoized channel, creating it from the client on first use
# with a prefetch of 10.
def channel
  return @channel if @channel

  fresh_channel = client.create_channel
  fresh_channel.prefetch(10)
  # Memoize only once the channel is fully configured.
  @channel = fresh_channel
end
client()
click to toggle source
# File lib/tennis/backend/rabbit.rb, line 130
# Returns the memoized Bunny connection, building it from the configured
# URL and starting it on first use.
def client
  return @client if @client

  connection = ::Bunny.new(@rabbit_url)
  connection.start
  # Memoize only after the connection has started successfully.
  @client = connection
end
deserialize_task(delivery_info, properties, serialized_task)
click to toggle source
# File lib/tennis/backend/rabbit.rb, line 86
# Rebuilds a Task from a broker delivery.
#
# delivery_info::   delivery metadata handed to the subscription block;
#                   kept so #ack / #requeue can find the delivery tag.
# properties::      message properties handed to the subscription block.
# serialized_task:: the message body produced by #serialize.
#
# The delivery details are stored under the "_backend" key of the task's
# meta (and stripped again by #serialize).
def deserialize_task(delivery_info, properties, serialized_task)
  attributes = Serializer.new.load(serialized_task)

  backend_meta = {
    "delivery_info" => delivery_info,
    "properties" => properties,
  }
  attributes["meta"].merge!("_backend" => backend_meta)

  Task.new(
    self,
    attributes["id"],
    attributes["job"],
    attributes["method"],
    attributes["args"],
    attributes["meta"]
  )
end
exchange()
click to toggle source
# File lib/tennis/backend/rabbit.rb, line 120
# Returns the memoized durable topic exchange named after the namespace,
# declaring it on the channel on first use.
def exchange
  # The memoization variable was previously misspelled (@echange); it now
  # matches the method name like the other memoized helpers.
  @exchange ||= channel.topic(@rabbit_namespace, durable: true)
end
generate_task_id()
click to toggle source
# File lib/tennis/backend/rabbit.rb, line 95
# Returns a fresh random task id: the hex encoding of 10 random bytes
# (a 20-character string).
def generate_task_id
  random_byte_count = 10
  SecureRandom.hex(random_byte_count)
end
queue(klass)
click to toggle source
# File lib/tennis/backend/rabbit.rb, line 99
# Returns the durable queue for +klass+, declaring it and binding it to the
# exchange with the class's routing key on first use. Queues are cached per
# class.
def queue(klass)
  @queue ||= {}

  known_queue = @queue[klass]
  return known_queue if known_queue

  declared_queue = channel.queue(queue_name(klass), durable: true)
  declared_queue.bind(exchange, routing_key: routing_key(klass))
  # Cache only after the binding succeeded.
  @queue[klass] = declared_queue
end
queue_name(klass)
click to toggle source
# File lib/tennis/backend/rabbit.rb, line 107
# Returns the queue name for +klass+, built as
# "<namespace>:queue:<routing key>" and cached per class.
def queue_name(klass)
  @queue_name ||= {}

  cached_name = @queue_name[klass]
  return cached_name if cached_name

  @queue_name[klass] = format(
    "%{namespace}:queue:%{klass_name}",
    namespace: @rabbit_namespace,
    klass_name: routing_key(klass)
  )
end
routing_key(klass)
click to toggle source
# File lib/tennis/backend/rabbit.rb, line 115
# Returns the routing key for +klass+: its name with every "::" replaced by
# "-" and lowercased (e.g. "Tennis::Backend::Rabbit" becomes
# "tennis-backend-rabbit"). Keys are cached per class.
def routing_key(klass)
  @routing_key ||= {}

  cached_key = @routing_key[klass]
  return cached_key if cached_key

  dashed_name = klass.name.gsub("::", "-")
  @routing_key[klass] = dashed_name.downcase
end
serialize(task)
click to toggle source
# File lib/tennis/backend/rabbit.rb, line 74
# Dumps +task+ to the wire format via Serializer.
#
# The "_backend" meta entry (delivery details added by #deserialize_task) is
# left out of the payload; the task's own meta is not modified because the
# filtering happens on a copy.
def serialize(task)
  wire_meta = task.meta.dup.tap { |meta| meta.delete("_backend") }

  payload = {
    "id" => task.task_id,
    "job" => task.job,
    "method" => task.method,
    "args" => task.args,
    "meta" => wire_meta,
  }

  Serializer.new.dump(payload)
end
setup(job_classes)
click to toggle source
# File lib/tennis/backend/rabbit.rb, line 60
# Subscribes to the queues of +job_classes+ the first time it is called.
#
# Calling it again with the same classes is a no-op. Calling it with
# different classes once set up raises a RuntimeError, because the backend
# cannot change its subscriptions without a #reset.
#
# State is committed only after every subscription succeeded. Previously
# @job_classes was recorded first, so a failing subscription left the
# backend looking configured: a retry with the same classes returned early
# with no (or only some) consumers.
def setup(job_classes)
  return if @job_classes == job_classes
  raise "The Rabbit backend can't dynamically update its job_classes" if @setup

  started = []
  begin
    job_classes.each { |klass| started << subscribe(klass) }
  rescue StandardError
    # Undo the partial subscriptions so a retry does not double-subscribe.
    started.each(&:cancel)
    raise
  end

  @consumers = started
  @job_classes = job_classes
  @setup = true
end
subscribe(klass)
click to toggle source
# File lib/tennis/backend/rabbit.rb, line 68
# Subscribes to the queue of +klass+ with manual acknowledgements and
# returns whatever the queue's +subscribe+ returns (kept by #setup as a
# consumer to cancel later).
#
# Each delivery is pushed onto the local payload queue as a
# [delivery_info, properties, payload] triple for #receive to pop.
def subscribe(klass)
  class_queue = queue(klass)

  class_queue.subscribe(manual_ack: true) do |delivery_info, properties, payload|
    delivery = [delivery_info, properties, payload]
    @payload_queue << delivery
  end
end