class SimplerWorkflow::Activity

Attributes

domain[R]
name[R]
next_activity[R]
options[R]
task_list[R]
version[R]

Public Class Methods

[](*activity_tuple) click to toggle source
# File lib/simpler_workflow/activity.rb, line 163
def self.[](*activity_tuple)
  activities[*activity_tuple]
end
[]=(*activity_tuple) click to toggle source
# File lib/simpler_workflow/activity.rb, line 167
def self.[]=(*activity_tuple)
  activities.[]=(*activity_tuple)
end
new(domain, name, version, options = {}) click to toggle source
# File lib/simpler_workflow/activity.rb, line 7
def initialize(domain, name, version, options = {})
  default_options =
    {
    :default_task_list => name,
    :default_task_start_to_close_timeout => 5 * 60,
    :default_task_schedule_to_start_timeout => 5 * 60,
    :default_task_schedule_to_close_timeout => 10 * 60,
    :default_task_heartbeat_timeout => :none
  }
  @options = default_options.merge(options)
  @domain = domain
  @name = name
  @version = version
  @failure_policy = :fail
  @task_list = name.to_s
end
register(domain, name, version, activity) click to toggle source
# File lib/simpler_workflow/activity.rb, line 171
def self.register(domain, name, version, activity)
  activities.register(domain, name, version, activity)
end

Protected Class Methods

activities() click to toggle source
# File lib/simpler_workflow/activity.rb, line 181
def self.activities
  @activities ||= ActivityRegistry.new
end
swf() click to toggle source
# File lib/simpler_workflow/activity.rb, line 185
def self.swf
  SimplerWorkflow.swf
end

Public Instance Methods

count() click to toggle source
# File lib/simpler_workflow/activity.rb, line 159
def count
  domain.activity_tasks.count(name).to_i
end
failure_policy() click to toggle source
# File lib/simpler_workflow/activity.rb, line 42
def failure_policy
  @failure_policy || :fail
end
on_fail(failure_policy) click to toggle source
# File lib/simpler_workflow/activity.rb, line 38
def on_fail(failure_policy)
  @failure_policy = failure_policy.to_sym
end
on_success(activity, version = nil) click to toggle source
# File lib/simpler_workflow/activity.rb, line 24
def on_success(activity, version = nil)
  case activity
  when Hash
    name = activity[:name].to_sym
    version = activity[:version]
  when String
    name = activity.to_sym
  when Symbol
    name = activity
  end

  @next_activity = Activity[domain, name, version]
end
perform_activity(&block) click to toggle source
# File lib/simpler_workflow/activity.rb, line 46
def perform_activity(&block)
  @perform_task = block
end
perform_task(task) click to toggle source
# File lib/simpler_workflow/activity.rb, line 54
def perform_task(task)
  logger.info("Performing task #{name}")
  @perform_task.call(task)
rescue => e
  context = {}
  context[:activity_type] = [name.to_s, version]
  context[:input] = task.input
  context[:activity_id] = task.activity_id
  SimplerWorkflow.exception_reporter.report(e, context)
  task.fail! :reason => e.message[0..250], :details => {:failure_policy => failure_policy}.to_json
end
persist_attributes() click to toggle source
# File lib/simpler_workflow/activity.rb, line 70
def persist_attributes
  activities.persist_attributes(self)
end
poll_for_single_task() click to toggle source
# File lib/simpler_workflow/activity.rb, line 154
def poll_for_single_task
  logger.info("Polling for single task for #{name}")
  domain.activity_tasks.poll_for_single_task(name.to_s)
end
simple_db_attributes() click to toggle source
# File lib/simpler_workflow/activity.rb, line 74
def simple_db_attributes
  attributes = {
    domain: domain.name,
    name: name,
    version: version,
    failure_policy: failure_policy
  }

  if (next_activity)
    attributes[:next_activity_name] = next_activity.name
    attributes[:next_activity_version] = next_activity.version
  end

  attributes
end
simple_db_name() click to toggle source
# File lib/simpler_workflow/activity.rb, line 90
def simple_db_name
  "#{name}-#{version}"
end
start_activity_loop() click to toggle source
# File lib/simpler_workflow/activity.rb, line 94
def start_activity_loop
  SimplerWorkflow.child_processes << fork do

    $0 = "Activity: #{name} #{version}"

    Signal.trap('QUIT') do
      # Don't log in trap, ruby 2 complains
      # since we need to exit quickly, only delay quit
      # if we are in the middle of a task
      if @in_task
        @time_to_exit = true 
      else
        Process.exit 0
      end
    end

    Signal.trap('INT') do
      # Don't log in trap, ruby 2 complains
      Process.exit!(0)
    end


    if SimplerWorkflow.after_fork
      SimplerWorkflow.after_fork.call
    end

    loop do
      begin
        logger.info("Starting activity_loop for #{name}")
        domain.activity_tasks.poll(task_list) do |task|
          begin
            logger.info("Received task...")
            @in_task = true
            perform_task(task)
            unless task.responded?
              task.complete!
            end
          rescue => e
            context = {}
            context[:activity_type] = [name.to_s, version]
            context[:input] = task.input
            context[:activity_id] = task.activity_id
            SimplerWorkflow.exception_reporter.report(e, context)
            task.fail! :reason => e.message, :details => { :failure_policy => :fail }.to_json unless task.responded?
          ensure
            @in_task = false
          end
        end
        Process.exit(0) if @time_to_exit
      rescue Timeout::Error
        if @time_to_exit
          Process.exit(0)
        else
          retry
        end
      end
    end
  end
end
to_activity_type() click to toggle source
# File lib/simpler_workflow/activity.rb, line 66
def to_activity_type
  domain.activity_types[name, version]
end

Protected Instance Methods

activities() click to toggle source
# File lib/simpler_workflow/activity.rb, line 177
def activities
  self.class.activities
end
logger() click to toggle source
# File lib/simpler_workflow/activity.rb, line 189
def logger
  $logger || Rails.logger
end