class Object

Constants

ELASTICSEARCH_SPARK_JAR
MAIN_CLASS

Constants


SPARK_EXECUTOR_URI
SPARK_SCALA_VERSION

Public Instance Methods

get_executor(state_response, task_id) click to toggle source
# File bin/adstax-spark-job-manager, line 97
# Finds the executor whose id equals +task_id+ in a parsed Mesos agent
# state document (the JSON served by the agent's /state.json endpoint).
#
# Search order: 'completed_frameworks' then 'frameworks'; within each
# framework 'completed_executors' then 'executors'. The first match wins.
#
# @param state_response [Hash] parsed agent state
# @param task_id [String] executor id to look for (the Spark submission id)
# @return [Hash, nil] the matching executor hash, or nil when none matches
def get_executor(state_response, task_id)
  # Array#+ builds fresh arrays; the previous Array#concat appended into the
  # caller's parsed JSON, so every call grew it. `|| []` tolerates keys that
  # are absent from the payload.
  frameworks = (state_response['completed_frameworks'] || []) + (state_response['frameworks'] || [])
  executors = frameworks.flat_map do |framework|
    (framework['completed_executors'] || []) + (framework['executors'] || [])
  end
  executors.find { |executor| executor['id'] == task_id }
end
get_http(uri) click to toggle source
# File bin/adstax-spark-job-manager, line 82
# Builds, without connecting, a Net::HTTP client for the host and port of
# +uri+. TLS is switched on for https:// URLs. Without it, an https URL
# (such as a Mesos leader UI URL) would be spoken to in plain HTTP on
# port 443.
#
# @param uri [String] absolute http(s) URL; only scheme, host and port are used
# @return [Net::HTTP] an unstarted connection object
def get_http(uri)
  parsed = URI.parse(uri)
  http = Net::HTTP.new(parsed.host, parsed.port)
  http.use_ssl = (parsed.scheme == 'https')
  http
end
get_task(state_response, task_id) click to toggle source
# File bin/adstax-spark-job-manager, line 87
# Finds the task whose id equals +task_id+ in a parsed Mesos master state
# document (the JSON served by the leader's /state.json endpoint).
#
# Search order: 'completed_frameworks' then 'frameworks'; within each
# framework 'completed_tasks' then 'tasks'. The first match wins.
#
# @param state_response [Hash] parsed master state
# @param task_id [String] task id to look for (the Spark submission id)
# @return [Hash, nil] the matching task hash, or nil when none matches
def get_task(state_response, task_id)
  # Array#+ instead of Array#concat: do not grow the caller's parsed JSON on
  # every lookup. `|| []` tolerates keys absent from the payload.
  frameworks = (state_response['completed_frameworks'] || []) + (state_response['frameworks'] || [])
  tasks = frameworks.flat_map do |framework|
    (framework['completed_tasks'] || []) + (framework['tasks'] || [])
  end
  tasks.find { |task| task['id'] == task_id }
end
kill_job(submission_id) click to toggle source
# File bin/adstax-spark-job-manager, line 172
# Asks the Spark cluster dispatcher ($cluster_dispatcher_host) to kill a
# submission via POST /v1/submissions/kill/<id>.
#
# @param submission_id [String] id returned by the dispatcher on submit
# @return [Net::HTTPResponse] the raw dispatcher response; not inspected here
def kill_job(submission_id)
  request = Net::HTTP::Post.new("/v1/submissions/kill/#{submission_id}")
  # Reuse the shared connection builder instead of re-parsing the URI here.
  get_http($cluster_dispatcher_host).request(request)
end
log_job(submission_id, follow, show_stderr) click to toggle source
# File bin/adstax-spark-job-manager, line 186
# Prints the driver logs of a Spark submission that runs on Mesos.
#
# The submission is resolved in stages:
# 1. Its state is read from the dispatcher (status_job).
# 2. The Mesos leader URL is read from Marathon (/v2/info).
# 3. The task is located in the leader's /state.json.
# 4. The executor's sandbox directory is read from the /state.json of the
#    agent that ran it (port 5051).
# 5. The sandbox stdout/stderr are downloaded.
# Every failed lookup prints a message and exits the process with status 1.
#
# @param submission_id [String] id returned by the dispatcher on submit
# @param follow [Boolean] true: wait while the submission is QUEUED, then
#   keep re-downloading and tailing the logs until interrupted;
#   false: print the current contents once (a QUEUED submission exits 1)
# @param show_stderr [Boolean] also print stderr, rendered via String#red
def log_job(submission_id, follow, show_stderr)
  status_response = JSON.parse(status_job(submission_id).body)
  if status_response['driverState'] == "NOT_FOUND"
    puts "Unable to find submission with id #{submission_id}"
    exit 1
  end
  if status_response['driverState'] == "QUEUED"
    puts "Submission with id #{submission_id} is still queued for execution"
    exit 1 unless follow
    log_job_wait_while_queued(submission_id)
  end

  marathon_http = get_http("http://marathon.#{$cli_args[:host_suffix]}")
  res = log_job_fetch_json(marathon_http, '/v2/info', 'Unable to fetch Mesos leader url from Marathon')
  mesos_http = get_http(res['marathon_config']['mesos_leader_ui_url'])
  res = log_job_fetch_json(mesos_http, '/state.json', 'Unable to fetch Mesos status')

  target_task = get_task(res, submission_id)
  unless target_task
    puts "Unable to find submission with id #{submission_id} in Mesos. Maybe the submission is too old?"
    exit 1
  end
  target_slave = log_job_find_slave(res['slaves'], target_task['slave_id'])

  slave_http = get_http('http://' + target_slave['hostname'] + ':5051')
  res = log_job_fetch_json(slave_http, '/state.json', 'Unable to fetch file from slave')
  target_executor = get_executor(res, submission_id)
  unless target_executor
    puts "Unable to find submission with id #{submission_id} in executor. Maybe the submission is too old?"
    exit 1
  end

  log_job_print_sandbox(slave_http, target_executor['directory'], submission_id, follow, show_stderr)
