class Bosh::Director::JobRunner

Public Class Methods

new(job_class, task_id) click to toggle source

@param [Class] job_class Job class to instantiate and run
@param [Integer] task_id Existing task id

# File lib/bosh/director/job_runner.rb, line 8
# Builds a runner bound to one job class and one existing task.
#
# Sets up task logging first, then looks the task up through
# Bosh::Director::Api::TaskManager and keeps it in @task.
#
# @param [Class] job_class Job class to instantiate and run; must be
#   Jobs::BaseJob or one of its subclasses
# @param [Integer] task_id Existing task id
# @raise [DirectorError] if job_class is not a class derived from Jobs::BaseJob
def initialize(job_class, task_id)
  runnable = job_class.is_a?(Class) && job_class <= Jobs::BaseJob
  raise DirectorError, "Invalid director job class '#{job_class}'" unless runnable

  @job_class = job_class
  @task_id = task_id

  # Logging must be ready before the lookup below, which logs its progress.
  setup_task_logging

  task_lookup = Bosh::Director::Api::TaskManager.new

  @task_logger.info("Looking for task with task id #{@task_id}")
  @task = task_lookup.find_task(@task_id)
  @task_logger.info("Found task #{@task.inspect}")
end

Public Instance Methods

run(*args) click to toggle source

Runs director job

# File lib/bosh/director/job_runner.rb, line 26
# Runs the director job and logs how long it took.
#
# Clears Config.current_job, then performs the job inside a thread-name
# scope of "task:<task_id>".
#
# @param [Array] args Arguments forwarded unchanged to perform_job
def run(*args)
  Config.current_job = nil

  @task_logger.info("Starting task: #{@task_id}")
  start_time = Time.now

  with_thread_name("task:#{@task_id}") do
    perform_job(*args)
  end

  elapsed = Time.now - start_time
  @task_logger.info("Task took #{Duration.duration(elapsed)} to process.")
end

Private Instance Methods

finish_task(state, result) click to toggle source

Marks task completion.

