class HerdstWorker::Queue::Processor

Attributes

app[RW]
attempt_threshold[RW]
enabled[RW]
ignored_notifications[RW]
job_count[RW]
max_jobs[RW]
poller[RW]
processor_status[RW]
queue_url[RW]
queue_wait_time[RW]
restart_time[RW]
start_time[RW]
visibility_timeout[RW]

Public Class Methods

new(app, enabled, queue_url, queue_wait_time) click to toggle source
# File lib/herdst_worker/queue/processor.rb, line 17
def initialize(app, enabled, queue_url, queue_wait_time)
    self.app = app
    self.enabled = enabled
    self.queue_url = queue_url
    self.queue_wait_time = queue_wait_time
    self.poller = Aws::SQS::QueuePoller.new(queue_url)
    self.job_count = 0
    self.max_jobs = 10
    self.attempt_threshold = 6
    self.visibility_timeout = 15
    self.ignored_notifications = [
        "AmazonSnsSubscriptionSucceeded"
    ]
    
    # Set the start time
    self.reset_time
    
    # Start the processor as working
    self.set_status "starting"

    # Log queue stats
    self.poller.before_request do |stats|
        before_request(stats)
    end
end

Public Instance Methods

before_request(stats) click to toggle source
# File lib/herdst_worker/queue/processor.rb, line 103
def before_request(stats)
    if self.app.config.is_dev?
        self.app.logger.queue_stats.info "STATS (#{self.processor_status}): #{stats.inspect}"
    end
    
    # After 1 hour of running terminate application.
    # The app will automatically restart in production
    current_time = Time.now.utc.to_i
    if (self.processor_status == "working") && (current_time >= self.restart_time)
        runtime = current_time - self.start_time
        self.app.logger.queue.info "Stopping after #{runtime} seconds of work"
        set_status "stopping"
    
    # On finishing wait for jobs to complete and then set status
    # to idle
    elsif self.processor_status == "finishing"
        if self.job_count == 0
            self.app.logger.queue.info "Setting processor status to idle"
            set_status "idle"
        end
    
    # On stopping wait for jobs to complete and then set status
    # to stopped. Once stopped the polling will terminate.
    elsif self.processor_status == "stopping"
        if self.job_count == 0
            self.app.logger.queue.info "Setting processor status to stopped"
            set_status "stopped"
        end
    
    end
    
    if self.processor_status == "stopped"
        self.app.logger.queue.info "Exiting program, Service requested to stop"
        throw :stop_polling
    end
end
halt() click to toggle source

Sets the processor status to finishing. The sqs before action will take care of setting the idle state once all jobs have finished.

# File lib/herdst_worker/queue/processor.rb, line 71
def halt
    return if self.processor_status === "finishing"
    set_status "finishing"
end
process_message(msg) click to toggle source
# File lib/herdst_worker/queue/processor.rb, line 141
def process_message(msg)
    if self.processor_status == "working"
        # If the app is already processing the max number of jobs
        # put the message back in the queue with a short wait time
        if self.job_count >= self.max_jobs
            self.poller.change_message_visibility_timeout(msg, self.visibility_timeout)
            throw :skip_delete
        end
        
        # Find out how many attempts there has been already for
        # the message.
        msg_attrs = msg.message_attributes.dup
        attempt_number = msg_attrs.include?("attempts") ? msg_attrs["attempts"]["string_value"].to_i + 1 : 1
        will_fail_permanently = attempt_number > self.attempt_threshold
        
        # Run the job and increase the job count
        # Once successful the job count is decreased by one
        # and the message is deleted.
        # If an error occured the job count is decreased by
        # one and the error is logged locally and with sentry
        self.job_count += 1
        message = JSON.parse(msg.body)
        process_message!(message, msg, will_fail_permanently).then {
            self.job_count -= 1
            
        }.rescue { |ex|
            if will_fail_permanently
                self.app.logger.queue.error "Message failed #{attempt_number} times, Reporting and failing permanently. \n#{ex.to_s} \n#{ex.backtrace.join("\n")}"
                Raven.capture_exception(ex, {
                    :level => "fatal",
                    :extra => {
                        "queue_attempts" => attempt_number,
                        "queue_message_body" => msg.body
                    }
                })

            else
                self.app.logger.queue.error "Message failed #{attempt_number} times, Adding back to queue."
                
                if self.app.config.is_dev?
                    puts ex.inspect
                    puts ex.backtrace
                end
                
                replaced_message = {
                    :queue_url => self.poller.queue_url,
                    :message_body => msg.body,
                    :delay_seconds => self.visibility_timeout,
                    :message_attributes => msg_attrs.merge({
                        "attempts" => {
                            :string_value => attempt_number.to_s,
                            :data_type => "Number"
                        }
                    })
                }
                
                self.poller.client.send_message replaced_message
            end
            
            if self.app.config.is_dev?
                self.app.logger.queue.error "Processor Error:"
                self.app.logger.queue.error ex.message
                self.app.logger.queue.error ex.backtrace
            end
            
            self.job_count -= 1
        }.execute
    else
        self.poller.change_message_visibility_timeout(msg, self.visibility_timeout * 2)
        throw :skip_delete
    end
end
set_status(status) click to toggle source

Set the processor status. The status is alos logged to file so services like capastranio can see the current status

# File lib/herdst_worker/queue/processor.rb, line 87
def set_status(status)
    statuses = ["starting", "idle", "working", "finishing", "stopping", "stopped"]
    
    if statuses.include? status
        # Set status
        self.processor_status = status
        
        # Write the current status to file for capastranio to use
        process_file = self.app.config.paths.temp + "/process_status"
        File.open(process_file, "w") { |file| file.write(status) }
    else
        raise "Invalid status (#{status})"
    end
end
start() click to toggle source

Starts or resets the application to a working status

# File lib/herdst_worker/queue/processor.rb, line 55
def start
    if self.processor_status == "starting"
        self.set_status "working"
        self.reset_time
        self.start_poller
    else
        return if self.processor_status == "working"
        
        self.set_status "working"
        self.reset_time
    end
end
start_poller() click to toggle source

Runs the poller

# File lib/herdst_worker/queue/processor.rb, line 45
def start_poller
    if self.enabled
        self.poller.poll(:wait_time_seconds => self.queue_wait_time, :skip_delete => false) do |msg|
            process_message(msg)
        end
    end
end
stop() click to toggle source

Sets the processor status to stopping. The sqs before action will take care of stopping the application once all jobs have finished.

# File lib/herdst_worker/queue/processor.rb, line 79
def stop
    return if self.processor_status == "stopping"
    set_status "stopping"
end

Private Instance Methods

reset_time() click to toggle source
# File lib/herdst_worker/queue/processor.rb, line 216
def reset_time
    self.start_time = Time.now.utc.to_i
    self.restart_time = self.start_time + (3600)  # One hour
end