end

# Polls the dispatcher once per second, printing a dot each time, until the
# submission leaves the QUEUED state. Ctrl-C exits with status 1.
def log_job_wait_while_queued(submission_id)
  print "Waiting for submission with id #{submission_id} to start"
  begin
    loop do
      sleep 1
      print "."
      break unless JSON.parse(status_job(submission_id).body)['driverState'] == "QUEUED"
    end
  rescue Interrupt
    exit 1
  end
  puts ""
end

# GETs +path+ on +http+ and returns the parsed JSON body. On a non-2xx reply
# it prints +error_message+ and exits 1. The check is Net::HTTPSuccess
# because `body_permitted?` is also true for 404/500 responses.
def log_job_fetch_json(http, path, error_message)
  response = http.request(Net::HTTP::Get.new(path))
  unless response.is_a?(Net::HTTPSuccess)
    puts error_message
    exit 1
  end
  JSON.parse(response.body)
end

# Returns the single entry of +slaves+ whose 'id' equals +slave_id+. It
# exits 1 when there is no such entry or when there are several.
def log_job_find_slave(slaves, slave_id)
  matches = (slaves || []).select { |slave| slave['id'] == slave_id }
  if matches.empty?
    puts "Unable to find slave with id #{slave_id}"
    exit 1
  end
  if matches.length != 1
    puts "Multiple slaves with id #{slave_id}"
    exit 1
  end
  matches.first
end

# Downloads the sandbox stdout/stderr into temp files and prints them, or
# keeps streaming them when +follow+ is set. The temp files are always
# removed, and background threads killed, on the way out, including on
# `exit`, which unwinds through `ensure`.
def log_job_print_sandbox(slave_http, directory, submission_id, follow, show_stderr)
  stdout_file = Tempfile.new('spark' + submission_id)
  stderr_file = Tempfile.new('spark' + submission_id)
  threads = []
  begin
    if follow
      # Re-download the whole files every second; the tail threads pick up
      # new lines from the local copies.
      threads << Thread.new do
        loop do
          sleep 1
          mesos_download(slave_http, directory + '/stdout', stdout_file)
          mesos_download(slave_http, directory + '/stderr', stderr_file)
        end
      end
      threads << tail_file(stdout_file)
      threads << tail_file(stderr_file, Proc.new { |line| puts line.chomp.red }) if show_stderr
      begin
        threads.each(&:join)
      rescue Interrupt
        exit 1
      end
    else
      mesos_download(slave_http, directory + '/stdout', stdout_file)
      mesos_download(slave_http, directory + '/stderr', stderr_file)
      if show_stderr
        stderr_file.rewind
        puts stderr_file.read.chomp.red
      end
      stdout_file.rewind
      puts stdout_file.read
    end
  ensure
    # Stop writers/readers before deleting the files they use.
    threads.each(&:kill)
    stdout_file.close!
    stderr_file.close!
  end
end
mesos_download(http, remote_file, local_file) click to toggle source
# File bin/adstax-spark-job-manager, line 107
# Downloads +remote_file+ through a Mesos agent's /files/download endpoint
# and overwrites +local_file+ with its contents. Prints an error and exits
# the process with status 1 unless the agent answers with a 2xx response.
#
# @param http [Net::HTTP] connection to the agent
# @param remote_file [String] path of the file on the agent, sent as ?path=
# @param local_file [Tempfile, File] writable file that receives the body
# @return [Integer] number of bytes written
def mesos_download(http, remote_file, local_file)
  query = URI.encode_www_form(path: remote_file)
  file_response = http.request(Net::HTTP::Get.new("/files/download?#{query}"))
  # `body_permitted?` only says whether the status class may carry a body
  # (true for 404/500 as well); a 2xx is what means we actually got the file.
  unless file_response.is_a?(Net::HTTPSuccess)
    puts 'Unable to fetch file from slave'
    exit 1
  end
  local_file.rewind
  written = local_file.write(file_response.body)
  # Flush so readers holding their own handle on the same path (e.g. a
  # tailing thread) see the new bytes, then drop stale bytes left over
  # from a longer previous download.
  local_file.flush
  local_file.truncate(local_file.pos)
  written
