class SimplerWorkflow::Workflow

Attributes

domain[R]
initial_activity_type[R]
name[R]
options[R]
task_list[R]
version[R]

Public Class Methods

[](name, version) click to toggle source
# File lib/simpler_workflow/workflow.rb, line 109
# Looks up a workflow in the class-level registry.
#
# name    - the workflow name used when it was registered
# version - the workflow version used when it was registered
#
# Returns whatever is stored in +workflows+ under the [name, version] pair.
def self.[](name, version)
  registry_key = [name, version]
  workflows[registry_key]
end
new(domain, name, version, options = {}) click to toggle source
# File lib/simpler_workflow/workflow.rb, line 7
# Builds a workflow and records it in Workflow.workflows under
# [name, version].
#
# The instance is only configured (and stored) when nothing is stored under
# that key yet; when an entry already exists the body of the +begin+ block is
# skipped and this instance's attributes are left unset.
#
# domain  - stored as-is in @domain
# name    - workflow name; also used as the :default_task_list option
# version - workflow version
# options - Hash merged over the defaults below (caller values win)
def initialize(domain, name, version, options = {})
  registry = Workflow.workflows
  registry_key = [name, version]

  registry[registry_key] ||= begin
    @options = {
      :default_task_list => name,
      :default_task_start_to_close_timeout => 2 * 60,
      :default_execution_start_to_close_timeout => 2 * 60,
      :default_child_policy => :terminate
    }.merge(options)
    @domain = domain
    @name = name
    @version = version
    self
  end
end
register(name, version, workflow) click to toggle source
# File lib/simpler_workflow/workflow.rb, line 113
# Stores +workflow+ in the class-level registry under [name, version],
# replacing any entry already stored under that pair.
#
# Returns the workflow that was stored.
def self.register(name, version, workflow)
  registry_key = [name, version]
  workflows.store(registry_key, workflow)
end

Protected Class Methods

swf() click to toggle source
# File lib/simpler_workflow/workflow.rb, line 134
# Returns the value of SimplerWorkflow.swf (pure delegation).
def self.swf
  shared_swf = SimplerWorkflow.swf
  shared_swf
end
workflows() click to toggle source
# File lib/simpler_workflow/workflow.rb, line 130
# Returns the class-level registry hash, creating an empty one on first use.
# The same Hash object is returned on every later call.
def self.workflows
  @workflows = {} unless @workflows
  @workflows
end

Public Instance Methods

decision_loop() click to toggle source
# File lib/simpler_workflow/workflow.rb, line 28
# Forks a child process that polls for decision tasks and dispatches each one
# to handle_decision_task.
#
# The value returned by +fork+ is appended to SimplerWorkflow.child_processes.
#
# Inside the child:
# * QUIT exits at once when no task is being handled; otherwise it sets
#   @time_to_exit so the process exits after the current task.
# * INT exits immediately through Process.exit!.
# * SimplerWorkflow.after_fork, when set, is called once before polling.
#
# A Timeout::Error raised while polling is retried (or ends the process when
# a QUIT is pending). Any other StandardError is handed to
# SimplerWorkflow.exception_reporter and then re-raised out of the loop.
def decision_loop
  SimplerWorkflow.child_processes << fork do

    # Give the child a recognizable process title.
    $0 = "Workflow: #{name} #{version}"

    Signal.trap('QUIT') do
      # Don't log in trap, ruby 2 complains
      # since we need to exit quickly, only delay quit
      # if we are in the middle of a task
      if @in_task
        @time_to_exit = true 
      else
        Process.exit 0
      end
    end

    Signal.trap('INT') do
      # Don't log in trap, ruby 2 complains
      Process.exit!(0)
    end

    # Optional hook run in the child before the first poll.
    if SimplerWorkflow.after_fork
      SimplerWorkflow.after_fork.call
    end

    loop do
      begin
        logger.info("Waiting for a decision task for #{name.to_s}, #{version} listening to #{task_list}")
        domain.decision_tasks.poll_for_single_task(task_list) do |decision_task|
          @in_task = true # tells the QUIT trap to defer the exit until this task is done
          handle_decision_task(decision_task)
        end
        # A QUIT arrived while the task was being handled: leave now.
        Process.exit 0 if @time_to_exit
      rescue Timeout::Error => e
        # Poll again after a timeout unless a QUIT is pending.
        if @time_to_exit
          Process.exit 0
        else
          retry
        end
      rescue => e
        # Report with the workflow type as context, then let the error propagate.
        context = {
          :workflow => to_workflow_type
        }
        SimplerWorkflow.exception_reporter.report(e, context)
        raise e
      ensure
        # Always clear the flag so a later QUIT can exit immediately.
        @in_task = false
      end
    end
  end
end
initial_activity(name, version = nil) click to toggle source
# File lib/simpler_workflow/workflow.rb, line 23
# Records the activity type that should start the workflow.
#
# The activity is looked up with Activity[domain, name.to_sym, version] and
# the result of its +to_activity_type+ is stored in @initial_activity_type.
#
# name    - activity name; converted with +to_sym+ before the lookup
# version - activity version (defaults to nil)
#
# Returns the stored activity type.
def initial_activity(name, version = nil)
  @initial_activity_type = Activity[domain, name.to_sym, version].to_activity_type
