class Propono::QueueListener
Attributes
aws_client[R]
message_processor[R]
propono_config[R]
topic_name[R]
visibility_timeout[R]
Public Class Methods
drain(*args, &message_processor)
click to toggle source
# File lib/propono/services/queue_listener.rb, line 8 def self.drain(*args, &message_processor) new(*args, &message_processor).drain end
listen(*args, &message_processor)
click to toggle source
# File lib/propono/services/queue_listener.rb, line 4 def self.listen(*args, &message_processor) new(*args, &message_processor).listen end
new(aws_client, propono_config, topic_name, options = {}, &message_processor)
click to toggle source
# File lib/propono/services/queue_listener.rb, line 13 def initialize(aws_client, propono_config, topic_name, options = {}, &message_processor) @aws_client = aws_client @propono_config = propono_config @topic_name = topic_name @message_processor = message_processor @visibility_timeout = options[:visibility_timeout] || nil end
Public Instance Methods
drain()
click to toggle source
# File lib/propono/services/queue_listener.rb, line 28 def drain raise ProponoError.new("topic_name is nil") unless topic_name true while read_messages_from_queue(main_queue, 10, long_poll: false) true while read_messages_from_queue(slow_queue, 10, long_poll: false) if propono_config.slow_queue_enabled end
listen()
click to toggle source
# File lib/propono/services/queue_listener.rb, line 21 def listen raise ProponoError.new("topic_name is nil") unless topic_name loop do read_messages end end
Private Instance Methods
corrupt_queue()
click to toggle source
# File lib/propono/services/queue_listener.rb, line 113 def corrupt_queue @corrupt_queue ||= subscription.corrupt_queue end
delete_message(raw_sqs_message, queue)
click to toggle source
# File lib/propono/services/queue_listener.rb, line 101 def delete_message(raw_sqs_message, queue) aws_client.delete_from_sqs(queue, raw_sqs_message.receipt_handle) end
failed_queue()
click to toggle source
# File lib/propono/services/queue_listener.rb, line 109 def failed_queue @failed_queue ||= subscription.failed_queue end
handle(sqs_message)
click to toggle source
# File lib/propono/services/queue_listener.rb, line 79 def handle(sqs_message) process_message(sqs_message) rescue => e propono_config.logger.error("Failed to handle message #{e.message} #{e.backtrace}") requeue_message_on_failure(sqs_message, e) end
main_queue()
click to toggle source
# File lib/propono/services/queue_listener.rb, line 105 def main_queue @main_queue ||= subscription.queue end
move_to_corrupt_queue(raw_sqs_message)
click to toggle source
# File lib/propono/services/queue_listener.rb, line 91 def move_to_corrupt_queue(raw_sqs_message) aws_client.send_to_sqs(corrupt_queue, raw_sqs_message.body) end
parse(raw_sqs_message, queue)
click to toggle source
# File lib/propono/services/queue_listener.rb, line 70 def parse(raw_sqs_message, queue) SqsMessage.new(raw_sqs_message) rescue propono_config.logger.error "Error parsing message, moving to corrupt queue", $!, $!.backtrace move_to_corrupt_queue(raw_sqs_message) aws_client.delete_from_sqs(queue, raw_sqs_message.receipt_handle) nil end
process_message(sqs_message)
click to toggle source
# File lib/propono/services/queue_listener.rb, line 86 def process_message(sqs_message) return false unless message_processor message_processor.call(sqs_message.message, sqs_message.context) end
process_raw_message(raw_sqs_message, queue)
click to toggle source
The calls to delete_message
are deliberately duplicated so as to ensure the message is only deleted if the preceeding line has completed succesfully. We do not want to ensure that the message is deleted regardless of what happens in this method.
# File lib/propono/services/queue_listener.rb, line 61 def process_raw_message(raw_sqs_message, queue) sqs_message = parse(raw_sqs_message, queue) return unless sqs_message propono_config.logger.info "Propono [#{sqs_message.context[:id]}]: Received from sqs." handle(sqs_message) aws_client.delete_from_sqs(queue, sqs_message.receipt_handle) end
read_messages()
click to toggle source
# File lib/propono/services/queue_listener.rb, line 36 def read_messages read_messages_from_queue(main_queue, propono_config.num_messages_per_poll) || (propono_config.slow_queue_enabled ? read_messages_from_queue(slow_queue, 1) : nil) end
read_messages_from_queue(queue, num_messages, long_poll: true)
click to toggle source
# File lib/propono/services/queue_listener.rb, line 41 def read_messages_from_queue(queue, num_messages, long_poll: true) messages = aws_client.read_from_sqs(queue, num_messages, long_poll: long_poll, visibility_timeout: visibility_timeout) if messages.empty? false else messages.each { |msg| process_raw_message(msg, queue) } true end rescue => e #Aws::Errors => e propono_config.logger.error "Unexpected error reading from queue #{queue.url}" propono_config.logger.error e.class.name propono_config.logger.error e.message propono_config.logger.error e.backtrace false end
requeue_message_on_failure(sqs_message, exception)
click to toggle source
# File lib/propono/services/queue_listener.rb, line 95 def requeue_message_on_failure(sqs_message, exception) next_queue = (sqs_message.failure_count < propono_config.max_retries) ? main_queue : failed_queue propono_config.logger.error "Error processing message, moving to queue: #{next_queue}" aws_client.send_to_sqs(next_queue, sqs_message.to_json_with_exception(exception)) end
slow_queue()
click to toggle source
# File lib/propono/services/queue_listener.rb, line 117 def slow_queue @slow_queue ||= subscription.slow_queue end
subscription()
click to toggle source
# File lib/propono/services/queue_listener.rb, line 121 def subscription QueueSubscription.create(aws_client, propono_config, topic_name) end