class Cadence::Workflow::Context

Attributes

dispatcher[R]
metadata[R]
state_manager[R]

Public Class Methods

new(state_manager, dispatcher, metadata) click to toggle source
# File lib/cadence/workflow/context.rb, line 18
# Builds a workflow context around its three collaborators, storing each one
# in the instance variable of the same name.
#
# @param state_manager object that decisions are scheduled on and that replay
#   state is read from
# @param dispatcher object that event handlers are registered with
# @param metadata object that workflow headers are read from
def initialize(state_manager, dispatcher, metadata)
  @state_manager, @dispatcher, @metadata = state_manager, dispatcher, metadata
end

Public Instance Methods

cancel(target, cancelation_id) click to toggle source
# File lib/cadence/workflow/context.rb, line 233
# Cancels the work identified by +cancelation_id+, choosing the kind of
# cancellation from +target.type+.
#
# @param target event target whose +type+ selects the cancellation routine
# @param cancelation_id id forwarded to cancel_activity or cancel_timer
# @return the result of the selected cancellation routine
# @raise [RuntimeError] when the target type is neither an activity nor a timer
def cancel(target, cancelation_id)
  case target.type
  when History::EventTarget::ACTIVITY_TYPE then cancel_activity(cancelation_id)
  when History::EventTarget::TIMER_TYPE then cancel_timer(cancelation_id)
  else raise "#{target} can not be canceled"
  end
end
cancel_activity(activity_id) click to toggle source
# File lib/cadence/workflow/context.rb, line 227
# Schedules a decision requesting cancellation of the given activity.
#
# @param activity_id id passed to the cancellation decision
# @return whatever schedule_decision returns for that decision
def cancel_activity(activity_id)
  schedule_decision(
    Decision::RequestActivityCancellation.new(activity_id: activity_id)
  )
end
cancel_timer(timer_id) click to toggle source
# File lib/cadence/workflow/context.rb, line 180
# Schedules a decision cancelling the timer with the given id.
#
# @param timer_id id passed to the cancel-timer decision
# @return whatever schedule_decision returns for that decision
def cancel_timer(timer_id)
  schedule_decision(Decision::CancelTimer.new(timer_id: timer_id))
end
complete(result = nil) click to toggle source

TODO: check if workflow can be completed

# File lib/cadence/workflow/context.rb, line 186
# Schedules the decision that completes the workflow.
#
# @param result value recorded as the workflow result (nil when omitted)
# @return whatever schedule_decision returns for that decision
def complete(result = nil)
  schedule_decision(Decision::CompleteWorkflow.new(result: result))
end
execute_activity(activity_class, *input, **args) click to toggle source
# File lib/cadence/workflow/context.rb, line 38
# Schedules an activity and returns a future that tracks its outcome.
#
# The :options keyword is removed from +args+ and used to build the execution
# options; any remaining keyword arguments are appended to +input+ as a single
# trailing hash.
#
# @param activity_class activity to run; passed to ExecutionOptions
# @param input positional arguments for the activity
# @param args keyword arguments; :options is treated specially (see above)
# @return [Future] set when the 'completed' handler fires, failed when the
#   'failed' handler fires
def execute_activity(activity_class, *input, **args)
  options = args.delete(:options) || {}
  input.push(args) unless args.empty?

  resolved = ExecutionOptions.new(activity_class, options)

  target, cancelation_id = schedule_decision(
    Decision::ScheduleActivity.new(
      activity_id: options[:activity_id],
      activity_type: resolved.name,
      input: input,
      domain: resolved.domain,
      task_list: resolved.task_list,
      retry_policy: resolved.retry_policy,
      timeouts: resolved.timeouts,
      headers: resolved.headers
    )
  )

  Future.new(target, self, cancelation_id: cancelation_id).tap do |future|
    dispatcher.register_handler(target, 'completed') do |result|
      future.set(result)
      # Each callback attached to the future runs in its own fiber.
      future.callbacks.each do |callback|
        call_in_fiber(callback, result)
      end
    end

    dispatcher.register_handler(target, 'failed') { |reason, details| future.fail(reason, details) }
  end
