class Superbolt::Future::Runner::Requeue
Public Instance Methods
future_queue()
click to toggle source
# File lib/superbolt/future/runner/requeue.rb, line 40 def future_queue Superbolt::Queue.new(queue.name) end
process(message)
click to toggle source
# File lib/superbolt/future/runner/requeue.rb, line 12 def process(message) parsed_message = message.parse requeue(parsed_message) message.ack block.call(parsed_message, logger) if block sleep sleep_time rescue Exception => e on_error(message.parse, e) end
requeue(parsed_message)
click to toggle source
# File lib/superbolt/future/runner/requeue.rb, line 28 def requeue(parsed_message) time = parsed_message["future"] q = if Time.parse(time) > Time.now logger.info("Requeueing message to #{time}") future_queue else logger.info("Sending message #{parsed_message} to worker queue") worker_queue end q.push(parsed_message) end
sleep_time()
click to toggle source
# File lib/superbolt/future/runner/requeue.rb, line 24 def sleep_time 0.5 # should this be dynamic? end
subscribe()
click to toggle source
# File lib/superbolt/future/runner/requeue.rb, line 5 def subscribe queue.subscribe(ack: true, block: true) do |delivery_info, metadata, payload| message = Superbolt::IncomingMessage.new(delivery_info, payload, channel) process(message) end end
worker_queue()
click to toggle source
# File lib/superbolt/future/runner/requeue.rb, line 44 def worker_queue Superbolt::Queue.new(worker_queue_name) end
worker_queue_name()
click to toggle source
# File lib/superbolt/future/runner/requeue.rb, line 48 def worker_queue_name queue.name.sub(/\.future$/, '') end