class Krane::RunnerTask

Runs a pod that exits once its task completes.

Attributes

pod_name [R] — read-only name of the most recently created task pod

Public Class Methods

new(namespace:, context:, logger: nil, max_watch_seconds: nil)

Initializes the runner task

@param namespace [String] Kubernetes namespace
@param context [String] Kubernetes context / cluster
@param logger [Object] Logger object (defaults to an instance of Krane::FormattedLogger)
@param max_watch_seconds [Integer] Timeout in seconds used while watching the pod (optional)

# File lib/krane/runner_task.rb, line 26
# Builds a runner bound to one namespace/context pair.
#
# @param namespace [String] Kubernetes namespace the task pod runs in
# @param context [String] Kubernetes context / cluster
# @param logger [Object, nil] logger to use; a Krane::FormattedLogger is built when nil
# @param max_watch_seconds [Integer, nil] timeout handed to the resource watcher
def initialize(namespace:, context:, logger: nil, max_watch_seconds: nil)
  # Plain settings first; the logger and task config below are derived from them.
  @namespace = namespace
  @context = context
  @max_watch_seconds = max_watch_seconds

  @logger = logger || Krane::FormattedLogger.build(namespace, context)
  @task_config = Krane::TaskConfig.new(context, namespace, @logger)
end

Public Instance Methods

run(*args)

Runs the task and returns true on success, or false if the run fails or times out. Accepts the same arguments as run!.

@return [Boolean]

# File lib/krane/runner_task.rb, line 37
# Runs the task, reporting failure as a return value instead of an exception.
#
# Accepts exactly the same arguments as #run!. Keyword arguments are forwarded
# explicitly: with a bare `*args` splat, Ruby 3 collects the keywords into a
# positional Hash, which the keyword-only #run! rejects with ArgumentError.
#
# @return [Boolean] true on success, false if the run failed or timed out
def run(*args, **kwargs)
  run!(*args, **kwargs)
  true
rescue DeploymentTimeoutError, FatalDeploymentError
  false
end
run!(task_template:, entrypoint:, args:, env_vars: [], verify_result: true)

Runs the task, raising exceptions in case of issues

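@param task_template [String] Name of the PodTemplate (in the target namespace) that the pod is built from
@param entrypoint [Array<String>] Override the default command in the container image (left unchanged when nil)
@param args [Array<String>] Override the default arguments for the command (left unchanged when nil)
@param env_vars [Array<String>] Environment variables in KEY=VALUE form, appended to the container's env
@param verify_result [Boolean] Wait for completion and verify pod success; when false, only the status observed right after creation is reported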

@return [nil]

# File lib/krane/runner_task.rb, line 53
# Runs the task end to end, raising on any deployment problem.
#
# Emits a `task_runner.duration` metric tagged success/timeout/failure and
# prints the logger summary before returning or re-raising.
#
# @raise [DeploymentTimeoutError] when watching the pod times out
# @raise [FatalDeploymentError] on any other fatal problem
def run!(task_template:, entrypoint:, args:, env_vars: [], verify_result: true)
  started_at = Time.now.utc
  @logger.reset

  # Validate inputs and assemble the pod from its template.
  @logger.phase_heading("Initializing task")
  @logger.info("Validating configuration")
  verify_config!(task_template, args)
  @logger.info("Using namespace '#{@namespace}' in context '#{@context}'")
  task_pod = build_pod(task_template, entrypoint, args, env_vars, verify_result)
  validate_pod(task_pod)

  # Create the pod, then either follow it to completion or take a single status snapshot.
  @logger.phase_heading("Running pod")
  create_pod(task_pod)
  if verify_result
    @logger.phase_heading("Streaming logs")
    watch_pod(task_pod)
  else
    record_status_once(task_pod)
  end

  StatsD.client.distribution('task_runner.duration', StatsD.duration(started_at), tags: statsd_tags('success'))
  @logger.print_summary(:success)
rescue DeploymentTimeoutError, FatalDeploymentError => e
  # A timeout is classified before the generic failure case.
  tag, outcome = e.is_a?(DeploymentTimeoutError) ? ['timeout', :timed_out] : ['failure', :failure]
  StatsD.client.distribution('task_runner.duration', StatsD.duration(started_at), tags: statsd_tags(tag))
  @logger.print_summary(outcome)
  raise
end

Private Instance Methods

build_pod(template_name, entrypoint, args, env_vars, verify_result) click to toggle source
# File lib/krane/runner_task.rb, line 101
# Builds the Pod resource for this run from the named PodTemplate.
#
# Applies the container overrides and restart-policy adjustment to the
# definition before wrapping it in a Pod that streams its logs.
def build_pod(template_name, entrypoint, args, env_vars, verify_result)
  base_spec = get_template(template_name)
  @logger.info("Using template '#{template_name}'")

  definition = build_pod_definition(base_spec)
  set_container_overrides!(definition, entrypoint, args, env_vars)
  ensure_valid_restart_policy!(definition, verify_result)

  Pod.new(
    namespace: @namespace,
    context: @context,
    logger: @logger,
    stream_logs: true,
    definition: definition.to_hash.deep_stringify_keys,
    statsd_tags: []
  )
end
build_pod_definition(base_template) click to toggle source
# File lib/krane/runner_task.rb, line 155
# Turns a pod template spec into a standalone Pod definition in the task namespace.
#
# The pod name is the template name plus a random hex suffix. When the result
# would exceed 63 characters, the template-name part is shortened instead of the
# suffix, so every generated name keeps its full 16-hex-char random component.
# Truncating the whole string would drop the suffix, so long template names
# would produce identical pod names on every run.
#
# @param base_template [Object] template spec (responds to kind/apiVersion/metadata)
# @return [Object] a shallow copy of base_template configured as a Pod
def build_pod_definition(base_template)
  pod_definition = base_template.dup
  pod_definition.kind = 'Pod'
  pod_definition.apiVersion = 'v1'
  pod_definition.metadata.namespace = @namespace

  max_name_length = 63
  suffix = "-#{SecureRandom.hex(8)}"
  base_name = pod_definition.metadata.name
  unique_name = base_name + suffix
  if unique_name.length > max_name_length
    unique_name = base_name[0, max_name_length - suffix.length] + suffix
    @logger.warn("Name is too long, using '#{unique_name}'")
  end
  pod_definition.metadata.name = unique_name

  pod_definition
end
create_pod(pod) click to toggle source
# File lib/krane/runner_task.rb, line 89
# Submits the pod to the cluster and remembers its name in @pod_name.
#
# The deploy start time is stamped just before the create call.
#
# @raise [FatalDeploymentError] when the Kubernetes API rejects the request
def create_pod(pod)
  @logger.info("Creating pod '#{pod.name}'")
  pod.deploy_started_at = Time.now.utc
  client = kubeclient
  client.create_pod(pod.to_kubeclient_resource)
  @pod_name = pod.name
  @logger.info("Pod creation succeeded")
rescue Kubeclient::HttpError => api_error
  failure = format("Failed to create pod: %s: %s", api_error.class.name, api_error.message)
  @logger.summary.add_paragraph(failure)
  raise FatalDeploymentError, failure
end
ensure_valid_restart_policy!(template, verify) click to toggle source
# File lib/krane/runner_task.rb, line 187
# Forces restartPolicy to "Never" when result verification is requested,
# warning about the override. Leaves the template untouched otherwise.
def ensure_valid_restart_policy!(template, verify)
  current_policy = template.spec.restartPolicy
  return unless verify && current_policy != "Never"

  notice = "Changed Pod RestartPolicy from '#{current_policy}' to 'Never'. Disable " \
    "result verification to use '#{current_policy}'."
  @logger.warn(notice)
  template.spec.restartPolicy = "Never"
end
get_template(template_name) click to toggle source
# File lib/krane/runner_task.rb, line 144
# Fetches the named PodTemplate from the task namespace and returns its `template` section.
#
# Every failure path records its message in the logger summary before raising,
# matching the other error paths in this class.
#
# @param template_name [String] name of the PodTemplate resource
# @return the `template` field of the fetched PodTemplate
# @raise [TaskTemplateMissingError] if the PodTemplate does not exist
# @raise [FatalKubeAPIError] on any other Kubernetes API error
def get_template(template_name)
  pod_template = kubeclient.get_pod_template(template_name, @namespace)
  pod_template.template
rescue Kubeclient::ResourceNotFoundError
  # Listed before HttpError (presumably its superclass in kubeclient) so the more
  # specific "not found" message wins.
  msg = "Pod template `#{template_name}` not found in namespace `#{@namespace}`, context `#{@context}`"
  @logger.summary.add_paragraph(msg)
  raise TaskTemplateMissingError, msg
rescue Kubeclient::HttpError => error
  msg = "Error retrieving pod template: #{error.class.name}: #{error.message}"
  @logger.summary.add_paragraph(msg)
  raise FatalKubeAPIError, msg
end
kubeclient() click to toggle source
# File lib/krane/runner_task.rb, line 200
# Memoized v1 Kubernetes API client for the configured context.
def kubeclient
  return @kubeclient if @kubeclient

  @kubeclient = kubeclient_builder.build_v1_kubeclient(@context)
end
kubeclient_builder() click to toggle source
# File lib/krane/runner_task.rb, line 204
# Memoized KubeclientBuilder shared by this runner's API clients.
def kubeclient_builder
  return @kubeclient_builder if @kubeclient_builder

  @kubeclient_builder = KubeclientBuilder.new
end
kubectl() click to toggle source
# File lib/krane/runner_task.rb, line 196
# Memoized kubectl wrapper bound to this runner's task config.
# Failures are logged by default.
def kubectl
  return @kubectl if @kubectl

  @kubectl = Kubectl.new(task_config: @task_config, log_failure_by_default: true)
end
record_status_once(pod) click to toggle source
# File lib/krane/runner_task.rb, line 123
    # Syncs the pod once and adds its immediate post-creation status to the
    # summary, together with a warning that result verification was skipped.
    def record_status_once(pod)
      pod.sync(ResourceCache.new(@task_config))
      banner = ColorizedString.new('Result verification is disabled for this task.').yellow
      @logger.summary.add_paragraph(<<~STRING)
        #{banner}
        The following status was observed immediately after pod creation:
        #{pod.pretty_status}
      STRING
    end
set_container_overrides!(pod_definition, entrypoint, args, env_vars) click to toggle source
# File lib/krane/runner_task.rb, line 168
# Applies command/args/env overrides to the container named 'task-runner'.
#
# entrypoint and args are only applied when truthy. Each env_vars entry is
# split on its first '=' into name/value and appended after the container's
# existing env entries.
#
# @raise [TaskConfigurationError] if no container is named 'task-runner'
def set_container_overrides!(pod_definition, entrypoint, args, env_vars)
  runner = pod_definition.spec.containers.find { |candidate| candidate.name == 'task-runner' }
  unless runner
    message = "Pod spec does not contain a template container called 'task-runner'"
    @logger.summary.add_paragraph(message)
    raise TaskConfigurationError, message
  end

  runner.command = entrypoint if entrypoint
  runner.args = args if args

  extra_env = env_vars.map do |pair|
    var_name, var_value = pair.split('=', 2)
    { name: var_name, value: var_value }
  end
  runner.env ||= []
  existing_env = runner.env.map(&:to_h)
  runner.env = existing_env + extra_env
end
statsd_tags(status) click to toggle source
# File lib/krane/runner_task.rb, line 208
# Metric tags identifying this runner's namespace, context and the run outcome.
def statsd_tags(status)
  [
    "namespace:#{@namespace}",
    "context:#{@context}",
    "status:#{status}",
  ]
end
validate_pod(pod) click to toggle source
# File lib/krane/runner_task.rb, line 111
# Validates the pod definition via this runner's kubectl wrapper and returns
# the validator's result.
def validate_pod(pod)
  kubectl_client = kubectl
  pod.validate_definition(kubectl_client)
end
verify_config!(task_template, args) click to toggle source
# File lib/krane/runner_task.rb, line 134
# Validates the task configuration, raising if it is invalid.
#
# On failure, records "Configuration invalid" as a summary action and lists
# each validator error as its own "- " bullet line in the summary.
#
# @raise [Krane::TaskConfigurationError] when the configuration is invalid
def verify_config!(task_template, args)
  task_config_validator = RunnerTaskConfigValidator.new(task_template, args, @task_config, kubectl,
    kubeclient_builder)
  unless task_config_validator.valid?
    @logger.summary.add_action("Configuration invalid")
    # Array() yields one bullet per error. Wrapping the list in another array
    # literal would render the whole list as a single inspected line.
    @logger.summary.add_paragraph(Array(task_config_validator.errors).map { |err| "- #{err}" }.join("\n"))
    raise Krane::TaskConfigurationError
  end
end
watch_pod(pod) click to toggle source
# File lib/krane/runner_task.rb, line 115
# Watches the pod until the watcher returns, then converts a timed-out or
# failed pod into the matching exception. Returns nil otherwise.
def watch_pod(pod)
  watcher = ResourceWatcher.new(
    resources: [pod],
    timeout: @max_watch_seconds,
    operation_name: "run",
    task_config: @task_config
  )
  watcher.run(delay_sync: 1, reminder_interval: 30.seconds)

  if pod.deploy_timed_out?
    raise DeploymentTimeoutError
  elsif pod.deploy_failed?
    raise FatalDeploymentError
  end
end