class Kuroko2::Command::Shell

Constants

MAX_OUTPUT_LENGTH
MAX_READ_LENGTH

Public Class Methods

new(hostname:, worker_id: 0, worker:, queue: Execution::DEFAULT_QUEUE) click to toggle source
# File lib/autoload/kuroko2/command/shell.rb, line 8
# Stores the identity and collaborators this command needs at run time.
#
# hostname:  value written to executions and used in log prefixes.
# worker_id: slot number paired with hostname in log prefixes (default 0).
# worker:    record that is reloaded, checked for suspension and bound to
#            the running execution via its execution_id column.
# queue:     queue handed to Execution.poll when looking for work.
def initialize(hostname:, worker_id: 0, worker:, queue: Execution::DEFAULT_QUEUE)
  @hostname, @worker_id, @worker, @queue = hostname, worker_id, worker, queue
end

Public Instance Methods

execute() click to toggle source
# File lib/autoload/kuroko2/command/shell.rb, line 15
# Polls the queue once and runs the polled execution on this worker.
#
# Returns the execution that was run. Returns nil when the worker is
# suspended, when it already has an execution_id, when the poll yields
# nothing, or when a RuntimeError is raised (the error and its backtrace
# are written to Kuroko2.logger, each line prefixed with host and worker id).
def execute
  @worker.reload
  return nil if @worker.suspended? || @worker.execution_id?

  execution = Execution.poll(@queue)
  return nil unless execution

  do_execute(execution)
  execution
rescue RuntimeError => e
  tag         = "[#{@hostname}-#{@worker_id}]"
  trace_lines = e.backtrace.map { |trace| "#{tag}    #{trace}" }
  Kuroko2.logger.error("#{tag} #{e.message}\n#{trace_lines.join("\n")}")

  nil
end

Private Instance Methods

do_execute(execution) click to toggle source
# File lib/autoload/kuroko2/command/shell.rb, line 33
# Runs +execution+ on this worker and releases the worker afterwards.
#
# The worker's execution_id column is set to the execution's id for the
# duration of the run and reset to nil in the ensure clause, whatever
# happens. A SystemCallError raised while invoking is logged as a warning
# (job instance log and Kuroko2.logger) and the execution is finished with
# the error message as output and the error's errno as exit status.
def do_execute(execution)
  @worker.update_column(:execution_id, execution.id)
  execution.update(hostname: @hostname, worker_id: @worker_id)

  invoke(execution)
rescue SystemCallError => e
  warning = "[#{@hostname}-#{@worker_id}] (uuid #{execution.uuid}) `#{execution.shell}` failed because #{e.class}: #{e.message}"
  execution.token.job_instance.logs.warn(warning)
  Kuroko2.logger.warn(warning)

  execution.finish(output: truncate_and_escape(e.message), exit_status: e.errno)
ensure
  @worker.update_column(:execution_id, nil)
end
execute_shell(command, env, execution) click to toggle source
# File lib/autoload/kuroko2/command/shell.rb, line 77
# Spawns +command+ as a child process and collects its combined
# stdout/stderr.
#
# command:   the command string handed to Open3.popen2e.
# env:       environment hash for the child. It is modified in place:
#            HOME, PATH, LANG and the KUROKO2_* variables are added for any
#            key not already present (reverse_merge!).
# execution: the execution being run; its context supplies CHDIR and
#            meta, and its pid attribute is updated once the child starts.
#
# Returns a two-element array [output, status]: the accumulated output
# string and the value of the wait thread given by Open3.popen2e.
#
# Errors raised while resolving CHDIR or spawning the process are not
# rescued here.
def execute_shell(command, env, execution)
  # unsetenv_others: the child sees only the variables in +env+.
  # pgroup: the child is started in its own process group.
  opts = { unsetenv_others: true, pgroup: true }
  opts[:chdir] = real_path(execution.context['CHDIR']) if execution.context['CHDIR']

  # 'meta' may be absent; try(:[], ...) then yields nil and to_s gives ''.
  meta                = execution.context['meta']
  launched_time       = meta.try(:[], 'launched_time').to_s
  job_definition_id   = meta.try(:[], 'job_definition_id').to_s
  job_definition_name = meta.try(:[], 'job_definition_name').to_s
  job_instance_id     = meta.try(:[], 'job_instance_id').to_s

  env.reverse_merge!(
    'HOME'                        => ENV['HOME'],
    'PATH'                        => ENV['PATH'],
    'LANG'                        => ENV['LANG'],
    'KUROKO2_LAUNCHED_TIME'       => launched_time,
    'KUROKO2_JOB_DEFINITION_ID'   => job_definition_id,
    'KUROKO2_JOB_DEFINITION_NAME' => job_definition_name,
    'KUROKO2_JOB_INSTANCE_ID'     => job_instance_id,
  )

  execution_logger = ExecutionLogger.get_logger(
    stream_name: "JOB#{sprintf("%010d", job_definition_id.to_i)}/#{execution.token.job_instance.id}",
  )

  # The parent's PATH is swapped for the job's PATH while spawning and is
  # restored by temporally_path_with when the block exits.
  temporally_path_with(env['PATH']) do
    Open3.popen2e(env, command, opts) do |stdin, stdout_and_stderr, thread|
      # Nothing is ever written to the child's stdin.
      stdin.close

      pid = thread.pid
      # `update` rather than `update_attributes` (removed in Rails 6.1),
      # matching the call style used in do_execute.
      execution.update(pid: pid)

      reader = Thread.new do
        begin
          output = ''
          stdout_and_stderr.each do |data|
            output << data

            begin
              execution_logger.send_log(
                {
                  uuid: execution.uuid,
                  pid: pid,
                  level: 'NOTICE',
                  message: truncate_and_escape(data.chomp),
                }
              )
            rescue => e
              # A failure to forward one line must not stop output capture.
              Kuroko2.logger.error(
                "[#{@hostname}-#{@worker_id}] #{e.message}\n    " + e.backtrace.join("\n    "))
            end
          end
        rescue EOFError
          # do nothing
        ensure
          # `next` inside ensure makes the thread's value the output read so
          # far, discarding any exception still propagating at this point.
          next output
        end
      end

      status = thread.value # wait until thread is dead
      output = reader.value

      [output, status]
    end
  end
