class Glider::Component

Attributes

event[R]
task[R]

Public Class Methods

activities() click to toggle source
# File lib/glider/activities.rb, line 8
def activities
        @activities ||= []
end
activity_name_for(task, event) click to toggle source

used for timeouts and activity task completed

# File lib/glider/workflows.rb, line 93
def activity_name_for(task, event)
        # taken from SimplerWorkflow
      completed_event = task.workflow_execution.events.reverse_order.find do |e| 
              e.id == event.attributes.scheduled_event_id
      end
      activity_name = completed_event.attributes.activity_type.name
        inflected_name = ActiveSupport::Inflector.underscore activity_name
end
domain(domain_name=nil, retention_period: 10) click to toggle source

both setter and getter

# File lib/glider/component.rb, line 53
def domain(domain_name=nil, retention_period: 10)
        if domain_name
                begin
                        @domain = swf.domains[domain_name.to_s]
                        @domain.status
                rescue AWS::SimpleWorkflow::Errors::UnknownResourceFault => e
                        # create it if necessary
                        @domain = swf.domains.create(domain_name.to_s, retention_period)
                end
        else
                @domain
        end
end
graceful_exit() click to toggle source
# File lib/glider/component.rb, line 29
def graceful_exit
        if @in_task
                @time_to_exit = true
        else
                Process.exit! 0
        end
end
has_previous_decisions?(workflow_execution) click to toggle source

let’s us determine if :decised_task_started should be called :workflow_execution_started

# File lib/glider/workflows.rb, line 37
def has_previous_decisions?(workflow_execution)
        workflow_execution.history_events.each do |event|
                event_type = ActiveSupport::Inflector.underscore(event.event_type).to_sym
                return true if event_type == :decision_task_completed
        end
        return false
end
loop_block_for_activity(activity_type) click to toggle source
# File lib/glider/activities.rb, line 50
def loop_block_for_activity(activity_type)
        Proc.new do
                $0 = "ruby #{activity_type.name}-#{activity_type.version}"
                signal_handling
                Glider.logger.info "Startig worker for #{activity_type.name} activity (pid #{Process.pid})"
                loop do
                        begin
                                domain.activity_tasks.poll activity_type.name do |activity_task|
                                        task_lock! do
                                                begin
                                                        workflow_id = activity_task.workflow_execution.workflow_id
                                                        Glider.logger.info "Executing activity=#{activity_type.name} workflow_id=#{workflow_id}"
                                                        target_instance = self.new activity_task
                                                        input = process_input(activity_task.input)
                                                        activity_result = target_instance.send activity_type.name, input
                                                         
                                                        activity_task.complete! result: activity_result.to_s unless activity_task.responded?

                                                rescue AWS::SimpleWorkflow::ActivityTask::CancelRequestedError
                                                        # cleanup after ourselves
                                                        activity_task.cancel!
                                                end
                                        end
                                end
                        rescue AWS::SimpleWorkflow::Errors::UnknownResourceFault
                                $logger.error "An action relating to an expired workflow was sent. Probably the activity took longer than the execution timeout span."
                        rescue RuntimeError => e
                                if e.to_s == "already responded"
                                        # this error sometimes appear if failing and completing happen very close in time and SWF doesn't report correctly the responded? status
                                        Glider.logger.warn "Ignoring error responding to activity task failed. Most likely caused because your task failed the activity already."
                                else
                                        raise e
                                end
                        end
                end
        end
end
loop_block_for_workflow(workflow_type) click to toggle source
# File lib/glider/workflows.rb, line 136
def loop_block_for_workflow(workflow_type)
        Proc.new do
                $0 = "ruby #{workflow_type.name}-#{workflow_type.version}"
                signal_handling
                Glider.logger.info "Startig worker for #{workflow_type.name} (pid #{Process.pid})"
                loop do
                        begin
                                domain.decision_tasks.poll workflow_type.name do |decision_task|
                                        task_lock! do
                                                process_decision_task workflow_type, decision_task
                                        end
                                end
                        rescue AWS::SimpleWorkflow::Errors::UnknownResourceFault
                                $logger.error "An action relating to an expired decision was sent. Probably the decider took longer than the decision timeout span."
                        end
                end
        end
end
new(task, event=nil) click to toggle source
# File lib/glider/component.rb, line 9
def initialize(task, event=nil)
        @task = task
        @event = event
