class Cadence::Workflow::Poller

Constants

DEFAULT_OPTIONS

Attributes

client[R]
domain[R]
middleware[R]
options[R]
task_list[R]
workflow_lookup[R]

Public Class Methods

new(domain, task_list, workflow_lookup, middleware = [], options = {}) click to toggle source
# File lib/cadence/workflow/poller.rb, line 13
# Builds a poller for decision tasks on one domain / task list.
#
# @param domain [Object] stored as-is and passed to the client when polling
# @param task_list [Object] stored as-is and passed to the client when polling
# @param workflow_lookup [Object] handed to each DecisionTaskProcessor
# @param middleware [Array] entries wrapped in a Middleware::Chain per task
# @param options [Hash] overrides merged on top of DEFAULT_OPTIONS
#
# NOTE(review): @client is not assigned here although a `client` reader is
# used when polling - presumably it is built elsewhere; confirm.
def initialize(domain, task_list, workflow_lookup, middleware = [], options = {})
  # The poller is created idle; #start spawns the polling thread.
  @shutting_down = false

  # Where to poll.
  @domain = domain
  @task_list = task_list

  # How polled tasks are processed.
  @workflow_lookup = workflow_lookup
  @middleware = middleware

  # Caller-supplied values win over the defaults.
  @options = DEFAULT_OPTIONS.merge(options)
end

Public Instance Methods

start() click to toggle source
# File lib/cadence/workflow/poller.rb, line 22
# Clears the shutdown flag and launches the polling loop on a new thread.
#
# @return [Thread] the thread running #poll_loop (also kept in @thread so
#   that #wait can join it)
def start
  @shutting_down = false
  @thread = Thread.new { poll_loop }
end
stop() click to toggle source
# File lib/cadence/workflow/poller.rb, line 27
# Requests shutdown of the poller. This only raises the flag and logs; it
# does not join the polling thread (see #wait for that). The polling loop
# checks the flag once per iteration, before issuing the next poll.
def stop
  # Set the flag before logging so the poll thread can observe it right away.
  @shutting_down = true
  Cadence.logger.info('Shutting down a workflow poller')
end
wait() click to toggle source
# File lib/cadence/workflow/poller.rb, line 32
# Blocks until the polling thread has finished, then shuts down the worker
# thread pool.
#
# @thread is only assigned by #start, so when the poller was never started
# there is nothing to join and no pool in use; in that case this is a no-op
# instead of failing with NoMethodError on nil.
#
# @return [Object, nil] whatever the pool's #shutdown returns, or nil when
#   the poller was never started
def wait
  return unless @thread

  @thread.join
  thread_pool.shutdown
end

Private Instance Methods

poll_for_task() click to toggle source
# File lib/cadence/workflow/poller.rb, line 70
# Asks the client for the next decision task on this domain / task list.
#
# Polling is best-effort: any StandardError is logged and swallowed so the
# caller simply sees "no task" for this attempt.
#
# @return [Object, nil] the value returned by the client, or nil when the
#   poll raised
def poll_for_task
  client.poll_for_decision_task(domain: domain, task_list: task_list)
rescue StandardError => e
  failure = "Unable to poll for a decision task: #{e.inspect}"
  Cadence.logger.error(failure)
  nil
end
poll_loop() click to toggle source
# File lib/cadence/workflow/poller.rb, line 49
# Main polling loop; this is what the thread created by #start runs.
#
# Each iteration:
#   1. calls thread_pool.wait_for_available_threads (presumably blocks until
#      the pool has capacity - confirm against ThreadPool),
#   2. returns if a shutdown has been requested,
#   3. reports the time elapsed since the previous poll finished,
#   4. polls for a task and, when one with a workflowType comes back,
#      schedules it on the thread pool.
#
# @return [nil] once #shutting_down? is true
def poll_loop
  # Intervals are measured on the monotonic clock: wall-clock time
  # (Time.now) can jump when the system clock is adjusted, which would
  # produce negative or inflated timing metrics.
  monotonic_ms = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) }

  last_poll_ms = monotonic_ms.call
  metrics_tags = { domain: domain, task_list: task_list }.freeze

  loop do
    thread_pool.wait_for_available_threads

    # Checked after the wait so a stop requested while waiting is honoured
    # before another poll is issued.
    return if shutting_down?

    time_diff_ms = (monotonic_ms.call - last_poll_ms).round
    Cadence.metrics.timing('workflow_poller.time_since_last_poll', time_diff_ms, metrics_tags)
    Cadence.logger.debug("Polling for decision tasks (#{domain} / #{task_list})")

    task = poll_for_task
    last_poll_ms = monotonic_ms.call

    # Skip failed polls (nil) and responses that carry no workflowType.
    next unless task&.workflowType

    thread_pool.schedule { process(task) }
  end
end
process(task) click to toggle source
# File lib/cadence/workflow/poller.rb, line 77
# Runs a single decision task through a DecisionTaskProcessor.
#
# A new client and a new middleware chain are created on every call rather
# than reusing the poller's own; the local is named so that it does not
# shadow the `client` reader.
#
# @param task [Object] the decision task returned by the poll
# @return [Object] whatever DecisionTaskProcessor#process returns
def process(task)
  task_client = Cadence::Client.generate
  chain = Middleware::Chain.new(middleware)
  processor = DecisionTaskProcessor.new(task, domain, workflow_lookup, task_client, chain)

  processor.process
end
shutting_down?() click to toggle source
# File lib/cadence/workflow/poller.rb, line 45
# Whether a shutdown has been requested.
#
# @return [Boolean] the current value of @shutting_down, which is set to
#   false by #initialize and #start and to true by #stop
def shutting_down?
  @shutting_down
end
thread_pool() click to toggle source
# File lib/cadence/workflow/poller.rb, line 84
# Lazily builds and memoizes the pool used to process polled tasks.
#
# @return [ThreadPool] a pool constructed with options[:thread_pool_size];
#   the same instance is returned on every later call
def thread_pool
  return @thread_pool if @thread_pool

  pool_size = options[:thread_pool_size]
  @thread_pool = ThreadPool.new(pool_size)
end