class Cadence::Workflow::Context
Attributes
dispatcher[R]
metadata[R]
state_manager[R]
Public Class Methods
new(state_manager, dispatcher, metadata)
click to toggle source
# File lib/cadence/workflow/context.rb, line 18 def initialize(state_manager, dispatcher, metadata) @state_manager = state_manager @dispatcher = dispatcher @metadata = metadata end
Public Instance Methods
cancel(target, cancelation_id)
click to toggle source
# File lib/cadence/workflow/context.rb, line 233 def cancel(target, cancelation_id) case target.type when History::EventTarget::ACTIVITY_TYPE cancel_activity(cancelation_id) when History::EventTarget::TIMER_TYPE cancel_timer(cancelation_id) else raise "#{target} can not be canceled" end end
cancel_activity(activity_id)
click to toggle source
# File lib/cadence/workflow/context.rb, line 227 def cancel_activity(activity_id) decision = Decision::RequestActivityCancellation.new(activity_id: activity_id) schedule_decision(decision) end
cancel_timer(timer_id)
click to toggle source
# File lib/cadence/workflow/context.rb, line 180 def cancel_timer(timer_id) decision = Decision::CancelTimer.new(timer_id: timer_id) schedule_decision(decision) end
complete(result = nil)
click to toggle source
TODO: check if workflow can be completed
# File lib/cadence/workflow/context.rb, line 186 def complete(result = nil) decision = Decision::CompleteWorkflow.new(result: result) schedule_decision(decision) end
execute_activity(activity_class, *input, **args)
click to toggle source
# File lib/cadence/workflow/context.rb, line 38 def execute_activity(activity_class, *input, **args) options = args.delete(:options) || {} input << args unless args.empty? execution_options = ExecutionOptions.new(activity_class, options) decision = Decision::ScheduleActivity.new( activity_id: options[:activity_id], activity_type: execution_options.name, input: input, domain: execution_options.domain, task_list: execution_options.task_list, retry_policy: execution_options.retry_policy, timeouts: execution_options.timeouts, headers: execution_options.headers ) target, cancelation_id = schedule_decision(decision) future = Future.new(target, self, cancelation_id: cancelation_id) dispatcher.register_handler(target, 'completed') do |result| future.set(result) future.callbacks.each { |callback| call_in_fiber(callback, result) } end dispatcher.register_handler(target, 'failed') do |reason, details| future.fail(reason, details) end future end
execute_activity!(activity_class, *input, **args)
click to toggle source
# File lib/cadence/workflow/context.rb, line 70 def execute_activity!(activity_class, *input, **args) future = execute_activity(activity_class, *input, **args) result = future.get if future.failed? reason, details = result error_class = safe_constantize(reason) || Cadence::ActivityException raise error_class, details end result end
execute_local_activity(activity_class, *input, **args)
click to toggle source
TODO: how to handle failures?
# File lib/cadence/workflow/context.rb, line 86 def execute_local_activity(activity_class, *input, **args) input << args unless args.empty? side_effect do # TODO: this probably requires a local context implementation context = Activity::Context.new(nil, nil) activity_class.execute_in_context(context, input) end end
execute_workflow(workflow_class, *input, **args)
click to toggle source
# File lib/cadence/workflow/context.rb, line 96 def execute_workflow(workflow_class, *input, **args) options = args.delete(:options) || {} input << args unless args.empty? execution_options = ExecutionOptions.new(workflow_class, options) decision = Decision::StartChildWorkflow.new( workflow_id: options[:workflow_id] || SecureRandom.uuid, workflow_type: execution_options.name, input: input, domain: execution_options.domain, task_list: execution_options.task_list, retry_policy: execution_options.retry_policy, timeouts: execution_options.timeouts, headers: execution_options.headers ) target, cancelation_id = schedule_decision(decision) future = Future.new(target, self, cancelation_id: cancelation_id) dispatcher.register_handler(target, 'completed') do |result| future.set(result) future.callbacks.each { |callback| call_in_fiber(callback, result) } end dispatcher.register_handler(target, 'failed') do |reason, details| future.fail(reason, details) end future end
execute_workflow!(workflow_class, *input, **args)
click to toggle source
# File lib/cadence/workflow/context.rb, line 128 def execute_workflow!(workflow_class, *input, **args) future = execute_workflow(workflow_class, *input, **args) result = future.get if future.failed? reason, details = result error_class = safe_constantize(reason) || StandardError.new(details) raise error_class, details end result end
fail(reason, details = nil)
click to toggle source
TODO: check if workflow can be failed
# File lib/cadence/workflow/context.rb, line 192 def fail(reason, details = nil) decision = Decision::FailWorkflow.new(reason: reason, details: details) schedule_decision(decision) end
has_release?(release_name)
click to toggle source
# File lib/cadence/workflow/context.rb, line 34 def has_release?(release_name) state_manager.release?(release_name.to_s) end
headers()
click to toggle source
# File lib/cadence/workflow/context.rb, line 30 def headers metadata.headers end
logger()
click to toggle source
# File lib/cadence/workflow/context.rb, line 24 def logger @logger ||= ReplayAwareLogger.new(Cadence.logger) @logger.replay = state_manager.replay? @logger end
now()
click to toggle source
# File lib/cadence/workflow/context.rb, line 215 def now state_manager.local_time end
on_signal(&block)
click to toggle source
# File lib/cadence/workflow/context.rb, line 219 def on_signal(&block) target = History::EventTarget.workflow dispatcher.register_handler(target, 'signaled') do |signal, input| call_in_fiber(block, signal, input) end end
side_effect(&block)
click to toggle source
# File lib/cadence/workflow/context.rb, line 143 def side_effect(&block) marker = state_manager.next_side_effect return marker.last if marker result = block.call decision = Decision::RecordMarker.new(name: StateManager::SIDE_EFFECT_MARKER, details: result) schedule_decision(decision) result end
sleep(timeout)
click to toggle source
# File lib/cadence/workflow/context.rb, line 154 def sleep(timeout) start_timer(timeout).wait end
sleep_until(end_time)
click to toggle source
# File lib/cadence/workflow/context.rb, line 158 def sleep_until(end_time) delay = (end_time.to_time - now).to_i sleep(delay) if delay > 0 end
start_timer(timeout, timer_id = nil)
click to toggle source
# File lib/cadence/workflow/context.rb, line 163 def start_timer(timeout, timer_id = nil) decision = Decision::StartTimer.new(timeout: timeout, timer_id: timer_id) target, cancelation_id = schedule_decision(decision) future = Future.new(target, self, cancelation_id: cancelation_id) dispatcher.register_handler(target, 'fired') do |result| future.set(result) future.callbacks.each { |callback| call_in_fiber(callback, result) } end dispatcher.register_handler(target, 'canceled') do |reason, details| future.fail(reason, details) end future end
wait_for(future)
click to toggle source
# File lib/cadence/workflow/context.rb, line 203 def wait_for(future) fiber = Fiber.current dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do fiber.resume if future.finished? end Fiber.yield return end
wait_for_all(*futures)
click to toggle source
# File lib/cadence/workflow/context.rb, line 197 def wait_for_all(*futures) futures.each(&:wait) return end
Private Instance Methods
call_in_fiber(block, *args)
click to toggle source
# File lib/cadence/workflow/context.rb, line 252 def call_in_fiber(block, *args) Fiber.new do Cadence::ThreadLocalContext.set(self) block.call(*args) end.resume end
safe_constantize(const)
click to toggle source
# File lib/cadence/workflow/context.rb, line 259 def safe_constantize(const) Object.const_get(const) if Object.const_defined?(const) rescue NameError nil end
schedule_decision(decision)
click to toggle source
# File lib/cadence/workflow/context.rb, line 248 def schedule_decision(decision) state_manager.schedule(decision) end