end
process_decision_task(workflow_type, task) click to toggle source
# File lib/glider/workflows.rb, line 102
def process_decision_task(workflow_type, task)
        workflow_id = task.workflow_execution.workflow_id
        task.new_events.each do |event| 
                event_name = ActiveSupport::Inflector.underscore(event.event_type).to_sym
                if should_call_workflow_target? event_name, task.workflow_execution
                   target_instance = self.new task, event
                   data = workflow_data_for(event_name, event)
                   # convert signals to event names!
                   case event_name
                   when :workflow_execution_signaled
                           event_name = "#{event.attributes.signal_name}_signal".to_sym
                   when :activity_task_completed
                           event_name = "#{activity_name_for(task, event)}_activity_completed".to_sym
                   when :activity_task_failed
                           event_name = "#{activity_name_for(task, event)}_activity_failed".to_sym
                   when :activity_task_timed_out
                           event_name = "#{activity_name_for(task, event)}_activity_timed_out".to_sym
                        end
                        
                        Glider.logger.info "event_name=#{event_name} workflow=#{workflow_type.name} workflow_id=#{workflow_id}"
                        target_instance.send workflow_type.name, event_name, event, data
                        

                        # ensure proper response was given (aka a decision taken)
                        decisions = task.instance_eval {@decisions}
                        Glider.logger.debug decisions
                        if decisions.length == 0 && !task.responded?
                                # the decider didn't add any decision
                                Glider.logger.warn "No decision was made event_name=#{event_name} workflow=#{workflow_type.name} workflow_id=#{task.workflow_execution.workflow_id}"
                        end
                end
        end
end
process_input(input) click to toggle source
# File lib/glider/activities.rb, line 35
def process_input(input)
        if input.nil?
                nil
        else
                # try to parse input as json
                begin
                        input = ActiveSupport::HashWithIndifferentAccess.new JSON.parse(input)
                rescue JSON::ParserError
                        input
                end
        end
end
register_activity(name, version, options={}) click to toggle source
# File lib/glider/activities.rb, line 12
def register_activity(name, version, options={})
        default_options = {
                :default_task_list => name.to_s,
                :default_task_schedule_to_start_timeout => :none,
                :default_task_start_to_close_timeout => 60,
                :default_task_schedule_to_close_timeout => :none,
                :default_task_heartbeat_timeout => :none

        }

        options = default_options.merge options

        begin
                activity_type = domain.activity_types.create name.to_s, version.to_s, options
        rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault
                # already registered
                activity_type = domain.activity_types[name.to_s, version.to_s]
        end
        workers.times do 
                ProcessManager.register_worker loop_block_for_activity(activity_type)
        end
end
register_workflow(name, version, options={}) click to toggle source
# File lib/glider/workflows.rb, line 16
def register_workflow(name, version, options={})

        default_options = {
                :default_task_list => name.to_s,
                :default_child_policy => :request_cancel,
                :default_task_start_to_close_timeout => 10, # decider timeout
                :default_execution_start_to_close_timeout => 120
        }
        options = default_options.merge options
        begin
                workflow_type = domain.workflow_types.create name.to_s, version.to_s, options
        rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault
                # already registered
                workflow_type = domain.workflow_types[name.to_s, version.to_s]
        end
        workers.times do 
                ProcessManager.register_worker loop_block_for_workflow(workflow_type)
        end
end
should_call_workflow_target?(event_name, workflow_execution) click to toggle source
# File lib/glider/workflows.rb, line 45
def should_call_workflow_target?(event_name, workflow_execution)
        case event_name
        when        :activity_task_scheduled,
                        :activity_task_started,
                        :decision_task_scheduled,
                        :decision_task_started,
                        :decision_task_completed,
                        :marker_recorded,
                        :timer_started,
                        :start_child_workflow_execution_initiated,
                        :start_child_workflow_execution_started,
                        :signal_external_workflow_execution_initiated,
                        :request_cancel_external_workflow_execution_initiated

                Glider.logger.debug "Skipping decider call event=#{event_name} workflow_id=#{workflow_execution.workflow_id}"
                return false
        else
                return true
        end
end
signal_handling() click to toggle source
# File lib/glider/component.rb, line 37
def signal_handling
        Signal.trap('USR1') do
                graceful_exit
        end
end
swf() click to toggle source
# File lib/glider/component.rb, line 43
def swf
        @swf ||= AWS::SimpleWorkflow.new
