class HerdstWorker::Queue::Runner

Public Instance Methods

execute_message!(company_id, user_id, data) click to toggle source
# File lib/herdst_worker/queue/runner.rb, line 103
def execute_message!(company_id, user_id, data)
    action = data["configurationId"]
    action_name = action.camelize
    
    unless self.app.config.actions["enabled"].include?(action_name)
        message = "Invalid action. #{action} is not an enabled action. Please add this action to the config file."
        
        if self.app.config.is_dev?
            raise message
        else
            Raven.capture_message(message)
            return
        end
    end
    
    action_name.constantize.send(:invoke, company_id, user_id, data)
end
process_message!(message, raw_message = nil, will_fail_permanently = false) click to toggle source
# File lib/herdst_worker/queue/runner.rb, line 9
def process_message!(message, raw_message = nil, will_fail_permanently = false)
    sent_timestamp = raw_message.attributes.include?("SentTimestamp") ? 
        raw_message.attributes["SentTimestamp"].to_i : 
        nil
    trigger_timestamp = raw_message.message_attributes.include?("triggerTimestamp") ? 
        raw_message.message_attributes["triggerTimestamp"]["string_value"].to_i : 
        nil
    expiry = raw_message.message_attributes.include?("expiry") ? 
        raw_message.message_attributes["expiry"]["string_value"].to_i : 
        nil
    
    if expiry && (expiry > 0) && ((Time.now.utc.to_i * 1000) > expiry)
        self.app.logger.queue.info "Job has expired, not running. #{message.inspect}"
        
        return Concurrent::Promise.new {}
    end
    
    if message["Type"] != nil and message["Type"] == "Notification"
        if message["Message"].is_a? String
            message["Message"] = JSON.parse(message["Message"])
        end
        
        # Get the type
        if (message.include? "Subject") && (message["Subject"].include? "Elastic Transcoder")
            type = "Transcoder"
        else
            type = message["Message"]["notificationType"]
        end
        
        # Update the message with sent and triggered timestamp
        message["Message"]["sentTimestamp"] = sent_timestamp
        message["Message"]["triggerTimestamp"] = trigger_timestamp || sent_timestamp
        
        # Update the message with configuration Id
        message["Message"]["configurationId"] = "notification#{type}"
        
        # Since zips take a log time to process we might need to use:
        # poller.change_message_visibility_timeout(msg, 60)
        # To make sure other workers don't pick up the job
        return Concurrent::Promise.new {
            if !self.ignored_notifications.include? type
                execute_message!(nil, nil, message["Message"])
            end
        }
    end
    
    if message["Records"].is_a? Array
        execution_promise = nil
        execution_data = []
        
        message["Records"].each do |record|
            data_source = record["eventSource"].split(":")
            data_origin = data_source.first
            data_operation = data_source.last
            record_data = record[data_operation]                        
            company_id = nil
            user_id = nil
            
            # Update the message with sent and triggered timestamp
            record_data["sentTimestamp"] = sent_timestamp
            record_data["triggerTimestamp"] = trigger_timestamp || sent_timestamp
            
            execution_data << record_data
            
            if data_origin === "application" and record.include? "userIdentity"
                company_id = record["userIdentity"]["companyId"]
                user_id = record["userIdentity"]["principalId"]
            end
            
            if execution_promise == nil
                execution_promise = Concurrent::Promise.new {
                    execute_message!(company_id, user_id, record_data)
                }
            else
                execution_promise = execution_promise.then {
                    execute_message!(company_id, user_id, record_data)
                }
            end
        end
        
        return Concurrent::Promise.new {} if execution_promise == nil
        return execution_promise.rescue { |ex|
            execution_data.each do |data|
                fail_action_permanently(data) if will_fail_permanently
            end
            
            raise ex
        }
    end
    
    return Concurrent::Promise.new {}
end

Protected Instance Methods

fail_action_permanently(data) click to toggle source
# File lib/herdst_worker/queue/runner.rb, line 123
def fail_action_permanently(data)
    if data.include? "action_id"
        # action = Action.get(data["action_id"])
        #
        # if action
        #     action.update_data({ :stats => nil, :errors => nil }, :errored)
        #     action.cleanup
        # end
    end
end