class Cadence::Workflow::Poller
Constants
- DEFAULT_OPTIONS
Attributes
client[R]
domain[R]
middleware[R]
options[R]
task_list[R]
workflow_lookup[R]
Public Class Methods
new(domain, task_list, workflow_lookup, middleware = [], options = {})
click to toggle source
# File lib/cadence/workflow/poller.rb, line 13
#
# Stores the poller's configuration. No thread is created here; the polling
# thread is only spawned by #start.
#
# @param domain          kept as-is and exposed through the +domain+ reader
# @param task_list       kept as-is and exposed through the +task_list+ reader
# @param workflow_lookup kept as-is; #process hands it to DecisionTaskProcessor
# @param middleware      kept as-is; #process wraps it in a Middleware::Chain
# @param options [Hash]  overrides merged on top of DEFAULT_OPTIONS
def initialize(domain, task_list, workflow_lookup, middleware = [], options = {})
  @domain, @task_list = domain, task_list
  @workflow_lookup, @middleware = workflow_lookup, middleware

  # Caller-supplied keys win over the defaults.
  @options = DEFAULT_OPTIONS.merge(options)

  @shutting_down = false
end
Public Instance Methods
start()
click to toggle source
# File lib/cadence/workflow/poller.rb, line 22
#
# Clears the shutdown flag and spawns a thread that runs the private
# #poll_loop.
#
# @return [Thread] the newly created polling thread (also kept in @thread)
def start
  @shutting_down = false
  @thread = Thread.new { poll_loop }
end
stop()
click to toggle source
# File lib/cadence/workflow/poller.rb, line 27
#
# Requests a shutdown by raising the flag that #poll_loop checks once per
# iteration, then logs the request. It does not join the polling thread;
# use #wait for that.
def stop
  @shutting_down = true

  message = 'Shutting down a workflow poller'
  Cadence.logger.info(message)
end
wait()
click to toggle source
# File lib/cadence/workflow/poller.rb, line 32
#
# Waits for the polling thread to finish and then shuts the thread pool down.
#
# Safe to call on a poller that was never started: @thread is only assigned
# by #start, so it may still be nil here. In that case the join is skipped
# instead of raising NoMethodError, and the pool is still shut down.
#
# @return whatever the thread pool's +shutdown+ returns
def wait
  @thread&.join
  thread_pool.shutdown
end
Private Instance Methods
poll_for_task()
click to toggle source
# File lib/cadence/workflow/poller.rb, line 70
#
# Asks the client for one decision task on this poller's domain and task list.
#
# Polling is best-effort: any StandardError is logged and converted to nil so
# the caller's loop keeps running.
#
# @return the value returned by +client.poll_for_decision_task+, or nil when
#   the poll raised
def poll_for_task
  begin
    client.poll_for_decision_task(domain: domain, task_list: task_list)
  rescue StandardError => e
    Cadence.logger.error("Unable to poll for a decision task: #{e.inspect}")
    nil
  end
end
poll_loop()
click to toggle source
# File lib/cadence/workflow/poller.rb, line 49
#
# Body of the polling thread. Repeats until #shutting_down? is true:
#
# 1. calls +wait_for_available_threads+ on the thread pool (presumably this
#    blocks until a worker is free -- confirm against ThreadPool),
# 2. reports the time since the previous poll finished as the
#    'workflow_poller.time_since_last_poll' timing, in whole milliseconds,
# 3. polls for a decision task and, when the task has a +workflowType+,
#    schedules #process for it on the thread pool.
#
# Elapsed time is measured with the monotonic clock rather than Time.now, so
# wall-clock adjustments cannot produce negative or inflated metric values.
def poll_loop
  monotonic_now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }

  last_poll_time = monotonic_now.call
  metrics_tags = { domain: domain, task_list: task_list }.freeze

  loop do
    thread_pool.wait_for_available_threads

    # Checked after the wait so a stop requested meanwhile prevents a new poll.
    return if shutting_down?

    time_diff_ms = ((monotonic_now.call - last_poll_time) * 1000).round
    Cadence.metrics.timing('workflow_poller.time_since_last_poll', time_diff_ms, metrics_tags)
    Cadence.logger.debug("Polling for decision tasks (#{domain} / #{task_list})")

    task = poll_for_task
    last_poll_time = monotonic_now.call

    # poll_for_task returns nil on error; a task without a workflowType is skipped too.
    next unless task&.workflowType

    thread_pool.schedule { process(task) }
  end
end
process(task)
click to toggle source
# File lib/cadence/workflow/poller.rb, line 77
#
# Handles a single decision task; #poll_loop schedules this on the thread pool.
#
# A client is generated for each call instead of using the +client+ reader
# (presumably so worker threads do not share a connection -- confirm).
#
# @param task the decision task returned by #poll_for_task
# @return whatever DecisionTaskProcessor#process returns
def process(task)
  task_client = Cadence::Client.generate
  chain = Middleware::Chain.new(middleware)

  processor = DecisionTaskProcessor.new(task, domain, workflow_lookup, task_client, chain)
  processor.process
end
shutting_down?()
click to toggle source
# File lib/cadence/workflow/poller.rb, line 45
#
# Reports the shutdown flag: false after #initialize or #start, true once
# #stop has been called.
#
# @return the current value of @shutting_down
def shutting_down?
  @shutting_down
end
thread_pool()
click to toggle source
# File lib/cadence/workflow/poller.rb, line 84
#
# Lazily builds the pool used to run #process, sized by
# +options[:thread_pool_size]+, and reuses the same instance afterwards.
#
# @return [ThreadPool] the memoized pool
def thread_pool
  return @thread_pool if @thread_pool

  @thread_pool = ThreadPool.new(options[:thread_pool_size])
end