class HerdstWorker::Queue::Processor
Attributes
app[RW]
attempt_threshold[RW]
enabled[RW]
ignored_notifications[RW]
job_count[RW]
max_jobs[RW]
poller[RW]
processor_status[RW]
queue_url[RW]
queue_wait_time[RW]
restart_time[RW]
start_time[RW]
visibility_timeout[RW]
Public Class Methods
new(app, enabled, queue_url, queue_wait_time)
click to toggle source
# File lib/herdst_worker/queue/processor.rb, line 17
# Builds a processor bound to a single SQS queue and leaves it in the
# "starting" state. Polling does not begin here.
#
# @param app [Object] application object; its config and logger are read
#   by the other methods of this class
# @param enabled [Boolean] stored in +enabled+
# @param queue_url [String] queue URL handed to Aws::SQS::QueuePoller
# @param queue_wait_time [Integer] stored in +queue_wait_time+
def initialize(app, enabled, queue_url, queue_wait_time)
  self.app = app
  self.enabled = enabled
  self.queue_url = queue_url
  self.queue_wait_time = queue_wait_time
  self.poller = Aws::SQS::QueuePoller.new(queue_url)

  # Default limits
  self.job_count = 0
  self.max_jobs = 10
  self.attempt_threshold = 6
  self.visibility_timeout = 15
  self.ignored_notifications = ["AmazonSnsSubscriptionSucceeded"]

  # Set the start time. reset_time is private, so it is called without an
  # explicit receiver: `self.reset_time` raises NoMethodError on Ruby < 2.7.
  reset_time

  # Mark the processor as "starting"
  set_status "starting"

  # Run before_request (stats logging and status transitions) ahead of
  # every poller request
  poller.before_request { |stats| before_request(stats) }
end
Public Instance Methods
before_request(stats)
click to toggle source
# File lib/herdst_worker/queue/processor.rb, line 103
# Hook invoked with the poller stats before each poller request.
# Logs the stats in dev mode, advances the processor status, and throws
# :stop_polling once the status is "stopped".
#
# @param stats [Object] poller statistics; only inspected for logging
def before_request(stats)
  app.logger.queue_stats.info "STATS (#{processor_status}): #{stats.inspect}" if app.config.is_dev?

  now = Time.now.utc.to_i

  case processor_status
  when "working"
    # Once the restart time has passed, begin shutting down.
    # (Original note: the app will automatically restart in production.)
    if now >= restart_time
      elapsed = now - start_time
      app.logger.queue.info "Stopping after #{elapsed} seconds of work"
      set_status "stopping"
    end
  when "finishing"
    # Wait for running jobs to drain, then go idle
    if job_count == 0
      app.logger.queue.info "Setting processor status to idle"
      set_status "idle"
    end
  when "stopping"
    # Wait for running jobs to drain, then mark as stopped
    if job_count == 0
      app.logger.queue.info "Setting processor status to stopped"
      set_status "stopped"
    end
  end

  # The status is re-read here, so a transition to "stopped" made above
  # terminates polling within the same call
  return unless processor_status == "stopped"

  app.logger.queue.info "Exiting program, Service requested to stop"
  throw :stop_polling
end
halt()
click to toggle source
Sets the processor status to "finishing". The SQS poller's before_request hook then sets the status to "idle" once all running jobs have finished.
# File lib/herdst_worker/queue/processor.rb, line 71
# Requests a halt by moving the processor to the "finishing" status.
# Does nothing when the status is already "finishing".
#
# @return [Object, nil] whatever set_status returns, or nil when already finishing
def halt
  # Plain equality (not ===), matching the comparison used by #stop
  return if processor_status == "finishing"

  set_status "finishing"
