class Barbeque::Executor::Hako

Attributes

hako_s3_client[R]

Public Class Methods

new(hako_dir:, hako_env: {}, yaml_dir: nil, definition_dir: nil, oneshot_notification_prefix:) click to toggle source

@param [String] hako_dir @param [Hash] hako_env @param [String] definition_dir @param [String] yaml_dir (deprecated: renamed to definition_dir)

# File lib/barbeque/executor/hako.rb, line 19
def initialize(hako_dir:, hako_env: {}, yaml_dir: nil, definition_dir: nil, oneshot_notification_prefix:)
  @hako_dir = hako_dir
  @hako_env = hako_env
  @definition_dir =
    if definition_dir
      definition_dir
    elsif yaml_dir
      warn 'yaml_dir option is renamed to definition_dir. Please update config/barbeque.yml'
      yaml_dir
    else
      raise ArgumentError.new('definition_dir is required')
    end
  @hako_s3_client = HakoS3Client.new(oneshot_notification_prefix)
end

Public Instance Methods

poll_execution(job_execution) click to toggle source

@param [Barbeque::JobExecution] job_execution

# File lib/barbeque/executor/hako.rb, line 80
# Records the outcome of a job execution once its hako task has stopped.
#
# Returns without touching anything while @hako_s3_client has no stopped
# result for the task. Otherwise the execution is updated, Slack is notified,
# and a retry is requested when the outcome is :failed.
#
# @param [Barbeque::JobExecution] job_execution
def poll_execution(job_execution)
  hako_task = Barbeque::EcsHakoTask.find_by!(message_id: job_execution.message_id)
  stopped_task = @hako_s3_client.get_stopped_result(hako_task)
  return unless stopped_task

  # Only the 'app' container decides the outcome; without one the execution counts as failed.
  outcome = :failed
  stopped_task.containers.each do |container|
    next unless container.name == 'app'

    outcome = container.exit_code == 0 ? :success : :failed
  end

  job_execution.update!(status: outcome, finished_at: stopped_task.stopped_at)
  Barbeque::SlackNotifier.notify_job_execution(job_execution)
  job_execution.retry_if_possible! if outcome == :failed
end
poll_retry(job_retry) click to toggle source

@param [Barbeque::JobRetry] job_retry

# File lib/barbeque/executor/hako.rb, line 99
# Records the outcome of a job retry once its hako task has stopped.
#
# Returns without touching anything while @hako_s3_client has no stopped
# result for the task. Otherwise the retry and its job execution are updated
# in one transaction, Slack is notified, and another retry is requested when
# the outcome is :failed.
#
# @param [Barbeque::JobRetry] job_retry
def poll_retry(job_retry)
  hako_task = Barbeque::EcsHakoTask.find_by!(message_id: job_retry.message_id)
  job_execution = job_retry.job_execution
  stopped_task = @hako_s3_client.get_stopped_result(hako_task)
  return unless stopped_task

  # Only the 'app' container decides the outcome; without one the retry counts as failed.
  outcome = :failed
  stopped_task.containers.each do |container|
    next unless container.name == 'app'

    outcome = container.exit_code == 0 ? :success : :failed
  end

  Barbeque::ApplicationRecord.transaction do
    job_retry.update!(status: outcome, finished_at: stopped_task.stopped_at)
    job_execution.update!(status: outcome)
  end
  Barbeque::SlackNotifier.notify_job_retry(job_retry)
  job_execution.retry_if_possible! if outcome == :failed
end
start_execution(job_execution, envs) click to toggle source

@param [Barbeque::JobExecution] job_execution @param [Hash] envs

# File lib/barbeque/executor/hako.rb, line 36
# Launches the job with the hako oneshot command and records the result of the launch.
#
# When the command succeeds, the cluster and task ARN taken from its stdout are
# stored in a Barbeque::EcsHakoTask and the execution becomes :running. When it
# fails, the execution becomes :failed, Slack is notified and a retry is
# requested. stdout/stderr of the command are saved in both cases.
#
# @param [Barbeque::JobExecution] job_execution
# @param [Hash] envs
def start_execution(job_execution, envs)
  image = DockerImage.new(job_execution.job_definition.app.docker_image)
  hako_command = build_hako_oneshot_command(image, job_execution.job_definition.command, envs)
  # NOTE(review): newer Bundler releases deprecate with_clean_env in favour of
  # with_unbundled_env — confirm against the Bundler version this app pins.
  out, err, exit_status = Bundler.with_clean_env do
    Open3.capture3(@hako_env, *hako_command, chdir: @hako_dir)
  end

  unless exit_status.success?
    Barbeque::ExecutionLog.try_save_stdout_and_stderr(job_execution, out, err)
    job_execution.update!(status: :failed, finished_at: Time.zone.now)
    Barbeque::SlackNotifier.notify_job_execution(job_execution)
    return job_execution.retry_if_possible!
  end

  ecs_cluster, ecs_task_arn = extract_task_info(out)
  Barbeque::EcsHakoTask.create!(message_id: job_execution.message_id, cluster: ecs_cluster, task_arn: ecs_task_arn)
  Barbeque::ExecutionLog.try_save_stdout_and_stderr(job_execution, out, err)
  job_execution.update!(status: :running)
