class Barbeque::Executor::Hako
Attributes
hako_s3_client[R]
Public Class Methods
new(hako_dir:, hako_env: {}, yaml_dir: nil, definition_dir: nil, oneshot_notification_prefix:)
click to toggle source
@param [String] hako_dir @param [Hash] hako_env @param [String] definition_dir @param [String] yaml_dir (deprecated: renamed to definition_dir)
# File lib/barbeque/executor/hako.rb, line 19
# Sets up the executor state used by the start_* and poll_* methods.
#
# @param [String] hako_dir directory later passed as `chdir:` when the hako command is run
# @param [Hash] hako_env environment hash later passed to Open3.capture3
# @param [String] yaml_dir deprecated alias of definition_dir; only consulted when
#   definition_dir is not given, and emits a warning when used
# @param [String] definition_dir directory searched for `<repository>.jsonnet` / `<repository>.yml`
# @param oneshot_notification_prefix handed unchanged to HakoS3Client.new
# @raise [ArgumentError] when neither definition_dir nor yaml_dir is given
def initialize(hako_dir:, hako_env: {}, yaml_dir: nil, definition_dir: nil, oneshot_notification_prefix:)
  @hako_dir = hako_dir
  @hako_env = hako_env

  # definition_dir wins; yaml_dir is only a deprecated fallback.
  @definition_dir = definition_dir || begin
    raise ArgumentError, 'definition_dir is required' unless yaml_dir

    warn 'yaml_dir option is renamed to definition_dir. Please update config/barbeque.yml'
    yaml_dir
  end

  @hako_s3_client = HakoS3Client.new(oneshot_notification_prefix)
end
Public Instance Methods
poll_execution(job_execution)
click to toggle source
@param [Barbeque::JobExecution] job_execution
# File lib/barbeque/executor/hako.rb, line 80
# Checks whether the hako task started for a job execution has stopped and,
# if so, records the outcome on the execution.
#
# Does nothing (returns nil) while the S3 client reports no stopped result.
# The execution is :success only when the container named 'app' exited with
# code 0; otherwise (including when no 'app' container is listed) it is :failed.
#
# @param [Barbeque::JobExecution] job_execution
def poll_execution(job_execution)
  hako_task = Barbeque::EcsHakoTask.find_by!(message_id: job_execution.message_id)
  stopped_task = @hako_s3_client.get_stopped_result(hako_task)
  return unless stopped_task

  outcome = :failed
  stopped_task.containers.each do |container|
    next unless container.name == 'app'

    # Every 'app' entry overwrites the outcome, so the last one listed decides.
    outcome = container.exit_code == 0 ? :success : :failed
  end

  job_execution.update!(status: outcome, finished_at: stopped_task.stopped_at)
  Barbeque::SlackNotifier.notify_job_execution(job_execution)
  job_execution.retry_if_possible! if outcome == :failed
end
poll_retry(job_retry)
click to toggle source
@param [Barbeque::JobRetry] job_retry
# File lib/barbeque/executor/hako.rb, line 99
# Checks whether the hako task started for a job retry has stopped and, if so,
# records the outcome on both the retry and its parent job execution.
#
# Does nothing (returns nil) while the S3 client reports no stopped result.
# The outcome is :success only when the container named 'app' exited with
# code 0; otherwise (including when no 'app' container is listed) it is :failed.
#
# @param [Barbeque::JobRetry] job_retry
def poll_retry(job_retry)
  hako_task = Barbeque::EcsHakoTask.find_by!(message_id: job_retry.message_id)
  job_execution = job_retry.job_execution
  stopped_task = @hako_s3_client.get_stopped_result(hako_task)
  return unless stopped_task

  outcome = :failed
  stopped_task.containers.each do |container|
    next unless container.name == 'app'

    # Every 'app' entry overwrites the outcome, so the last one listed decides.
    outcome = container.exit_code == 0 ? :success : :failed
  end

  # Retry and parent execution are updated in one transaction.
  Barbeque::ApplicationRecord.transaction do
    job_retry.update!(status: outcome, finished_at: stopped_task.stopped_at)
    job_execution.update!(status: outcome)
  end
  Barbeque::SlackNotifier.notify_job_retry(job_retry)
  job_execution.retry_if_possible! if outcome == :failed
