class HerdstWorker::Queue::Runner
Public Instance Methods
execute_message!(company_id, user_id, data)
click to toggle source
# File lib/herdst_worker/queue/runner.rb, line 103 def execute_message!(company_id, user_id, data) action = data["configurationId"] action_name = action.camelize unless self.app.config.actions["enabled"].include?(action_name) message = "Invalid action. #{action} is not an enabled action. Please add this action to the config file." if self.app.config.is_dev? raise message else Raven.capture_message(message) return end end action_name.constantize.send(:invoke, company_id, user_id, data) end
process_message!(message, raw_message = nil, will_fail_permanently = false)
click to toggle source
# File lib/herdst_worker/queue/runner.rb, line 9 def process_message!(message, raw_message = nil, will_fail_permanently = false) sent_timestamp = raw_message.attributes.include?("SentTimestamp") ? raw_message.attributes["SentTimestamp"].to_i : nil trigger_timestamp = raw_message.message_attributes.include?("triggerTimestamp") ? raw_message.message_attributes["triggerTimestamp"]["string_value"].to_i : nil expiry = raw_message.message_attributes.include?("expiry") ? raw_message.message_attributes["expiry"]["string_value"].to_i : nil if expiry && (expiry > 0) && ((Time.now.utc.to_i * 1000) > expiry) self.app.logger.queue.info "Job has expired, not running. #{message.inspect}" return Concurrent::Promise.new {} end if message["Type"] != nil and message["Type"] == "Notification" if message["Message"].is_a? String message["Message"] = JSON.parse(message["Message"]) end # Get the type if (message.include? "Subject") && (message["Subject"].include? "Elastic Transcoder") type = "Transcoder" else type = message["Message"]["notificationType"] end # Update the message with sent and triggered timestamp message["Message"]["sentTimestamp"] = sent_timestamp message["Message"]["triggerTimestamp"] = trigger_timestamp || sent_timestamp # Update the message with configuration Id message["Message"]["configurationId"] = "notification#{type}" # Since zips take a log time to process we might need to use: # poller.change_message_visibility_timeout(msg, 60) # To make sure other workers don't pick up the job return Concurrent::Promise.new { if !self.ignored_notifications.include? type execute_message!(nil, nil, message["Message"]) end } end if message["Records"].is_a? Array execution_promise = nil execution_data = [] message["Records"].each do |record| data_source = record["eventSource"].split(":") data_origin = data_source.first data_operation = data_source.last record_data = record[data_operation] company_id = nil user_id = nil # Update the message with sent and triggered timestamp record_data["sentTimestamp"] = sent_timestamp record_data["triggerTimestamp"] = trigger_timestamp || sent_timestamp execution_data << record_data if data_origin === "application" and record.include? "userIdentity" company_id = record["userIdentity"]["companyId"] user_id = record["userIdentity"]["principalId"] end if execution_promise == nil execution_promise = Concurrent::Promise.new { execute_message!(company_id, user_id, record_data) } else execution_promise = execution_promise.then { execute_message!(company_id, user_id, record_data) } end end return Concurrent::Promise.new {} if execution_promise == nil return execution_promise.rescue { |ex| execution_data.each do |data| fail_action_permanently(data) if will_fail_permanently end raise ex } end return Concurrent::Promise.new {} end
Protected Instance Methods
fail_action_permanently(data)
click to toggle source
# File lib/herdst_worker/queue/runner.rb, line 123 def fail_action_permanently(data) if data.include? "action_id" # action = Action.get(data["action_id"]) # # if action # action.update_data({ :stats => nil, :errors => nil }, :errored) # action.cleanup # end end end