end
last_activity(decision_task, event) click to toggle source
# File lib/simpler_workflow/workflow.rb, line 121
# Returns the +activity_type+ found on the attributes of the scheduled event
# that scheduled_event(decision_task, event) resolves to.
def last_activity(decision_task, event)
  scheduled = scheduled_event(decision_task, event)
  scheduled.attributes.activity_type
end
last_input(decision_task, event) click to toggle source
# File lib/simpler_workflow/workflow.rb, line 125
# Returns the +input+ found on the attributes of the scheduled event that
# scheduled_event(decision_task, event) resolves to.
def last_input(decision_task, event)
  scheduled = scheduled_event(decision_task, event)
  scheduled.attributes.input
end
on_activity_completed(&block) click to toggle source
# File lib/simpler_workflow/workflow.rb, line 97
# Registers +block+ as the handler stored under 'ActivityTaskCompleted',
# replacing whatever was stored there.
#
# Returns the block.
def on_activity_completed(&block)
  handlers = event_handlers
  handlers['ActivityTaskCompleted'] = block
end
on_activity_failed(&block) click to toggle source
# File lib/simpler_workflow/workflow.rb, line 101
# Registers +block+ as the handler stored under 'ActivityTaskFailed',
# replacing whatever was stored there.
#
# Returns the block.
def on_activity_failed(&block)
  handlers = event_handlers
  handlers['ActivityTaskFailed'] = block
end
on_activity_timed_out(&block) click to toggle source
# File lib/simpler_workflow/workflow.rb, line 105
# Registers +block+ as the handler stored under 'ActivityTaskTimedOut',
# replacing whatever was stored there.
#
# Returns the block.
def on_activity_timed_out(&block)
  handlers = event_handlers
  handlers['ActivityTaskTimedOut'] = block
end
on_start_execution(&block) click to toggle source
# File lib/simpler_workflow/workflow.rb, line 93
# Registers +block+ as the handler stored under 'WorkflowExecutionStarted',
# replacing whatever was stored there.
#
# Returns the block.
def on_start_execution(&block)
  handlers = event_handlers
  handlers['WorkflowExecutionStarted'] = block
end
scheduled_event(decision_task, event) click to toggle source
# File lib/simpler_workflow/workflow.rb, line 117
# Asks +decision_task+ for the scheduled event associated with +event+ and
# returns its answer unchanged.
def scheduled_event(decision_task, event)
  matching_event = decision_task.scheduled_event(event)
  matching_event
end
start_workflow(input, options = {}) click to toggle source
# File lib/simpler_workflow/workflow.rb, line 88
# Starts an execution of this workflow type.
#
# The workflow type is looked up with domain.workflow_types[name.to_s, version]
# and +start_execution+ is called on it with +options+ plus :input.
#
# input   - value sent as the :input option (overrides any :input in options)
# options - Hash of additional options forwarded to +start_execution+; the
#           hash passed by the caller is left untouched
#
# Returns whatever +start_execution+ returns.
def start_workflow(input, options = {})
  # Merge into a copy: writing :input into the caller's hash would leak it
  # into later calls that reuse the hash and would raise on a frozen hash.
  execution_options = options.merge(:input => input)
  domain.workflow_types[name.to_s, version].start_execution(execution_options)
end
to_workflow_type() click to toggle source
# File lib/simpler_workflow/workflow.rb, line 84
# Returns a Hash describing this workflow type, with the keys :name and
# :version (in that order) taken from the matching readers.
def to_workflow_type
  workflow_type = {}
  workflow_type[:name] = name
  workflow_type[:version] = version
  workflow_type
end

Protected Instance Methods

event_handlers() click to toggle source
# File lib/simpler_workflow/workflow.rb, line 151
# Returns the event-type => handler mapping, building it on first use with
# one default handler object per supported event type. Each handler is
# constructed with this workflow as its only argument. The same object is
# returned on later calls.
def event_handlers
  return @event_handlers if @event_handlers

  @event_handlers = Map[
    :WorkflowExecutionStarted, WorkflowExecutionStartedHandler.new(self),
    :ActivityTaskCompleted, ActivityTaskCompletedHandler.new(self),
    :ActivityTaskFailed, ActivityTaskFailedHandler.new(self),
    :ActivityTaskTimedOut, ActivityTaskTimedOutHandler.new(self)
  ]
end
handle_decision_task(decision_task) click to toggle source
# File lib/simpler_workflow/workflow.rb, line 142
# Processes every new event of a decision task.
#
# The task is first extended with AWS::SimpleWorkflow::DecisionTaskAdditions.
# Each event is then passed, together with the task, to the handler stored in
# event_handlers under the event's type; a DefaultEventHandler built for this
# workflow is used when no handler is stored for that type.
#
# Returns the result of iterating decision_task.new_events with +each+.
def handle_decision_task(decision_task)
  decision_task.extend AWS::SimpleWorkflow::DecisionTaskAdditions
  logger.info("Received decision task")

  decision_task.new_events.each do |new_event|
    event_type = new_event.event_type
    logger.info("Processing #{event_type}")

    handler = event_handlers.fetch(event_type, DefaultEventHandler.new(self))
    handler.call(decision_task, new_event)
  end
end
logger() click to toggle source
# File lib/simpler_workflow/workflow.rb, line 138
# Returns the value of SimplerWorkflow.logger (pure delegation).
def logger
  shared_logger = SimplerWorkflow.logger
  shared_logger
end