end
execute_activity!(activity_class, *input, **args) click to toggle source
# File lib/cadence/workflow/context.rb, line 70
# Runs an activity via execute_activity, waits for its future and returns the
# result, raising if the activity failed.
#
# On failure the future yields a (reason, details) pair. +reason+ is looked up
# as a constant name; when it resolves to an exception class that class is
# raised with +details+ as the message. Otherwise Cadence::ActivityException is
# raised instead.
#
# @param activity_class activity to run; forwarded to execute_activity
# @param input positional arguments forwarded to execute_activity
# @param args keyword arguments forwarded to execute_activity
# @return the value produced by the activity's future
# @raise [Exception] the class named by the failure reason, or
#   Cadence::ActivityException when no usable exception class is found
def execute_activity!(activity_class, *input, **args)
  future = execute_activity(activity_class, *input, **args)
  result = future.get
  return result unless future.failed?

  reason, details = result
  error_class = safe_constantize(reason)

  # A reason can name a constant that is not an exception class (for example a
  # module). Raising such a constant would produce an unrelated TypeError and
  # hide the real failure, so fall back to the generic activity error.
  raisable = error_class.is_a?(Class) && error_class <= Exception
  error_class = Cadence::ActivityException unless raisable

  raise error_class, details
end
execute_local_activity(activity_class, *input, **args) click to toggle source

TODO: how to handle failures?

# File lib/cadence/workflow/context.rb, line 86
# Runs an activity inline, inside a side_effect block, and returns the value
# side_effect returns.
#
# Keyword arguments, when present, are appended to +input+ as a single
# trailing hash. The activity receives an Activity::Context built with two nil
# arguments.
#
# @param activity_class object responding to execute_in_context(context, input)
# @param input positional arguments handed to the activity as one array
# @param args keyword arguments appended to +input+ when not empty
def execute_local_activity(activity_class, *input, **args)
  input.push(args) unless args.empty?

  side_effect do
    # TODO: this probably requires a local context implementation
    activity_class.execute_in_context(Activity::Context.new(nil, nil), input)
  end
end
execute_workflow(workflow_class, *input, **args) click to toggle source
# File lib/cadence/workflow/context.rb, line 96
# Starts a child workflow and returns a future that tracks its outcome.
#
# The :options keyword is removed from +args+ and used to build the execution
# options; any remaining keyword arguments are appended to +input+ as a single
# trailing hash. When options carry no :workflow_id a random UUID is used.
#
# @param workflow_class workflow to start; passed to ExecutionOptions
# @param input positional arguments for the child workflow
# @param args keyword arguments; :options is treated specially (see above)
# @return [Future] set when the 'completed' handler fires, failed when the
#   'failed' handler fires
def execute_workflow(workflow_class, *input, **args)
  options = args.delete(:options) || {}
  input.push(args) unless args.empty?

  resolved = ExecutionOptions.new(workflow_class, options)

  target, cancelation_id = schedule_decision(
    Decision::StartChildWorkflow.new(
      workflow_id: options[:workflow_id] || SecureRandom.uuid,
      workflow_type: resolved.name,
      input: input,
      domain: resolved.domain,
      task_list: resolved.task_list,
      retry_policy: resolved.retry_policy,
      timeouts: resolved.timeouts,
      headers: resolved.headers
    )
  )

  Future.new(target, self, cancelation_id: cancelation_id).tap do |future|
    dispatcher.register_handler(target, 'completed') do |result|
      future.set(result)
      # Each callback attached to the future runs in its own fiber.
      future.callbacks.each do |callback|
        call_in_fiber(callback, result)
      end
    end

    dispatcher.register_handler(target, 'failed') { |reason, details| future.fail(reason, details) }
  end
