class Conductor::Coordinator
Attributes
Public Class Methods
Creates a new Coordinator for a given set of Workers. A Worker is an implementation of the Worker interface for a specific task.

    Conductor::Coordinator.new([Conductor::Worker.new('matt-1'), Conductor::Worker.new('matt-2')])
    # File lib/nf-conductor/coordinator/coordinator.rb, line 11
    def initialize(workers, max_thread_count: 5)
      self.workers = workers
      self.polling_timers = []
      self.max_thread_count = max_thread_count
    end
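The Coordinator only ever calls `task_type` and `execute(task)` on the objects it is given, so any object responding to those two methods fits. A minimal sketch of such a worker, assuming a plain duck-typed Ruby class: `EchoWorker`, the `'COMPLETED'` status value, the `output` key, and the `inputData` field are illustrative assumptions, not taken from the gem.

```ruby
# Hypothetical worker used only for illustration.
class EchoWorker
  attr_reader :task_type

  def initialize(task_type)
    @task_type = task_type
  end

  # Returns a Hash describing the outcome of the task.
  # The keys and values here are assumptions for this sketch.
  def execute(task)
    { status: 'COMPLETED', output: { echoed: task[:inputData] } }
  end
end

workers = [EchoWorker.new('matt-1'), EchoWorker.new('matt-2')]
result = workers.first.execute({ taskId: 't-1', inputData: { n: 1 } })
```

An array like `workers` is what would be passed as the first argument to the constructor.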
Public Instance Methods
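Executed once per execution interval by the worker's polling_timer. Polls the Conductor task queue for the given worker's task type and processes every task that comes back with a 200 status. Batch polling and concurrent execution on a CachedThreadPool (ruby-concurrency.github.io/concurrent-ruby/file.thread_pools.html) are the stated intent, but the current implementation polls a single task per call (see the TODO in the source).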
    # File lib/nf-conductor/coordinator/coordinator.rb, line 43
    def poll_for_task(worker)
      # TODO bulk poll for task, concurrently, up to size of queue
      tasks = [Conductor::Tasks.poll_task(worker.task_type)]
      tasks.each do |task|
        next if task[:status] != 200
        process_task(worker, task[:body])
      end
    rescue => e
      Rails.logger.debug("Conductor::Coordinator : Failed to poll worker (#{worker.task_type}) with error #{e.message}")
    end
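The concurrent execution mentioned in the description could look roughly like the sketch below. It is not the gem's implementation: a fixed set of standard-library threads stands in for a concurrent-ruby thread pool, and `process_concurrently` and its parameters are hypothetical names. Only the shape of the poll responses (`:status` and `:body`) follows the source above.

```ruby
# Sketch: process a batch of poll responses on at most max_threads threads.
# Plain stdlib threads are used here in place of a CachedThreadPool.
def process_concurrently(polled, max_threads: 5, &handler)
  pending = Queue.new
  polled.each { |response| pending << response }
  results = Queue.new

  threads = Array.new([max_threads, polled.size].min) do
    Thread.new do
      loop do
        response = begin
          pending.pop(true) # non-blocking; raises ThreadError once the queue is empty
        rescue ThreadError
          break
        end
        next if response[:status] != 200 # skip unsuccessful polls, as poll_for_task does
        results << handler.call(response[:body])
      end
    end
  end

  threads.each(&:join)
  Array.new(results.size) { results.pop }
end

polled = [
  { status: 200, body: { taskId: 'a' } },
  { status: 204, body: nil },
  { status: 200, body: { taskId: 'b' } }
]
processed = process_concurrently(polled, max_threads: 2) { |body| body[:taskId] }
```

Results arrive in completion order, so callers that care about ordering would need to sort or key them.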
Acknowledges the Task in Conductor, then passes the Task to the Worker to execute. Updates the Task in Conductor with status and output data. If any step raises, the Task is updated with status FAILED instead.
    # File lib/nf-conductor/coordinator/coordinator.rb, line 56
    def process_task(worker, task)
      Rails.logger.info("Conductor::Coordinator : Processing task #{task}") if Conductor.config.verbose

      task_identifiers = {
        taskId: task[:taskId],
        workflowInstanceId: task[:workflowInstanceId]
      }

      # Acknowledge the task, so other pollers will not be able to see the task in Conductor's queues
      Conductor::Tasks.acknowledge_task(task[:taskId])

      # Execute the task with the implementing application's worker
      result = worker.execute(task)
      task_body = result.merge!(task_identifiers)

      # Update Conductor about the result of the task
      update_task_with_retry(task_body, 0)
    rescue => e
      Rails.logger.debug("Conductor::Coordinator : Failed to process task (#{task}) with error #{e.message} at location #{e.backtrace}")
      update_task_with_retry({ status: 'FAILED' }.merge(task_identifiers), 0)
    end
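The acknowledge, execute, update sequence can be traced without a Conductor server by swapping in a recording stand-in. The sketch below assumes an in-memory `FakeTasks` class and a free-standing `process` method, both hypothetical; the `'COMPLETED'` status is likewise an assumption, while `'FAILED'` comes from the source above.

```ruby
# Hypothetical in-memory stand-in that records the calls it receives.
class FakeTasks
  attr_reader :calls

  def initialize
    @calls = []
  end

  def acknowledge_task(task_id)
    @calls << [:ack, task_id]
  end

  def update_task(body)
    @calls << [:update, body]
  end
end

# Mirrors the flow of process_task: acknowledge, execute, report the result,
# and report FAILED if any step raises.
def process(tasks_api, worker, task)
  ids = { taskId: task[:taskId], workflowInstanceId: task[:workflowInstanceId] }
  tasks_api.acknowledge_task(task[:taskId])
  result = worker.execute(task)
  tasks_api.update_task(result.merge(ids))
rescue StandardError
  tasks_api.update_task({ status: 'FAILED' }.merge(ids))
end

ok_worker = Object.new
def ok_worker.execute(_task)
  { status: 'COMPLETED' }
end

bad_worker = Object.new
def bad_worker.execute(_task)
  raise 'boom'
end

task = { taskId: 't-1', workflowInstanceId: 'w-1' }
ok_api = FakeTasks.new
process(ok_api, ok_worker, task)
bad_api = FakeTasks.new
process(bad_api, bad_worker, task)
```

In both runs the acknowledgement is recorded first; only the body of the final update differs.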
Creates and executes a TimerTask for each Worker that the Coordinator has been instantiated with. ruby-concurrency.github.io/concurrent-ruby/Concurrent/TimerTask.html
    # File lib/nf-conductor/coordinator/coordinator.rb, line 19
    def run(execution_interval=15)
      self.workers.each do |worker|
        polling_timer = Concurrent::TimerTask.new(execution_interval: execution_interval) do
          Rails.logger.info("Conductor::Coordinator : Worker (#{worker.task_type}) polling...") if Conductor.config.verbose
          poll_for_task(worker)
        end

        self.polling_timers << polling_timer
        polling_timer.execute
      end
    end
Shuts down all polling_timers for the Coordinator. Workers will no longer poll for new Tasks.
    # File lib/nf-conductor/coordinator/coordinator.rb, line 32
    def stop
      self.polling_timers.each do |polling_timer|
        polling_timer.shutdown
      end

      self.polling_timers = []
    end
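The start and stop lifecycle of the per-worker timers can be illustrated with the standard library alone. The sketch below is a simplified stand-in for a periodic timer, not Concurrent::TimerTask itself: `SimpleTimer` is a hypothetical class that only borrows the `execute` and `shutdown` method names, and the 0.01 second interval is chosen just to keep the example fast.

```ruby
# Hypothetical periodic timer: runs the block every `interval` seconds
# on a background thread until shutdown is called.
class SimpleTimer
  def initialize(interval, &block)
    @interval = interval
    @block = block
    @running = false
  end

  def execute
    @running = true
    @thread = Thread.new do
      while @running
        sleep @interval
        @block.call if @running
      end
    end
    self
  end

  def shutdown
    @running = false
    @thread&.join # wait for the background thread to finish
  end
end

ticks = Queue.new
# One timer per task type, analogous to one polling timer per worker.
timers = %w[matt-1 matt-2].map do |task_type|
  SimpleTimer.new(0.01) { ticks << task_type }.execute
end

sleep 0.1
timers.each(&:shutdown)
count_after_stop = ticks.size
```

Once every timer has been shut down, no further ticks are recorded, which corresponds to workers no longer polling after `stop`.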
    # File lib/nf-conductor/coordinator/coordinator.rb, line 78
    def update_task_with_retry(task_body, count)
      # Put this in a retryable block instead
      begin
        return if count >= 3
        Conductor::Tasks.update_task(task_body)
      rescue
        update_task_with_retry(task_body, count+1)
      end
    end
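The source comment suggests moving the retry into a retryable block. One possible shape, assuming Ruby's built-in `retry` keyword is acceptable here, is sketched below; `with_retries` and `max_attempts` are hypothetical names, and the limit of three attempts matches the `count >= 3` guard above.

```ruby
# Hypothetical helper: runs the block up to max_attempts times and
# returns nil if every attempt raises.
def with_retries(max_attempts: 3)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue StandardError
    retry if attempts < max_attempts
    nil # give up quietly, as update_task_with_retry does
  end
end

calls = 0
outcome = with_retries do |attempt|
  calls += 1
  raise 'temporary failure' if attempt < 3
  :updated
end

failed_calls = 0
failed_outcome = with_retries do
  failed_calls += 1
  raise 'permanent failure'
end
```

With such a helper, the update call would be wrapped in a `with_retries` block. Like the original, this retries immediately; a delay between attempts would be a separate design decision.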