class Superbolt::Future::Runner::Requeue

Public Instance Methods

future_queue() click to toggle source
# File lib/superbolt/future/runner/requeue.rb, line 40
def future_queue
  Superbolt::Queue.new(queue.name)
end
process(message) click to toggle source
# File lib/superbolt/future/runner/requeue.rb, line 12
def process(message)
  parsed_message = message.parse
  requeue(parsed_message)

  message.ack

  block.call(parsed_message, logger) if block
  sleep sleep_time
rescue Exception => e
  on_error(message.parse, e)
end
requeue(parsed_message) click to toggle source
# File lib/superbolt/future/runner/requeue.rb, line 28
def requeue(parsed_message)
  time = parsed_message["future"]
  q = if Time.parse(time) > Time.now
    logger.info("Requeueing message to #{time}")
    future_queue
  else
    logger.info("Sending message #{parsed_message} to worker queue")
    worker_queue
  end
  q.push(parsed_message)
end
sleep_time() click to toggle source
# File lib/superbolt/future/runner/requeue.rb, line 24
def sleep_time
  0.5 # should this be dynamic?
end
subscribe() click to toggle source
# File lib/superbolt/future/runner/requeue.rb, line 5
def subscribe
  queue.subscribe(ack: true, block: true) do |delivery_info, metadata, payload|
    message = Superbolt::IncomingMessage.new(delivery_info, payload, channel)
    process(message)
  end
end
worker_queue() click to toggle source
# File lib/superbolt/future/runner/requeue.rb, line 44
def worker_queue
  Superbolt::Queue.new(worker_queue_name)
end
worker_queue_name() click to toggle source
# File lib/superbolt/future/runner/requeue.rb, line 48
def worker_queue_name
  queue.name.sub(/\.future$/, '')
end