end
status_job(submission_id) click to toggle source
# File bin/adstax-spark-job-manager, line 179
# Queries the Spark cluster dispatcher ($cluster_dispatcher_host) for the
# status of a submission via GET /v1/submissions/status/<id>.
#
# @param submission_id [String] id returned by the dispatcher on submit
# @return [Net::HTTPResponse] raw response; callers JSON-parse its body and
#   read 'driverState'
def status_job(submission_id)
  request = Net::HTTP::Get.new("/v1/submissions/status/#{submission_id}")
  # Reuse the shared connection builder instead of re-parsing the URI here.
  get_http($cluster_dispatcher_host).request(request)
end
submit_job(jar, job) click to toggle source
# File bin/adstax-spark-job-manager, line 141
# Submits a Spark job in cluster mode to the Mesos cluster dispatcher
# ($cluster_dispatcher_host) via POST /v1/submissions/create.
#
# The application receives `--job <job>` followed by this script's
# remaining ARGV. Spark version 1.6.1 and the Elasticsearch connector
# settings (localhost:49200) are hard-coded.
#
# @param jar [String] application jar URL, sent as 'appResource'
# @param job [String] job name forwarded to MAIN_CLASS as `--job <job>`
# @return [Net::HTTPResponse] the raw dispatcher response
def submit_job(jar, job)
  payload = {
    'action' => 'CreateSubmissionRequest',
    'appArgs' => ['--job', job] + ARGV,
    'appResource' => jar,
    'mainClass' => MAIN_CLASS,
    'clientSparkVersion' => '1.6.1',
    'environmentVariables' => {
      'SPARK_SCALA_VERSION' => SPARK_SCALA_VERSION
    },
    'sparkProperties' => {
      # NOTE(review): uses $cli_args[:jar], not the +jar+ parameter; these
      # presumably match at the call site — confirm.
      'spark.jars' => "#{ELASTICSEARCH_SPARK_JAR},#{$cli_args[:jar]}",
      'spark.driver.supervise' => 'false',
      'spark.app.name' => MAIN_CLASS,
      'spark.es.port' => '49200',
      'spark.es.nodes' => 'localhost',
      'spark.submit.deployMode' => 'cluster',
      'spark.mesos.coarse' => 'false',
      'spark.master' => "mesos://spark-cluster-dispatcher.#{$cli_args[:host_suffix]}:7077",
      'spark.executor.uri' => SPARK_EXECUTOR_URI
    }
  }
  # Pass the headers hash positionally (the old `initheader = {...}` only
  # created an unused local variable).
  request = Net::HTTP::Post.new('/v1/submissions/create', { 'Content-Type' => 'application/json' })
  request.body = payload.to_json
  get_http($cluster_dispatcher_host).request(request)
end
tail_file(file, output_method = Proc.new { |line| puts line }) click to toggle source
# File bin/adstax-spark-job-manager, line 119
# Starts a background thread that follows +file+ by path.
#
# Each line goes to +output_method+, which defaults to printing it. The
# thread opens its own handle, starts 10 lines from the end and polls
# every second. `File::Tail` is not stdlib; presumably it is provided by
# the file-tail gem required elsewhere in this script.
#
# @param file [#path] file to follow (a Tempfile in this script)
# @param output_method [#call] receives each line read
# @return [Thread] the tailing thread; it runs until killed or the process exits
def tail_file(file, output_method = Proc.new { |line| puts line })
  Thread.start do
    File.open(file.path) do |tailed|
      tailed.extend(File::Tail)
      tailed.interval = 1
      tailed.backward(10)
      begin
        tailed.tail do |line|
          output_method.call(line)
        end
      rescue Interrupt
        # NOTE(review): MRI delivers SIGINT's Interrupt to the main thread,
        # so this rescue is likely never reached from here.
        exit 1
      end
    end
  end
end
warn_missing(name) click to toggle source
# File bin/adstax-spark-job-manager, line 77
# Aborts the CLI because a mandatory argument was not supplied.
#
# Writes "Missing required argument: <name>" to standard output (not
# stderr) and terminates the process with exit status 1; never returns.
#
# @param name [#to_s] the missing argument's name
def warn_missing(name)
  $stdout.puts("Missing required argument: #{name}")
  Kernel.exit(1)
end