@param [Symbol] state Task completion state
@param [#to_s] result Task result (stored as a truncated string)

# File lib/bosh/director/job_runner.rb, line 144
# Records the final outcome on the task and saves it.
#
# @param [Symbol] state Task completion state
# @param [#to_s] result Outcome; converted with to_s and shortened by
#   truncate before being stored
def finish_task(state, result)
  record = @task

  record.state = state
  record.result = truncate(result.to_s)
  record.timestamp = Time.now

  record.save
end
log_exception(exception) click to toggle source

Logs the exception in the event log.

@param [Exception] exception

# File lib/bosh/director/job_runner.rb, line 153
# Writes the exception to the event log.
#
# The event log is the channel used to propagate the error; finding it
# there and signalling it properly is left to the event log renderer.
#
# @param [Exception] exception Exception to report; converted with
#   DirectorError.create_from_exception before logging
def log_exception(exception)
  wrapped_error = DirectorError.create_from_exception(exception)
  event_log = Config.event_log
  event_log.log_error(wrapped_error)
end
perform_job(*args) click to toggle source

Instantiates and performs the director job.

@param [Array] args Opaque list of job arguments that will be used to instantiate the new job object.

@return [void]

# File lib/bosh/director/job_runner.rb, line 75
# Instantiates the job class with +args+, marks the task as processing,
# runs the job and records the final task state via finish_task.
#
# The rescue clauses cover the whole method body, so a failure in any step
# (job construction, checkpointing, saving the task, job.perform) ends in
# one of these outcomes:
# * job.perform returns           -> state :done, result is its return value
# * Bosh::Director::TaskCancelled -> state :cancelled, result 'task cancelled'
# * any other Exception           -> state :error, result is the exception
#
# The rescue clauses log the exception and do not re-raise it.
#
# @param [Array] args Opaque list of job arguments passed to the job class
#   constructor
# @return [void]
def perform_job(*args)
  @task_logger.info('Creating job')

  job = @job_class.new(*args)
  # Expose the running job through the global Config.
  Config.current_job = job

  job.task_id = @task_id
  # NOTE(review): presumably raises TaskCancelled when the task was cancelled
  # while still queued -- confirm against the job's task_checkpoint.
  job.task_checkpoint # cancelled in the queue?

  # Start the background thread that periodically updates the task's
  # checkpoint time (see run_checkpointing).
  run_checkpointing

  @task_logger.info("Performing task: #{@task.inspect}")

  # Persist the "processing" state before the job actually starts.
  @task.state = :processing
  @task.timestamp = Time.now
  @task.started_at = Time.now
  @task.checkpoint_time = Time.now
  @task.save

  result = job.perform

  @task_logger.info('Done')
  finish_task(:done, result)

rescue Bosh::Director::TaskCancelled => e
  # Cancellation is reported in the event log but logged at info level only.
  log_exception(e)
  @task_logger.info("Task #{@task.id} cancelled")
  finish_task(:cancelled, 'task cancelled')
rescue Exception => e
  # NOTE(review): rescues Exception rather than StandardError, so
  # SignalException / SystemExit raised in the body are also recorded as a
  # task error and not propagated -- presumably intentional; confirm.
  log_exception(e)
  @task_logger.error("#{e}\n#{e.backtrace.join("\n")}")
  finish_task(:error, e)
end
run_checkpointing() click to toggle source

Spawns a thread that periodically updates the task checkpoint time. There is no need to kill this thread, as the job execution lifetime is the same as the worker process lifetime.

@return [Thread] Checkpoint thread

# File lib/bosh/director/job_runner.rb, line 113
# Starts a background thread that updates the task checkpoint every
# Config.task_checkpoint_interval seconds, forever. The thread is never
# stopped here.
#
# @return [Thread] Checkpoint thread
def run_checkpointing
  # The checkpointer is a separate object built from the task id, so that
  # this thread and the main thread never modify the same @task object
  # (and accidentally clobber it in the process).
  Thread.new(TaskCheckPointer.new(@task.id)) do |checkpointer|
    with_thread_name("task:#{@task.id}-checkpoint") do
      while true
        sleep(Config.task_checkpoint_interval)
        checkpointer.checkpoint
      end
    end
  end
end
setup_task_logging() click to toggle source

Sets up job logging.

@return [void]

# File lib/bosh/director/job_runner.rb, line 42
# Prepares per-task logging and points the global Config at it.
#
# Creates <Config.base_dir>/tasks/<task_id>/ and uses these paths inside it:
# * 'debug'  - file appender of the task logger; that logger also becomes
#              Config.logger and the logger of Config.db (and of
#              Config.dns_db when one is configured)
# * 'event'  - backing path of Config.event_log
# * 'result' - backing path of Config.result
# * 'cpi'    - stored in Config.cloud_options['properties']['cpi_log']
#
# @return [void]
def setup_task_logging
  task_dir = File.join(Config.base_dir, 'tasks', @task_id.to_s)
  FileUtils.mkdir_p(task_dir)

  debug_path, event_path, result_path, cpi_path =
    %w(debug event result cpi).map { |name| File.join(task_dir, name) }

  @task_logger = Logging::Logger.new('DirectorJobRunner')
  file_appender = Logging.appenders.file(
    'DirectorJobRunnerFile',
    filename: debug_path,
    layout: ThreadFormatter.layout
  )
  @task_logger.add_appenders(file_appender)
  # Copy the level from the current global logger before replacing it below.
  @task_logger.level = Config.logger.level

  Config.event_log = EventLog::Log.new(event_path)
  Config.result = TaskResultFile.new(result_path)
  Config.logger = @task_logger

  Config.db.logger = @task_logger
  Config.dns_db.logger = @task_logger if Config.dns_db

  Config.cloud_options['properties'] ||= {}
  Config.cloud_options['properties']['cpi_log'] = cpi_path
end
truncate(string, len = 128) click to toggle source

Truncates string to fit task result length.

@param [String] string The original string
@param [Integer] len Desired string length
@return [String] Truncated string

# File lib/bosh/director/job_runner.rb, line 132
# Truncates string to fit task result length.
#
# The input is stripped of surrounding whitespace. If what remains is at most
# +len+ characters it is returned as is. Otherwise the first len + 1
# characters are kept, the trailing whitespace run plus any partial word
# after it is cut off, and "..." is appended. When the kept part contains no
# whitespace nothing is cut, so the result can be up to len + 4 characters.
#
# @param [String] string The original string
# @param [Integer] len Desired string length
# @return [String] Truncated string
def truncate(string, len = 128)
  # Keep one character more than len so an over-long input can be detected.
  head = string.strip[0..len]
  return head unless head.length > len

  # Anchor at the end of the whole string (\z) and replace once. A `$` anchor
  # with gsub would also match at every interior line end and drop the last
  # word of each line of a multi-line result.
  head.sub(/\s+\S*\z/, '') + '...'
end