end
task_lock!() { || ... } click to toggle source
# File lib/glider/component.rb, line 21
def task_lock!
        @in_task = true
        yield
ensure
        @in_task = false
        Process.exit! 0 if @time_to_exit # in case an exit signal was received during task processing
end
workers(workers_count=nil) click to toggle source

both setter and getter

# File lib/glider/component.rb, line 48
def workers(workers_count=nil)
        workers_count ? @workers = workers_count : @workers ||= 1
end
workflow_data_for(event_name, event) click to toggle source
# File lib/glider/workflows.rb, line 66
                def workflow_data_for(event_name, event)
                        data =      case event_name
                                        when :workflow_execution_started #:decision_task_scheduled
                                                event.attributes.input
                                        when :workflow_execution_signaled
                                                begin event.attributes.input rescue nil end
                                        when :activity_task_completed
                                                begin event.attributes.result rescue nil end
                                        else
                                                begin 
                                                        event.attributes.result
                                                rescue
                                                        Glider.logger.debug "no input or result in event, data will be nil event=#{event_name} attributes=#{event.attributes.to_h}"
                                                        nil
                                                end
                                        end 
                        return data if data.nil?
                        # try to parse as JSON
                        begin

                                ActiveSupport::HashWithIndifferentAccess.new JSON.parse(data)
                        rescue JSON::ParserError
                                data
                        end
                end

                # used for timeouts and activity task completed
                def activity_name_for(task, event)
                        # taken from SimplerWorkflow
                      completed_event = task.workflow_execution.events.reverse_order.find do |e| 
                              e.id == event.attributes.scheduled_event_id
                      end
                      activity_name = completed_event.attributes.activity_type.name
                        inflected_name = ActiveSupport::Inflector.underscore activity_name
                end

                def process_decision_task(workflow_type, task)
                        workflow_id = task.workflow_execution.workflow_id
                        task.new_events.each do |event| 
                                event_name = ActiveSupport::Inflector.underscore(event.event_type).to_sym
                                if should_call_workflow_target? event_name, task.workflow_execution
                                   target_instance = self.new task, event
                                   data = workflow_data_for(event_name, event)
                                   # convert signals to event names!
                                   case event_name
                                   when :workflow_execution_signaled
                                           event_name = "#{event.attributes.signal_name}_signal".to_sym
                                   when :activity_task_completed
                                           event_name = "#{activity_name_for(task, event)}_activity_completed".to_sym
                                   when :activity_task_failed
                                           event_name = "#{activity_name_for(task, event)}_activity_failed".to_sym
                                   when :activity_task_timed_out
                                           event_name = "#{activity_name_for(task, event)}_activity_timed_out".to_sym
                                        end
                                        
                                        Glider.logger.info "event_name=#{event_name} workflow=#{workflow_type.name} workflow_id=#{workflow_id}"
                                        target_instance.send workflow_type.name, event_name, event, data
                                        

                                        # ensure proper response was given (aka a decision taken)
                                        decisions = task.instance_eval {@decisions}
                                        Glider.logger.debug decisions
                                        if decisions.length == 0 && !task.responded?
                                                # the decider didn't add any decision
                                                Glider.logger.warn "No decision was made event_name=#{event_name} workflow=#{workflow_type.name} workflow_id=#{task.workflow_execution.workflow_id}"
                                        end
                                end
                        end
                end

                def loop_block_for_workflow(workflow_type)
                        Proc.new do
                                $0 = "ruby #{workflow_type.name}-#{workflow_type.version}"
                                signal_handling
                                Glider.logger.info "Startig worker for #{workflow_type.name} (pid #{Process.pid})"
                                loop do
                                        begin
                                                domain.decision_tasks.poll workflow_type.name do |decision_task|
                                                        task_lock! do
                                                                process_decision_task workflow_type, decision_task
                                                        end
                                                end
                                        rescue AWS::SimpleWorkflow::Errors::UnknownResourceFault
                                                $logger.error "An action relating to an expired decision was sent. Probably the decider took longer than the decision timeout span."
                                        end
                                end
                        end
                end

        end

end
workflows() click to toggle source
# File lib/glider/workflows.rb, line 12
def workflows
        @workflows ||= []
end

Public Instance Methods

activity(name, version) click to toggle source
# File lib/glider/component.rb, line 14
def activity(name, version)
        {name: name.to_s, version: version.to_s}
end