end
execute_workflow!(workflow_class, *input, **args) click to toggle source
# File lib/cadence/workflow/context.rb, line 128
# Starts a child workflow via execute_workflow, waits for its future and
# returns the result, raising if the child workflow failed.
#
# On failure the future yields a (reason, details) pair. +reason+ is looked up
# as a constant name; when it resolves to an exception class that class is
# raised with +details+ as the message. Otherwise StandardError is raised with
# +details+ as the message.
#
# @param workflow_class workflow to start; forwarded to execute_workflow
# @param input positional arguments forwarded to execute_workflow
# @param args keyword arguments forwarded to execute_workflow
# @return the value produced by the child workflow's future
# @raise [Exception] the class named by the failure reason, or StandardError
#   when no usable exception class is found
def execute_workflow!(workflow_class, *input, **args)
  future = execute_workflow(workflow_class, *input, **args)
  result = future.get
  return result unless future.failed?

  reason, details = result
  error_class = safe_constantize(reason)

  # A reason can name a constant that is not an exception class (for example a
  # module). Raising such a constant would produce an unrelated TypeError and
  # hide the real failure, so fall back to StandardError. The fallback is the
  # class itself, not an instance: `raise` builds the instance from +details+.
  raisable = error_class.is_a?(Class) && error_class <= Exception
  error_class = StandardError unless raisable

  raise error_class, details
end
fail(reason, details = nil) click to toggle source

TODO: check if workflow can be failed

# File lib/cadence/workflow/context.rb, line 192
# Schedules the decision that fails the workflow.
#
# @param reason value recorded as the failure reason
# @param details value recorded as the failure details (nil when omitted)
# @return whatever schedule_decision returns for that decision
def fail(reason, details = nil)
  schedule_decision(
    Decision::FailWorkflow.new(reason: reason, details: details)
  )
end
has_release?(release_name) click to toggle source
# File lib/cadence/workflow/context.rb, line 34
# Asks the state manager whether the named release is present.
#
# @param release_name release identifier; converted with to_s before lookup
# @return the state manager's answer for that release name
def has_release?(release_name)
  release_key = release_name.to_s
  state_manager.release?(release_key)
end
headers() click to toggle source
# File lib/cadence/workflow/context.rb, line 30
# Returns the headers exposed by this context's metadata object.
def headers
  metadata.headers
end
logger() click to toggle source
# File lib/cadence/workflow/context.rb, line 24
# Returns the context's logger, creating it on first use as a
# ReplayAwareLogger wrapping Cadence.logger.
#
# The logger's +replay+ flag is refreshed from the state manager on every
# call, so the same logger instance always carries the current replay state.
#
# @return [ReplayAwareLogger] the memoized logger
def logger
  @logger = ReplayAwareLogger.new(Cadence.logger) unless @logger
  @logger.tap { |replay_logger| replay_logger.replay = state_manager.replay? }
end
now() click to toggle source
# File lib/cadence/workflow/context.rb, line 215
# Returns the state manager's local time; sleep_until measures its delay
# against this value.
# NOTE(review): presumably this is the replay-safe workflow clock rather than
# the wall clock — confirm against StateManager#local_time.
def now
  state_manager.local_time
end
on_signal(&block) click to toggle source
# File lib/cadence/workflow/context.rb, line 219
# Registers +block+ as the handler for 'signaled' events on the workflow
# target. Each time the handler fires, the block is run in a new fiber with
# the signal and its input.
#
# @yieldparam signal first value passed by the dispatcher
# @yieldparam input second value passed by the dispatcher
# @return whatever the dispatcher's register_handler returns
def on_signal(&block)
  workflow_target = History::EventTarget.workflow

  dispatcher.register_handler(workflow_target, 'signaled') { |signal, input| call_in_fiber(block, signal, input) }
end
side_effect(&block) click to toggle source
# File lib/cadence/workflow/context.rb, line 143
# Evaluates +block+ at most once per recorded side effect.
#
# When the state manager already holds a marker for the next side effect, the
# marker's last element is returned and the block is not called. Otherwise the
# block is called, its result is recorded through a RecordMarker decision named
# StateManager::SIDE_EFFECT_MARKER, and the result is returned.
#
# @return the recorded value, or the block's freshly computed result
def side_effect(&block)
  recorded = state_manager.next_side_effect

  if recorded
    recorded.last
  else
    block.call.tap do |value|
      schedule_decision(
        Decision::RecordMarker.new(name: StateManager::SIDE_EFFECT_MARKER, details: value)
      )
    end
  end
