class Glider::Component
Attributes
event[R]
task[R]
Public Class Methods
activities()
click to toggle source
# File lib/glider/activities.rb, line 8 def activities @activities ||= [] end
activity_name_for(task, event)
click to toggle source
used for timeouts and activity task completed
# File lib/glider/workflows.rb, line 93 def activity_name_for(task, event) # taken from SimplerWorkflow completed_event = task.workflow_execution.events.reverse_order.find do |e| e.id == event.attributes.scheduled_event_id end activity_name = completed_event.attributes.activity_type.name inflected_name = ActiveSupport::Inflector.underscore activity_name end
domain(domain_name=nil, retention_period: 10)
click to toggle source
both setter and getter
# File lib/glider/component.rb, line 53 def domain(domain_name=nil, retention_period: 10) if domain_name begin @domain = swf.domains[domain_name.to_s] @domain.status rescue AWS::SimpleWorkflow::Errors::UnknownResourceFault => e # create it if necessary @domain = swf.domains.create(domain_name.to_s, retention_period) end else @domain end end
graceful_exit()
click to toggle source
# File lib/glider/component.rb, line 29 def graceful_exit if @in_task @time_to_exit = true else Process.exit! 0 end end
has_previous_decisions?(workflow_execution)
click to toggle source
let’s us determine if :decised_task_started should be called :workflow_execution_started
# File lib/glider/workflows.rb, line 37 def has_previous_decisions?(workflow_execution) workflow_execution.history_events.each do |event| event_type = ActiveSupport::Inflector.underscore(event.event_type).to_sym return true if event_type == :decision_task_completed end return false end
loop_block_for_activity(activity_type)
click to toggle source
# File lib/glider/activities.rb, line 50 def loop_block_for_activity(activity_type) Proc.new do $0 = "ruby #{activity_type.name}-#{activity_type.version}" signal_handling Glider.logger.info "Startig worker for #{activity_type.name} activity (pid #{Process.pid})" loop do begin domain.activity_tasks.poll activity_type.name do |activity_task| task_lock! do begin workflow_id = activity_task.workflow_execution.workflow_id Glider.logger.info "Executing activity=#{activity_type.name} workflow_id=#{workflow_id}" target_instance = self.new activity_task input = process_input(activity_task.input) activity_result = target_instance.send activity_type.name, input activity_task.complete! result: activity_result.to_s unless activity_task.responded? rescue AWS::SimpleWorkflow::ActivityTask::CancelRequestedError # cleanup after ourselves activity_task.cancel! end end end rescue AWS::SimpleWorkflow::Errors::UnknownResourceFault $logger.error "An action relating to an expired workflow was sent. Probably the activity took longer than the execution timeout span." rescue RuntimeError => e if e.to_s == "already responded" # this error sometimes appear if failing and completing happen very close in time and SWF doesn't report correctly the responded? status Glider.logger.warn "Ignoring error responding to activity task failed. Most likely caused because your task failed the activity already." else raise e end end end end end
loop_block_for_workflow(workflow_type)
click to toggle source
# File lib/glider/workflows.rb, line 136 def loop_block_for_workflow(workflow_type) Proc.new do $0 = "ruby #{workflow_type.name}-#{workflow_type.version}" signal_handling Glider.logger.info "Startig worker for #{workflow_type.name} (pid #{Process.pid})" loop do begin domain.decision_tasks.poll workflow_type.name do |decision_task| task_lock! do process_decision_task workflow_type, decision_task end end rescue AWS::SimpleWorkflow::Errors::UnknownResourceFault $logger.error "An action relating to an expired decision was sent. Probably the decider took longer than the decision timeout span." end end end end
new(task, event=nil)
click to toggle source
# File lib/glider/component.rb, line 9 def initialize(task, event=nil) @task = task @event = event end
process_decision_task(workflow_type, task)
click to toggle source
# File lib/glider/workflows.rb, line 102 def process_decision_task(workflow_type, task) workflow_id = task.workflow_execution.workflow_id task.new_events.each do |event| event_name = ActiveSupport::Inflector.underscore(event.event_type).to_sym if should_call_workflow_target? event_name, task.workflow_execution target_instance = self.new task, event data = workflow_data_for(event_name, event) # convert signals to event names! case event_name when :workflow_execution_signaled event_name = "#{event.attributes.signal_name}_signal".to_sym when :activity_task_completed event_name = "#{activity_name_for(task, event)}_activity_completed".to_sym when :activity_task_failed event_name = "#{activity_name_for(task, event)}_activity_failed".to_sym when :activity_task_timed_out event_name = "#{activity_name_for(task, event)}_activity_timed_out".to_sym end Glider.logger.info "event_name=#{event_name} workflow=#{workflow_type.name} workflow_id=#{workflow_id}" target_instance.send workflow_type.name, event_name, event, data # ensure proper response was given (aka a decision taken) decisions = task.instance_eval {@decisions} Glider.logger.debug decisions if decisions.length == 0 && !task.responded? # the decider didn't add any decision Glider.logger.warn "No decision was made event_name=#{event_name} workflow=#{workflow_type.name} workflow_id=#{task.workflow_execution.workflow_id}" end end end end
process_input(input)
click to toggle source
# File lib/glider/activities.rb, line 35 def process_input(input) if input.nil? nil else # try to parse input as json begin input = ActiveSupport::HashWithIndifferentAccess.new JSON.parse(input) rescue JSON::ParserError input end end end
register_activity(name, version, options={})
click to toggle source
# File lib/glider/activities.rb, line 12 def register_activity(name, version, options={}) default_options = { :default_task_list => name.to_s, :default_task_schedule_to_start_timeout => :none, :default_task_start_to_close_timeout => 60, :default_task_schedule_to_close_timeout => :none, :default_task_heartbeat_timeout => :none } options = default_options.merge options begin activity_type = domain.activity_types.create name.to_s, version.to_s, options rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault # already registered activity_type = domain.activity_types[name.to_s, version.to_s] end workers.times do ProcessManager.register_worker loop_block_for_activity(activity_type) end end
register_workflow(name, version, options={})
click to toggle source
# File lib/glider/workflows.rb, line 16 def register_workflow(name, version, options={}) default_options = { :default_task_list => name.to_s, :default_child_policy => :request_cancel, :default_task_start_to_close_timeout => 10, # decider timeout :default_execution_start_to_close_timeout => 120 } options = default_options.merge options begin workflow_type = domain.workflow_types.create name.to_s, version.to_s, options rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault # already registered workflow_type = domain.workflow_types[name.to_s, version.to_s] end workers.times do ProcessManager.register_worker loop_block_for_workflow(workflow_type) end end
should_call_workflow_target?(event_name, workflow_execution)
click to toggle source
# File lib/glider/workflows.rb, line 45 def should_call_workflow_target?(event_name, workflow_execution) case event_name when :activity_task_scheduled, :activity_task_started, :decision_task_scheduled, :decision_task_started, :decision_task_completed, :marker_recorded, :timer_started, :start_child_workflow_execution_initiated, :start_child_workflow_execution_started, :signal_external_workflow_execution_initiated, :request_cancel_external_workflow_execution_initiated Glider.logger.debug "Skipping decider call event=#{event_name} workflow_id=#{workflow_execution.workflow_id}" return false else return true end end
signal_handling()
click to toggle source
# File lib/glider/component.rb, line 37 def signal_handling Signal.trap('USR1') do graceful_exit end end
swf()
click to toggle source
# File lib/glider/component.rb, line 43 def swf @swf ||= AWS::SimpleWorkflow.new end
task_lock!() { || ... }
click to toggle source
# File lib/glider/component.rb, line 21 def task_lock! @in_task = true yield ensure @in_task = false Process.exit! 0 if @time_to_exit # in case an exit signal was received during task processing end
workers(workers_count=nil)
click to toggle source
both setter and getter
# File lib/glider/component.rb, line 48 def workers(workers_count=nil) workers_count ? @workers = workers_count : @workers ||= 1 end
workflow_data_for(event_name, event)
click to toggle source
# File lib/glider/workflows.rb, line 66 def workflow_data_for(event_name, event) data = case event_name when :workflow_execution_started #:decision_task_scheduled event.attributes.input when :workflow_execution_signaled begin event.attributes.input rescue nil end when :activity_task_completed begin event.attributes.result rescue nil end else begin event.attributes.result rescue Glider.logger.debug "no input or result in event, data will be nil event=#{event_name} attributes=#{event.attributes.to_h}" nil end end return data if data.nil? # try to parse as JSON begin ActiveSupport::HashWithIndifferentAccess.new JSON.parse(data) rescue JSON::ParserError data end end # used for timeouts and activity task completed def activity_name_for(task, event) # taken from SimplerWorkflow completed_event = task.workflow_execution.events.reverse_order.find do |e| e.id == event.attributes.scheduled_event_id end activity_name = completed_event.attributes.activity_type.name inflected_name = ActiveSupport::Inflector.underscore activity_name end def process_decision_task(workflow_type, task) workflow_id = task.workflow_execution.workflow_id task.new_events.each do |event| event_name = ActiveSupport::Inflector.underscore(event.event_type).to_sym if should_call_workflow_target? event_name, task.workflow_execution target_instance = self.new task, event data = workflow_data_for(event_name, event) # convert signals to event names! case event_name when :workflow_execution_signaled event_name = "#{event.attributes.signal_name}_signal".to_sym when :activity_task_completed event_name = "#{activity_name_for(task, event)}_activity_completed".to_sym when :activity_task_failed event_name = "#{activity_name_for(task, event)}_activity_failed".to_sym when :activity_task_timed_out event_name = "#{activity_name_for(task, event)}_activity_timed_out".to_sym end Glider.logger.info "event_name=#{event_name} workflow=#{workflow_type.name} workflow_id=#{workflow_id}" target_instance.send workflow_type.name, event_name, event, data # ensure proper response was given (aka a decision taken) decisions = task.instance_eval {@decisions} Glider.logger.debug decisions if decisions.length == 0 && !task.responded? # the decider didn't add any decision Glider.logger.warn "No decision was made event_name=#{event_name} workflow=#{workflow_type.name} workflow_id=#{task.workflow_execution.workflow_id}" end end end end def loop_block_for_workflow(workflow_type) Proc.new do $0 = "ruby #{workflow_type.name}-#{workflow_type.version}" signal_handling Glider.logger.info "Startig worker for #{workflow_type.name} (pid #{Process.pid})" loop do begin domain.decision_tasks.poll workflow_type.name do |decision_task| task_lock! do process_decision_task workflow_type, decision_task end end rescue AWS::SimpleWorkflow::Errors::UnknownResourceFault $logger.error "An action relating to an expired decision was sent. Probably the decider took longer than the decision timeout span." end end end end end end
workflows()
click to toggle source
# File lib/glider/workflows.rb, line 12 def workflows @workflows ||= [] end
Public Instance Methods
activity(name, version)
click to toggle source
# File lib/glider/component.rb, line 14 def activity(name, version) {name: name.to_s, version: version.to_s} end