class Conductor::Coordinator

Attributes

max_thread_count[RW]
polling_timers[RW]
workers[RW]

Public Class Methods

new(workers, max_thread_count: 5) click to toggle source

Create a new Coordinator for a certain set of Workers. A Worker is an implementation of the Worker Interface for a specific task. Conductor::Coordinator.new([Conductor::Worker.new('matt-1'), Conductor::Worker.new('matt-2')])

# File lib/nf-conductor/coordinator/coordinator.rb, line 11
def initialize(workers, max_thread_count: 5)
  self.workers = workers
  self.polling_timers = []
  self.max_thread_count = max_thread_count
end

Public Instance Methods

poll_for_task(worker) click to toggle source

Executed once every x seconds based on the parent polling_timer. Batch polls the Conductor task queue for the given worker and task type, and executes as many tasks concurrently as possible, using a CachedThreadPool ruby-concurrency.github.io/concurrent-ruby/file.thread_pools.html

# File lib/nf-conductor/coordinator/coordinator.rb, line 43
def poll_for_task(worker)
  # TODO bulk poll for task, concurrently, up to size of queue
  tasks = [Conductor::Tasks.poll_task(worker.task_type)]
  tasks.each do |task|
    next if task[:status] != 200
    process_task(worker, task[:body])
  end
rescue => e
  Rails.logger.debug("Conductor::Coordinator : Failed to poll worker (#{worker.task_type}) with error #{e.message}")
end
process_task(worker, task) click to toggle source

Acknowledges the Task in Conductor, then passes the Task to the Worker to execute. Update the Task in Conductor with status and output data.

# File lib/nf-conductor/coordinator/coordinator.rb, line 56
def process_task(worker, task)
  Rails.logger.info("Conductor::Coordinator : Processing task #{task}") if Conductor.config.verbose

  task_identifiers = {
    taskId: task[:taskId],
    workflowInstanceId: task[:workflowInstanceId]
  }

  # Acknowledge the task, so other pollers will not be able to see the task in Conductor's queues
  Conductor::Tasks.acknowledge_task(task[:taskId])

  # Execute the task with the implementing application's worker
  result = worker.execute(task)
  task_body = result.merge!(task_identifiers)

  # Update Conductor about the result of the task
  update_task_with_retry(task_body, 0)
rescue => e
  Rails.logger.debug("Conductor::Coordinator : Failed to process task (#{task}) with error #{e.message} at location #{e.backtrace}")
  update_task_with_retry({ status: 'FAILED' }.merge(task_identifiers), 0)
end
run(execution_interval=15) click to toggle source

Creates and executes a TimerTask for each Worker that the Coordinator has been instantiated with. ruby-concurrency.github.io/concurrent-ruby/Concurrent/TimerTask.html

# File lib/nf-conductor/coordinator/coordinator.rb, line 19
def run(execution_interval=15)
  self.workers.each do |worker|
    polling_timer = Concurrent::TimerTask.new(execution_interval: execution_interval) do
      Rails.logger.info("Conductor::Coordinator : Worker (#{worker.task_type}) polling...") if Conductor.config.verbose
      poll_for_task(worker)
    end

    self.polling_timers << polling_timer
    polling_timer.execute
  end
end
stop() click to toggle source

Shuts down all polling_timers for the Coordinator. Workers will no longer poll for new Tasks.

# File lib/nf-conductor/coordinator/coordinator.rb, line 32
def stop
  self.polling_timers.each do |polling_timer|
    polling_timer.shutdown
  end
  self.polling_timers = []
end
update_task_with_retry(task_body, count) click to toggle source
# File lib/nf-conductor/coordinator/coordinator.rb, line 78
def update_task_with_retry(task_body, count)
  # Put this in a retryable block instead
  begin
    return if count >= 3
    Conductor::Tasks.update_task(task_body)
  rescue
    update_task_with_retry(task_body, count+1)
  end
end