end
process_message(msg)
click to toggle source
# File lib/herdst_worker/queue/processor.rb, line 141
# Handles one message yielded by the poller.
#
# The message is only worked on while the status is "working" and fewer
# than +max_jobs+ jobs are running; otherwise its visibility timeout is
# changed and :skip_delete is thrown so the poller does not delete it.
#
# The job itself is delegated to process_message!, whose result is used as
# a promise-like object (then / rescue / execute). On failure the message
# is either re-sent with an incremented "attempts" attribute or, once the
# attempt threshold is exceeded, logged and reported to Raven.
#
# @param msg [Object] received message; +body+ and +message_attributes+ are read
# @raise [JSON::ParserError] when the message body is not valid JSON
def process_message(msg)
  # Not accepting work: hide the message for twice the usual timeout
  unless processor_status == "working"
    poller.change_message_visibility_timeout(msg, visibility_timeout * 2)
    throw :skip_delete
  end

  # If the app is already processing the max number of jobs
  # put the message back in the queue with a short wait time
  if job_count >= max_jobs
    poller.change_message_visibility_timeout(msg, visibility_timeout)
    throw :skip_delete
  end

  # Find out how many attempts there have been already for the message
  msg_attrs = msg.message_attributes.dup
  attempt_number = msg_attrs.include?("attempts") ? msg_attrs["attempts"]["string_value"].to_i + 1 : 1
  will_fail_permanently = attempt_number > attempt_threshold

  # Parse BEFORE claiming a job slot. If the body is malformed the error
  # still propagates to the caller, but job_count is left untouched, so a
  # bad message can no longer leak a slot.
  message = JSON.parse(msg.body)

  # Claim a slot; it is released exactly once, either by the success
  # callback or by the failure callback's ensure clause.
  self.job_count += 1

  process_message!(message, msg, will_fail_permanently).then {
    self.job_count -= 1
  }.rescue { |ex|
    begin
      if will_fail_permanently
        # Array() guards against exceptions that carry no backtrace
        self.app.logger.queue.error "Message failed #{attempt_number} times, Reporting and failing permanently. \n#{ex.to_s} \n#{Array(ex.backtrace).join("\n")}"
        Raven.capture_exception(ex, {
          :level => "fatal",
          :extra => {
            "queue_attempts" => attempt_number,
            "queue_message_body" => msg.body
          }
        })
      else
        self.app.logger.queue.error "Message failed #{attempt_number} times, Adding back to queue."

        if self.app.config.is_dev?
          puts ex.inspect
          puts ex.backtrace
        end

        # Re-send the same body with the attempt counter bumped
        replaced_message = {
          :queue_url => self.poller.queue_url,
          :message_body => msg.body,
          :delay_seconds => self.visibility_timeout,
          :message_attributes => msg_attrs.merge({
            "attempts" => {
              :string_value => attempt_number.to_s,
              :data_type => "Number"
            }
          })
        }

        self.poller.client.send_message replaced_message
      end

      if self.app.config.is_dev?
        self.app.logger.queue.error "Processor Error:"
        self.app.logger.queue.error ex.message
        self.app.logger.queue.error ex.backtrace
      end
    ensure
      # Release the slot even if logging, reporting or re-queueing raised
      self.job_count -= 1
    end
  }.execute
end
set_status(status)
click to toggle source
Sets the processor status. The status is also written to a file so that services like Capistrano can see the current status.
# File lib/herdst_worker/queue/processor.rb, line 87
# Sets the processor status and mirrors it to the "process_status" file
# inside the configured temp directory.
#
# @param status [String] one of "starting", "idle", "working",
#   "finishing", "stopping", "stopped"
# @return [Integer] number of bytes written to the status file
# @raise [RuntimeError] when +status+ is not a known status; nothing is
#   changed in that case
def set_status(status)
  valid_statuses = %w[starting idle working finishing stopping stopped]
  raise "Invalid status (#{status})" unless valid_statuses.include?(status)

  # Set status
  self.processor_status = status

  # Write the current status to file for external tooling (the original
  # note names Capistrano) to read. File.join avoids doubled separators
  # when the temp path already ends with "/".
  status_file = File.join(app.config.paths.temp, "process_status")
  File.write(status_file, status)
end
start()
click to toggle source
Starts or resets the application to a working status
# File lib/herdst_worker/queue/processor.rb, line 55
# Starts the processor, or returns it to the "working" status.
#
# * From "starting": sets "working", resets the timers and runs the poller.
# * From "working": does nothing.
# * From any other status: sets "working" and resets the timers; the
#   poller is not started again.
def start
  if processor_status == "starting"
    set_status "working"
    # reset_time is private, so it is called without an explicit receiver:
    # `self.reset_time` raises NoMethodError on Ruby < 2.7.
    reset_time
    start_poller
  else
    return if processor_status == "working"

    set_status "working"
    reset_time
  end
end
start_poller()
click to toggle source
Runs the poller
# File lib/herdst_worker/queue/processor.rb, line 45
# Runs the poller, handing every received message to process_message.
# Does nothing (returns nil) when the processor is not enabled.
def start_poller
  return unless enabled

  poller.poll(wait_time_seconds: queue_wait_time, skip_delete: false) { |msg| process_message(msg) }
end
stop()
click to toggle source
Sets the processor status to "stopping". The SQS poller's before_request hook then stops the application once all running jobs have finished.
# File lib/herdst_worker/queue/processor.rb, line 79
# Requests a shutdown by moving the processor to the "stopping" status.
# A no-op (returns nil) when the status is already "stopping".
def stop
  set_status("stopping") unless processor_status == "stopping"
end
Private Instance Methods
reset_time()
click to toggle source
# File lib/herdst_worker/queue/processor.rb, line 216
# Records the current UTC time (epoch seconds) as the start time and
# schedules the restart time one hour later.
#
# @return [Integer] the new restart time
def reset_time
  one_hour = 3600
  now = Time.now.utc.to_i

  self.start_time = now
  self.restart_time = now + one_hour
end