end
sleep(timeout) click to toggle source
# File lib/cadence/workflow/context.rb, line 154
# Starts a timer for +timeout+ and waits on the future it returns.
#
# @param timeout duration handed to start_timer
# @return whatever the timer future's wait returns
def sleep(timeout)
  timer = start_timer(timeout)
  timer.wait
end
sleep_until(end_time) click to toggle source
# File lib/cadence/workflow/context.rb, line 158
# Sleeps until +end_time+, measured against +now+.
#
# The delay is the difference between +end_time.to_time+ and +now+, truncated
# to an integer by to_i. Nothing happens when that delay is zero or negative.
#
# @param end_time object responding to to_time
# @return nil when no sleep was needed, otherwise whatever sleep returns
def sleep_until(end_time)
  delay = (end_time.to_time - now).to_i
  return if delay <= 0

  sleep(delay)
end
start_timer(timeout, timer_id = nil) click to toggle source
# File lib/cadence/workflow/context.rb, line 163
# Schedules a timer and returns a future that tracks it.
#
# @param timeout duration passed to the StartTimer decision
# @param timer_id optional id passed to the StartTimer decision
# @return [Future] set when the 'fired' handler runs, failed when the
#   'canceled' handler runs
def start_timer(timeout, timer_id = nil)
  target, cancelation_id = schedule_decision(
    Decision::StartTimer.new(timeout: timeout, timer_id: timer_id)
  )

  Future.new(target, self, cancelation_id: cancelation_id).tap do |future|
    dispatcher.register_handler(target, 'fired') do |result|
      future.set(result)
      # Each callback attached to the future runs in its own fiber.
      future.callbacks.each do |callback|
        call_in_fiber(callback, result)
      end
    end

    dispatcher.register_handler(target, 'canceled') { |reason, details| future.fail(reason, details) }
  end
end
wait_for(future) click to toggle source
# File lib/cadence/workflow/context.rb, line 203
# Suspends the current fiber until +future+ is finished.
#
# A wildcard handler is registered on the future's target; whenever it fires
# and the future reports finished?, the suspended fiber is resumed.
#
# @param future object responding to target and finished?
# @return [nil]
def wait_for(future)
  waiting_fiber = Fiber.current

  dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do
    next unless future.finished?

    waiting_fiber.resume
  end

  Fiber.yield
  nil
end
wait_for_all(*futures) click to toggle source
# File lib/cadence/workflow/context.rb, line 197
# Waits on each given future in turn, in argument order.
#
# @param futures objects responding to wait
# @return [nil]
def wait_for_all(*futures)
  futures.each { |future| future.wait }
  nil
end

Private Instance Methods

call_in_fiber(block, *args) click to toggle source
# File lib/cadence/workflow/context.rb, line 252
# Runs +block+ with +args+ inside a new fiber, after registering this context
# through Cadence::ThreadLocalContext.set from within that fiber.
#
# @param block callable to invoke
# @param args arguments passed to the callable
# @return whatever the first resume of the new fiber returns
def call_in_fiber(block, *args)
  fiber = Fiber.new do
    Cadence::ThreadLocalContext.set(self)
    block.call(*args)
  end

  fiber.resume
end
safe_constantize(const) click to toggle source
# File lib/cadence/workflow/context.rb, line 259
# Resolves a constant name to the constant it refers to, without raising.
#
# @param const [String, Symbol, nil] constant name, looked up from Object
# @return the constant when it is defined; nil when it is not defined, when
#   the name is not a valid constant name, or when +const+ is not a String or
#   Symbol (for example nil)
def safe_constantize(const)
  Object.const_get(const) if Object.const_defined?(const)
rescue NameError, TypeError
  # NameError: malformed constant name. TypeError: +const+ is not a String or
  # Symbol, e.g. a failure reported with no reason.
  nil
end
schedule_decision(decision) click to toggle source
# File lib/cadence/workflow/context.rb, line 248
# Hands +decision+ to the state manager's schedule method and returns its
# result. Callers in this class that need it destructure that result as
# (target, cancelation_id).
def schedule_decision(decision)
  state_manager.schedule(decision)
end