class EventQ::Amazon::QueueWorkerV2
Constants
- APPROXIMATE_RECEIVE_COUNT
- MESSAGE
Attributes
is_running[RW]
Public Class Methods
new()
click to toggle source
# File lib/eventq_aws/aws_queue_worker_v2.rb, line 13 def initialize @forks = [] @is_running = false @on_retry_exceeded_block = nil @on_retry_block = nil @on_error_block = nil @hash_helper = HashKit::Helper.new @serialization_provider_manager = EventQ::SerializationProviders::Manager.new @signature_provider_manager = EventQ::SignatureProviders::Manager.new @queue_poll_wait = 10 end
Public Instance Methods
call_on_error_block(error:, message: nil)
click to toggle source
# File lib/eventq_aws/aws_queue_worker_v2.rb, line 106 def call_on_error_block(error:, message: nil) if @on_error_block EventQ.logger.debug { "[#{self.class}] - Executing on_error block." } begin @on_error_block.call(error, message) rescue => e EventQ.logger.error("[#{self.class}] - An error occurred executing the on_error block. Error: #{e}") end else EventQ.logger.debug { "[#{self.class}] - No on_error block specified to execute." } end end
call_on_retry_block(message)
click to toggle source
# File lib/eventq_aws/aws_queue_worker_v2.rb, line 132 def call_on_retry_block(message) if @on_retry_block EventQ.logger.debug { "[#{self.class}] - Executing on_retry block." } begin @on_retry_block.call(message, abort) rescue => e EventQ.logger.error("[#{self.class}] - An error occurred executing the on_retry block. Error: #{e}") end else EventQ.logger.debug { "[#{self.class}] - No on_retry block specified." } end end
call_on_retry_exceeded_block(message)
click to toggle source
# File lib/eventq_aws/aws_queue_worker_v2.rb, line 119 def call_on_retry_exceeded_block(message) if @on_retry_exceeded_block != nil EventQ.logger.debug { "[#{self.class}] - Executing on_retry_exceeded block." } begin @on_retry_exceeded_block.call(message) rescue => e EventQ.logger.error("[#{self.class}] - An error occurred executing the on_retry_exceeded block. Error: #{e}") end else EventQ.logger.debug { "[#{self.class}] - No on_retry_exceeded block specified." } end end
deserialize_message(payload)
click to toggle source
# File lib/eventq_aws/aws_queue_worker_v2.rb, line 169 def deserialize_message(payload) provider = @serialization_provider_manager.get_provider(EventQ::Configuration.serialization_provider) return provider.deserialize(payload) end
on_error(&block)
click to toggle source
# File lib/eventq_aws/aws_queue_worker_v2.rb, line 160 def on_error(&block) @on_error_block = block return nil end
on_retry(&block)
click to toggle source
# File lib/eventq_aws/aws_queue_worker_v2.rb, line 155 def on_retry(&block) @on_retry_block = block return nil end
on_retry_exceeded(&block)
click to toggle source
# File lib/eventq_aws/aws_queue_worker_v2.rb, line 151 def on_retry_exceeded(&block) @retry_exceeded_block = block end
running?()
click to toggle source
# File lib/eventq_aws/aws_queue_worker_v2.rb, line 165 def running? return @is_running end
serialize_message(msg)
click to toggle source
# File lib/eventq_aws/aws_queue_worker_v2.rb, line 174 def serialize_message(msg) provider = @serialization_provider_manager.get_provider(EventQ::Configuration.serialization_provider) return provider.serialize(msg) end
start(queue, options = {}, &block)
click to toggle source
# File lib/eventq_aws/aws_queue_worker_v2.rb, line 28 def start(queue, options = {}, &block) EventQ.logger.info("[#{self.class}] - Preparing to start listening for messages.") configure(queue, options) if options[:client] == nil raise "[#{self.class}] - :client (QueueClient) must be specified." end raise "[#{self.class}] - Worker is already running." if running? client = options[:client] EventQ.logger.debug do "[#{self.class} #start] - Listening for messages on queue: #{queue.name}, Queue Url: #{client.get_queue_url(queue)}, Queue arn: #{client.get_queue_arn(queue)}" end EventQ.logger.info("[#{self.class}] - Listening for messages.") @forks = [] if @fork_count > 1 Thread.new do @fork_count.times do pid = fork do start_process(options, queue, block) end @forks.push(pid) end @forks.each { |pid| Process.wait(pid) } end else start_process(options, queue, block) end return true end
start_process(options, queue, block)
click to toggle source
# File lib/eventq_aws/aws_queue_worker_v2.rb, line 66 def start_process(options, queue, block) %w'INT TERM'.each do |sig| Signal.trap(sig) { stop exit } end @is_running = true Thread.new do client = options[:client] manager = EventQ::Amazon::QueueManager.new({ client: client }) queue_url = manager.get_queue(queue) poller = Aws::SQS::QueuePoller.new(queue_url, attribute_names: [APPROXIMATE_RECEIVE_COUNT]) poller.poll(skip_delete: true) do |msg, stats| begin tag_processing_thread process_message(msg, poller, queue, block) rescue => e EventQ.logger.error do "[#{self.class}] - An unhandled error occurred. Error: #{e} | Backtrace: #{e.backtrace}" end call_on_error_block(error: e) ensure untag_processing_thread end end end if (options.key?(:wait) && options[:wait] == true) || (options.key?(:fork_count) && options[:fork_count] > 1) while running? do sleep 5 end end end
stop()
click to toggle source
# File lib/eventq_aws/aws_queue_worker_v2.rb, line 145 def stop EventQ.logger.info("[#{self.class}] - Stopping.") @is_running = false return true end
Private Instance Methods
configure(queue, options = {})
click to toggle source
# File lib/eventq_aws/aws_queue_worker_v2.rb, line 276 def configure(queue, options = {}) @queue = queue if options.key?(:thread_count) EventQ.logger.warn("[#{self.class}] - :thread_count is deprecated.") end if options.key?(:sleep) EventQ.logger.warn("[#{self.class}] - :sleep is deprecated.") end @fork_count = 1 if options.key?(:fork_count) @fork_count = options[:fork_count] end EventQ.logger.info("[#{self.class}] - Configuring. Process Count: #{@fork_count}.") return true end
process_message(msg, poller, queue, block)
click to toggle source
# File lib/eventq_aws/aws_queue_worker_v2.rb, line 181 def process_message(msg, poller, queue, block) retry_attempts = msg.attributes[APPROXIMATE_RECEIVE_COUNT].to_i - 1 # deserialize the message payload payload = JSON.load(msg.body) message = deserialize_message(payload[MESSAGE]) message_args = EventQ::MessageArgs.new( type: message.type, retry_attempts: retry_attempts, context: message.context, content_type: message.content_type, id: message.id, sent: message.created ) EventQ.logger.info("[#{self.class}] - Message received. Retry Attempts: #{retry_attempts}") @signature_provider_manager.validate_signature(message: message, queue: queue) if(!EventQ::NonceManager.is_allowed?(message.id)) EventQ.logger.info("[#{self.class}] - Duplicate Message received. Ignoring message.") return false end # begin worker block for queue message begin block.call(message.content, message_args) if message_args.abort == true EventQ.logger.info("[#{self.class}] - Message aborted.") else # accept the message as processed poller.delete_message(msg) EventQ.logger.info("[#{self.class}] - Message acknowledged.") end rescue => e EventQ.logger.error("[#{self.class}] - An unhandled error happened while attempting to process a queue message. Error: #{e} | Backtrace: #{e.backtrace}") error = true call_on_error_block(error: e, message: message) end if message_args.abort || error EventQ::NonceManager.failed(message.id) reject_message(queue, poller, msg, retry_attempts, message, message_args.abort) else EventQ::NonceManager.complete(message.id) end return true end
reject_message(queue, poller, msg, retry_attempts, message, abort)
click to toggle source
# File lib/eventq_aws/aws_queue_worker_v2.rb, line 235 def reject_message(queue, poller, msg, retry_attempts, message, abort) if !queue.allow_retry || retry_attempts >= queue.max_retry_attempts EventQ.logger.info("[#{self.class}] - Message rejected removing from queue. Message: #{serialize_message(message)}") # remove the message from the queue so that it does not get retried again poller.delete_message(msg) if retry_attempts >= queue.max_retry_attempts EventQ.logger.info("[#{self.class}] - Message retry attempt limit exceeded.") call_on_retry_exceeded_block(message) end elsif queue.allow_retry retry_attempts += 1 EventQ.logger.info("[#{self.class}] - Message rejected requesting retry. Attempts: #{retry_attempts}") if queue.allow_retry_back_off == true EventQ.logger.debug { "[#{self.class}] - Calculating message back off retry delay. Attempts: #{retry_attempts} * Delay: #{queue.retry_delay}" } visibility_timeout = (queue.retry_delay * retry_attempts) / 1000 if visibility_timeout > (queue.max_retry_delay / 1000) EventQ.logger.debug { "[#{self.class}] - Max message back off retry delay reached." } visibility_timeout = queue.max_retry_delay / 1000 end else EventQ.logger.debug { "[#{self.class}] - Setting fixed retry delay for message." } visibility_timeout = queue.retry_delay / 1000 end if visibility_timeout > 43200 EventQ.logger.debug { "[#{self.class}] - AWS max visibility timeout of 12 hours has been exceeded. Setting message retry delay to 12 hours." } visibility_timeout = 43200 end EventQ.logger.debug { "[#{self.class}] - Sending message for retry. Message TTL: #{visibility_timeout}" } poller.change_message_visibility_timeout(msg, visibility_timeout) call_on_retry_block(message) end end