end
start_retry(job_retry, envs) click to toggle source

@param [Barbeque::JobRetry] job_retry @param [Hash] envs

# File lib/barbeque/executor/hako.rb, line 55
# Launches a retry of a job with the hako oneshot command and records the result of the launch.
#
# When the command succeeds, the cluster and task ARN taken from its stdout are
# stored in a Barbeque::EcsHakoTask, the execution becomes :retried and the
# retry becomes :running. When it fails, both records become :failed, Slack is
# notified and another retry is requested. stdout/stderr of the command are
# saved against the retry in both cases.
#
# @param [Barbeque::JobRetry] job_retry
# @param [Hash] envs
def start_retry(job_retry, envs)
  job_execution = job_retry.job_execution
  image = DockerImage.new(job_execution.job_definition.app.docker_image)
  hako_command = build_hako_oneshot_command(image, job_execution.job_definition.command, envs)
  # NOTE(review): newer Bundler releases deprecate with_clean_env in favour of
  # with_unbundled_env — confirm against the Bundler version this app pins.
  out, err, exit_status = Bundler.with_clean_env do
    Open3.capture3(@hako_env, *hako_command, chdir: @hako_dir)
  end

  unless exit_status.success?
    Barbeque::ExecutionLog.try_save_stdout_and_stderr(job_retry, out, err)
    Barbeque::ApplicationRecord.transaction do
      job_retry.update!(status: :failed, finished_at: Time.zone.now)
      job_execution.update!(status: :failed)
    end
    Barbeque::SlackNotifier.notify_job_retry(job_retry)
    return job_execution.retry_if_possible!
  end

  ecs_cluster, ecs_task_arn = extract_task_info(out)
  Barbeque::EcsHakoTask.create!(message_id: job_retry.message_id, cluster: ecs_cluster, task_arn: ecs_task_arn)
  Barbeque::ExecutionLog.try_save_stdout_and_stderr(job_retry, out, err)
  Barbeque::ApplicationRecord.transaction do
    job_execution.update!(status: :retried)
    job_retry.update!(status: :running)
  end
end

Private Instance Methods

build_hako_oneshot_command(docker_image, command, envs) click to toggle source
# File lib/barbeque/executor/hako.rb, line 123
# Assembles the argv for `bundle exec hako oneshot --no-wait`.
#
# The definition file is looked up in @definition_dir under the image's
# repository name; a readable .jsonnet file is preferred over a .yml one.
#
# @param docker_image object responding to #repository and #tag
# @param [Array] command appended after the '--' separator
# @param envs passed to env_options to build the --env options
# @return [Array] the full command line
# @raise [HakoCommandError] when neither definition file is readable
def build_hako_oneshot_command(docker_image, command, envs)
  candidates = %w[jsonnet yml].map do |extension|
    File.join(@definition_dir, "#{docker_image.repository}.#{extension}")
  end

  # The option list is built before the lookup, matching the order callers have always seen.
  argv = %w[bundle exec hako oneshot --no-wait --tag]
  argv << docker_image.tag
  argv.concat(env_options(envs))

  definition_path = candidates.find { |path| File.readable?(path) }
  unless definition_path
    raise HakoCommandError.new("No definition found matching '#{docker_image.repository}' in #{@definition_dir}")
  end

  argv.push(definition_path, '--').concat(command)
end
env_options(envs) click to toggle source
# File lib/barbeque/executor/hako.rb, line 139
# Turns each key/value pair of envs into a "--env=KEY=VALUE" option string.
#
# @param envs enumerable of key/value pairs (a Hash in the callers visible here)
# @return [Array<String>] one option per pair, in iteration order
def env_options(envs)
  envs.map { |name, setting| "--env=#{name}=#{setting}" }
end
extract_task_info(stdout) click to toggle source
# File lib/barbeque/executor/hako.rb, line 145
# Reads the cluster and task ARN from the last line of the given stdout.
#
# The last line must be a JSON object with non-null 'cluster' and 'task_arn'
# entries. Every way of not meeting that — no output, unparsable JSON, JSON
# that is not an object, or missing entries — is reported as a
# HakoCommandError, so callers only have one error type to handle.
#
# @param [String] stdout
# @return [Array] two elements: cluster and task_arn
# @raise [HakoCommandError] when the task information cannot be extracted
def extract_task_info(stdout)
  last_line = stdout.lines.last
  raise HakoCommandError.new('stdout is empty') unless last_line

  begin
    task_info = JSON.parse(last_line)
  rescue JSON::ParserError
    raise HakoCommandError.new("Unable parse the last line as JSON: #{stdout}")
  end

  # JSON.parse can also return an Array, number, string or nil; indexing those
  # with a String key would raise TypeError/NoMethodError instead of HakoCommandError.
  cluster, task_arn = task_info.values_at('cluster', 'task_arn') if task_info.is_a?(Hash)
  unless cluster && task_arn
    raise HakoCommandError.new("Unable find cluster and task_arn in JSON: #{stdout}")
  end

  [cluster, task_arn]
end