end
invoke(execution) click to toggle source
# File lib/autoload/kuroko2/command/shell.rb, line 51
# Runs the execution's shell command and records how it ended.
#
# Logs the start of the run, delegates to execute_shell, then finishes the
# execution: via finish_by_signal when the process status reports a
# terminating signal, otherwise via finish with the exit status. Every
# message goes to both the job instance log and Kuroko2.logger.
#
# Returns whatever finish_by_signal / finish returns.
def invoke(execution)
  shell_command = execution.shell
  environment   = execution.context.fetch('ENV', {})
  prefix        = "[#{@hostname}-#{@worker_id}] (uuid #{execution.uuid}) `#{shell_command}`"

  start_note = "#{prefix} run with env (#{environment})"
  execution.token.job_instance.logs.info(start_note)
  Kuroko2.logger.info(start_note)

  raw_output, status = execute_shell(shell_command, environment, execution)
  output = truncate_and_escape(raw_output)

  if status.signaled?
    signal_note = "#{prefix} stopped by #{Signal.signame(status.termsig)} signal (pid #{status.pid})"
    execution.token.job_instance.logs.warn(signal_note)
    Kuroko2.logger.warn(signal_note)

    return execution.finish_by_signal(output: output, term_signal: status.termsig)
  end

  exit_note = "#{prefix} finished with #{status.exitstatus} (pid #{status.pid})"
  execution.token.job_instance.logs.info(exit_note)
  Kuroko2.logger.info(exit_note)

  execution.finish(output: output, exit_status: status.exitstatus)
end
real_path(path) click to toggle source
# File lib/autoload/kuroko2/command/shell.rb, line 142
# Resolves +path+ to its real path.
#
# A single trailing slash is dropped before resolving. The filesystem root
# "/" is kept as-is: stripping it would leave an empty string, which can
# never be resolved and would raise Errno::ENOENT on every attempt.
#
# The lookup is wrapped in Retryable with tries: 3, sleep: 0.5 and
# on: Errno::ENOENT, so a missing path is retried before the error reaches
# the caller.
#
# Returns a Pathname.
def real_path(path)
  trimmed = path.sub(%r{/\Z}, '')
  # Only "/" (optionally followed by a newline, per \Z) trims to nothing.
  trimmed = path if trimmed.empty?

  target = Pathname.new(trimmed)
  Retryable.retryable(tries: 3, sleep: 0.5, on: [Errno::ENOENT]) do
    target.realpath
  end
end
temporally_path_with(path) { || ... } click to toggle source
# File lib/autoload/kuroko2/command/shell.rb, line 149
# Runs the given block with this process's PATH set to +path+.
#
# The previous PATH is put back in an ensure clause, so it is restored
# even when the block raises. Returns the block's value.
def temporally_path_with(path)
  saved_path = ENV['PATH']

  begin
    ENV['PATH'] = path
    yield
  ensure
    ENV['PATH'] = saved_path
  end
end
truncate_and_escape(str) click to toggle source
# File lib/autoload/kuroko2/command/shell.rb, line 158
# Returns a copy of +str+ that is tagged as UTF-8, cut to at most
# MAX_OUTPUT_LENGTH characters, scrubbed of invalid byte sequences, and
# stripped of every character encoded in four or more bytes.
#
# The argument itself is left untouched: the encoding change is applied to
# a duplicate, so frozen strings are accepted and the caller's string keeps
# its original encoding.
def truncate_and_escape(str)
  text = str.dup.force_encoding(Encoding::UTF_8)
  # Truncate before scrubbing, so the limit applies to the raw characters.
  text = text[0...MAX_OUTPUT_LENGTH] if text.length > MAX_OUTPUT_LENGTH
  text.scrub.each_char.select { |c| c.bytesize < 4 }.join
end