end
start_execution(job_execution, envs)
click to toggle source
@param [Barbeque::JobExecution] job_execution @param [Hash] envs
# File lib/barbeque/executor/hako.rb, line 36
# Runs the hako oneshot command for a job execution and records the result.
#
# On a successful command exit the cluster/task ARN parsed from stdout are
# stored as a Barbeque::EcsHakoTask and the execution becomes :running.
# On a failed exit the execution is marked :failed, a Slack notification is
# sent and a retry is requested via retry_if_possible!.
# stdout/stderr are saved through Barbeque::ExecutionLog in both cases.
#
# @param [Barbeque::JobExecution] job_execution
# @param [Hash] envs environment variables forwarded to build_hako_oneshot_command
def start_execution(job_execution, envs)
  docker_image = DockerImage.new(job_execution.job_definition.app.docker_image)
  cmd = build_hako_oneshot_command(docker_image, job_execution.job_definition.command, envs)

  # Bundler.with_clean_env is deprecated since Bundler 2.1; prefer its
  # replacement and only fall back on Bundler versions that lack it.
  env_scope = Bundler.respond_to?(:with_unbundled_env) ? :with_unbundled_env : :with_clean_env
  stdout, stderr, status = Bundler.public_send(env_scope) do
    Open3.capture3(@hako_env, *cmd, chdir: @hako_dir)
  end

  if status.success?
    cluster, task_arn = extract_task_info(stdout)
    Barbeque::EcsHakoTask.create!(message_id: job_execution.message_id, cluster: cluster, task_arn: task_arn)
    Barbeque::ExecutionLog.try_save_stdout_and_stderr(job_execution, stdout, stderr)
    job_execution.update!(status: :running)
  else
    Barbeque::ExecutionLog.try_save_stdout_and_stderr(job_execution, stdout, stderr)
    job_execution.update!(status: :failed, finished_at: Time.zone.now)
    Barbeque::SlackNotifier.notify_job_execution(job_execution)
    job_execution.retry_if_possible!
  end
end
start_retry(job_retry, envs)
click to toggle source
@param [Barbeque::JobRetry] job_retry @param [Hash] envs
# File lib/barbeque/executor/hako.rb, line 55
# Runs the hako oneshot command for a job retry and records the result.
#
# On a successful command exit the cluster/task ARN parsed from stdout are
# stored as a Barbeque::EcsHakoTask keyed by the retry's message_id, the
# parent execution becomes :retried and the retry becomes :running.
# On a failed exit both records are marked :failed, a Slack notification is
# sent and another retry is requested via retry_if_possible!.
# stdout/stderr are saved through Barbeque::ExecutionLog in both cases.
#
# @param [Barbeque::JobRetry] job_retry
# @param [Hash] envs environment variables forwarded to build_hako_oneshot_command
def start_retry(job_retry, envs)
  job_execution = job_retry.job_execution
  docker_image = DockerImage.new(job_execution.job_definition.app.docker_image)
  cmd = build_hako_oneshot_command(docker_image, job_execution.job_definition.command, envs)

  # Bundler.with_clean_env is deprecated since Bundler 2.1; prefer its
  # replacement and only fall back on Bundler versions that lack it.
  env_scope = Bundler.respond_to?(:with_unbundled_env) ? :with_unbundled_env : :with_clean_env
  stdout, stderr, status = Bundler.public_send(env_scope) do
    Open3.capture3(@hako_env, *cmd, chdir: @hako_dir)
  end

  if status.success?
    cluster, task_arn = extract_task_info(stdout)
    Barbeque::EcsHakoTask.create!(message_id: job_retry.message_id, cluster: cluster, task_arn: task_arn)
    Barbeque::ExecutionLog.try_save_stdout_and_stderr(job_retry, stdout, stderr)
    Barbeque::ApplicationRecord.transaction do
      job_execution.update!(status: :retried)
      job_retry.update!(status: :running)
    end
  else
    Barbeque::ExecutionLog.try_save_stdout_and_stderr(job_retry, stdout, stderr)
    Barbeque::ApplicationRecord.transaction do
      job_retry.update!(status: :failed, finished_at: Time.zone.now)
      job_execution.update!(status: :failed)
    end
    Barbeque::SlackNotifier.notify_job_retry(job_retry)
    job_execution.retry_if_possible!
  end
end
Private Instance Methods
build_hako_oneshot_command(docker_image, command, envs)
click to toggle source
# File lib/barbeque/executor/hako.rb, line 123
# Assembles the argv array for `bundle exec hako oneshot --no-wait`.
#
# The definition file is looked up in @definition_dir by the image's
# repository name; a readable `.jsonnet` file is preferred over `.yml`.
#
# @param docker_image object responding to #repository and #tag
# @param command job command; appended after the `--` separator via Array#concat
# @param envs environment variables turned into `--env=` options by env_options
# @return [Array] the full command line
# @raise [HakoCommandError] when neither definition file is readable
def build_hako_oneshot_command(docker_image, command, envs)
  candidates = %w[jsonnet yml].map do |extension|
    File.join(@definition_dir, "#{docker_image.repository}.#{extension}")
  end
  argv = ['bundle', 'exec', 'hako', 'oneshot', '--no-wait', '--tag', docker_image.tag, *env_options(envs)]

  # First readable candidate wins, so jsonnet takes precedence over yml.
  definition_path = candidates.find { |path| File.readable?(path) }
  unless definition_path
    raise HakoCommandError, "No definition found matching '#{docker_image.repository}' in #{@definition_dir}"
  end

  argv << definition_path << '--'
  argv.concat(command)
end
env_options(envs)
click to toggle source
# File lib/barbeque/executor/hako.rb, line 139
# Converts key/value pairs into `--env=KEY=VALUE` command-line options.
#
# @param envs enumerable of key/value pairs (e.g. a Hash)
# @return [Array<String>] one option string per pair, in iteration order
def env_options(envs)
  envs.each_with_object([]) do |(name, val), options|
    options << "--env=#{name}=#{val}"
  end
end
extract_task_info(stdout)
click to toggle source
# File lib/barbeque/executor/hako.rb, line 145
# Extracts the ECS cluster and task ARN from the output of the hako command.
#
# Only the last line of stdout is inspected; it must be a JSON object with
# non-null 'cluster' and 'task_arn' entries.
#
# @param [String] stdout captured standard output of the hako command
# @return [Array] two-element array `[cluster, task_arn]`
# @raise [HakoCommandError] when stdout is empty, the last line is not valid
#   JSON, or the parsed JSON does not provide both 'cluster' and 'task_arn'
def extract_task_info(stdout)
  last_line = stdout.lines.last
  raise HakoCommandError, 'stdout is empty' unless last_line

  begin
    task_info = JSON.parse(last_line)
  rescue JSON::ParserError
    raise HakoCommandError, "Unable parse the last line as JSON: #{stdout}"
  end

  # Valid JSON is not necessarily an object (e.g. `[]`, `null`, `42`); indexing
  # those with a string key would raise TypeError/NoMethodError instead of the
  # HakoCommandError this method promises.
  cluster, task_arn = task_info.values_at('cluster', 'task_arn') if task_info.is_a?(Hash)
  return [cluster, task_arn] if cluster && task_arn

  raise HakoCommandError, "Unable find cluster and task_arn in JSON: #{stdout}"
end