class Object
Constants
- ELASTICSEARCH_SPARK_JAR
- MAIN_CLASS
- SPARK_EXECUTOR_URI
- SPARK_SCALA_VERSION
Public Instance Methods
get_executor(state_response, task_id)
# File bin/adstax-spark-job-manager, line 97
def get_executor(state_response, task_id)
  target_executors = []
  state_response['completed_frameworks'].concat(state_response['frameworks']).each do |framework|
    framework['completed_executors'].concat(framework['executors']).each do |executor|
      target_executors.push(executor) if executor['id'] == task_id
    end
  end
  target_executors[0]
end
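A minimal usage sketch, assuming a parsed /state.json response from a Mesos slave; the slave hostname and the submission id below are placeholders, not values from this script:

require 'json'
require 'net/http'

slave_http = Net::HTTP.new('mesos-slave.example.com', 5051)   # placeholder host
state = JSON.parse(slave_http.request(Net::HTTP::Get.new('/state.json')).body)

executor = get_executor(state, 'driver-20160101000000-0001')  # placeholder submission id
puts executor['directory'] if executor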
get_http(uri)
# File bin/adstax-spark-job-manager, line 82
def get_http(uri)
  uri = URI.parse(uri)
  Net::HTTP.new(uri.host, uri.port)
end
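A short sketch of how this helper is used elsewhere in the script; the Marathon host below is a placeholder:

require 'net/http'
require 'uri'

http = get_http('http://marathon.example.com')          # returns a Net::HTTP instance
response = http.request(Net::HTTP::Get.new('/v2/info'))
puts response.code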
get_task(state_response, task_id)
# File bin/adstax-spark-job-manager, line 87
def get_task(state_response, task_id)
  target_tasks = []
  state_response['completed_frameworks'].concat(state_response['frameworks']).each do |framework|
    framework['completed_tasks'].concat(framework['tasks']).each do |task|
      target_tasks.push(task) if task['id'] == task_id
    end
  end
  target_tasks[0]
end
kill_job(submission_id)
# File bin/adstax-spark-job-manager, line 172
def kill_job(submission_id)
  uri = URI.parse($cluster_dispatcher_host)
  http = Net::HTTP.new(uri.host, uri.port)
  request = Net::HTTP::Post.new("/v1/submissions/kill/#{submission_id}")
  http.request(request)
end
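A usage sketch, assuming $cluster_dispatcher_host points at the Spark cluster dispatcher; the host and submission id below are placeholders (the real script sets the global from its CLI arguments):

require 'net/http'
require 'uri'

$cluster_dispatcher_host = 'http://spark-cluster-dispatcher.example.com:7077'  # placeholder

response = kill_job('driver-20160101000000-0001')  # placeholder submission id
puts response.body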
log_job(submission_id, follow, show_stderr)
# File bin/adstax-spark-job-manager, line 186
def log_job(submission_id, follow, show_stderr)
  status_response = JSON.parse(status_job(submission_id).body)
  if status_response['driverState'] == "NOT_FOUND"
    puts "Unable to find submission with id #{submission_id}"
    exit 1
  end
  # If the driver is still queued, either wait for it to start (when following)
  # or bail out.
  if status_response['driverState'] == "QUEUED"
    puts "Submission with id #{submission_id} is still queued for execution"
    if follow
      print "Waiting for submission with id #{submission_id} to start"
      waiting_thread = Thread.new do
        queued = true
        while queued do
          begin
            sleep 1
            print "."
          rescue Interrupt => e
            exit 1
          end
          res = JSON.parse(status_job(submission_id).body)
          queued = res['driverState'] == "QUEUED"
        end
      end
      waiting_thread.join
      puts ""
    else
      exit 1
    end
  end
  # Ask Marathon for the Mesos leader UI URL, then fetch the cluster state.
  marathon_http = get_http("http://marathon.#{$cli_args[:host_suffix]}")
  marathon_response = marathon_http.request(Net::HTTP::Get.new('/v2/info'))
  unless marathon_response.class.body_permitted?
    puts 'Unable to fetch Mesos leader url from Marathon'
    exit 1
  end
  res = JSON.parse(marathon_response.body)
  mesos_http = get_http(res['marathon_config']['mesos_leader_ui_url'])
  mesos_response = mesos_http.request(Net::HTTP::Get.new('/state.json'))
  unless mesos_response.class.body_permitted?
    puts 'Unable to fetch Mesos status'
    exit 1
  end
  res = JSON.parse(mesos_response.body)
  # Locate the driver task and the slave it ran on.
  target_task = get_task(res, submission_id)
  unless target_task
    puts "Unable to find submission with id #{submission_id} in Mesos. Maybe the submission is too old?"
    exit 1
  end
  slaves = res['slaves']
  slave_id = target_task['slave_id']
  target_slaves = slaves.select do |slave|
    slave['id'] == slave_id
  end
  if target_slaves.empty?
    puts "Unable to find slave with id #{slave_id}"
    exit 1
  end
  if target_slaves.length != 1
    puts "Multiple slaves with id #{slave_id}"
    exit 1
  end
  target_slave = target_slaves[0]
  # Fetch the slave's state to find the executor's sandbox directory.
  slave_http = get_http('http://' + target_slave['hostname'] + ':5051')
  slave_response = slave_http.request(Net::HTTP::Get.new('/state.json'))
  unless slave_response.class.body_permitted?
    puts 'Unable to fetch file from slave'
    exit 1
  end
  res = JSON.parse(slave_response.body)
  target_executor = get_executor(res, submission_id)
  unless target_executor
    puts "Unable to find submission with id #{submission_id} in executor. Maybe the submission is too old?"
    exit 1
  end
  directory = target_executor['directory']
  # Download the sandbox logs into temp files, repeatedly when following.
  stdout_file = Tempfile.new('spark' + submission_id)
  stderr_file = Tempfile.new('spark' + submission_id)
  threads = []
  if follow
    threads.push(Thread.new do
      loop do
        begin
          sleep 1
        rescue Interrupt => e
          exit 1
        end
        mesos_download(slave_http, directory + '/stdout', stdout_file)
        mesos_download(slave_http, directory + '/stderr', stderr_file)
      end
    end)
  else
    mesos_download(slave_http, directory + '/stdout', stdout_file)
    mesos_download(slave_http, directory + '/stderr', stderr_file)
  end
  # Either tail the files until interrupted, or print them once.
  if follow
    threads.push(tail_file(stdout_file))
    threads.push(tail_file(stderr_file, Proc.new { |line| puts line.chomp.red })) if show_stderr
    begin
      threads.each { |thread| thread.join }
    rescue Interrupt => e
      exit 1
    end
  else
    if show_stderr
      stderr_file.rewind
      puts stderr_file.read.chomp.red
    end
    stdout_file.rewind
    puts stdout_file.read
  end
end
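In short, this resolves the submission's driver to its Mesos executor sandbox (dispatcher status, Marathon, Mesos leader, slave state) and then prints or tails the sandbox stdout/stderr. A usage sketch, assuming the script's globals are populated; all values below are placeholders:

# Placeholder values; the real script fills these from its CLI arguments.
$cluster_dispatcher_host = 'http://spark-cluster-dispatcher.example.com:7077'
$cli_args = { host_suffix: 'example.com' }

# Print the driver's stdout once, without following and without stderr.
log_job('driver-20160101000000-0001', false, false)

# Follow both stdout and stderr until interrupted.
log_job('driver-20160101000000-0001', true, true)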
mesos_download(http, remote_file, local_file)
# File bin/adstax-spark-job-manager, line 107
def mesos_download(http, remote_file, local_file)
  params = { path: remote_file }
  encoded_params = URI.encode_www_form(params)
  file_response = http.request(Net::HTTP::Get.new(['/files/download', encoded_params].join('?')))
  unless file_response.class.body_permitted?
    puts 'Unable to fetch file from slave'
    exit 1
  end
  local_file.rewind
  local_file.write(file_response.body)
end
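A sketch of fetching a sandbox file through a slave's /files/download endpoint into a Tempfile; the slave host and submission id are placeholders, and the sandbox directory is taken from the executor entry as log_job does:

require 'json'
require 'net/http'
require 'tempfile'
require 'uri'

slave_http = Net::HTTP.new('mesos-slave.example.com', 5051)   # placeholder host
state = JSON.parse(slave_http.request(Net::HTTP::Get.new('/state.json')).body)
directory = get_executor(state, 'driver-20160101000000-0001')['directory']

local = Tempfile.new('spark-stdout')
mesos_download(slave_http, directory + '/stdout', local)
local.rewind
puts local.read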
status_job(submission_id)
# File bin/adstax-spark-job-manager, line 179
def status_job(submission_id)
  uri = URI.parse($cluster_dispatcher_host)
  http = Net::HTTP.new(uri.host, uri.port)
  request = Net::HTTP::Get.new("/v1/submissions/status/#{submission_id}")
  http.request(request)
end
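A sketch of polling the dispatcher's status endpoint; the states this script acts on are QUEUED and NOT_FOUND. The host and submission id below are placeholders:

require 'json'
require 'net/http'
require 'uri'

$cluster_dispatcher_host = 'http://spark-cluster-dispatcher.example.com:7077'  # placeholder

status = JSON.parse(status_job('driver-20160101000000-0001').body)
puts status['driverState']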
submit_job(jar, job)
# File bin/adstax-spark-job-manager, line 141
def submit_job(jar, job)
  uri = URI.parse($cluster_dispatcher_host)
  http = Net::HTTP.new(uri.host, uri.port)
  payload = {
    'action' => 'CreateSubmissionRequest',
    'appArgs' => ['--job', job].concat(ARGV),
    'appResource' => jar,
    'mainClass' => MAIN_CLASS,
    'clientSparkVersion' => '1.6.1',
    'environmentVariables' => {
      'SPARK_SCALA_VERSION' => SPARK_SCALA_VERSION
    },
    'sparkProperties' => {
      'spark.jars' => "#{ELASTICSEARCH_SPARK_JAR},#{$cli_args[:jar]}",
      'spark.driver.supervise' => 'false',
      'spark.app.name' => MAIN_CLASS,
      'spark.es.port' => '49200',
      'spark.es.nodes' => 'localhost',
      'spark.submit.deployMode' => 'cluster',
      'spark.mesos.coarse' => 'false',
      'spark.master' => "mesos://spark-cluster-dispatcher.#{$cli_args[:host_suffix]}:7077",
      'spark.executor.uri' => SPARK_EXECUTOR_URI
    }
  }.to_json
  request = Net::HTTP::Post.new(
    '/v1/submissions/create',
    initheader = { 'Content-Type' => 'application/json' })
  request.body = payload
  http.request(request)
end
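A sketch of submitting a job through the dispatcher's CreateSubmissionRequest endpoint; MAIN_CLASS, SPARK_SCALA_VERSION, ELASTICSEARCH_SPARK_JAR and SPARK_EXECUTOR_URI come from this file, while the globals, host, jar URL and job name below are placeholders:

require 'json'
require 'net/http'
require 'uri'

# Placeholder values; the real script fills these from its CLI arguments.
$cluster_dispatcher_host = 'http://spark-cluster-dispatcher.example.com:7077'
$cli_args = { host_suffix: 'example.com', jar: 'http://repo.example.com/jobs/my-jobs.jar' }

response = submit_job($cli_args[:jar], 'MyJob')
# The dispatcher's JSON response carries the submission id used by the
# status/kill/log commands.
puts JSON.parse(response.body)['submissionId']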
tail_file(file, output_method = Proc.new { |line| puts line })
# File bin/adstax-spark-job-manager, line 119
def tail_file(file, output_method = Proc.new { |line| puts line })
  Thread.new do
    File.open(file.path) do |log|
      log.extend(File::Tail)
      log.interval = 1
      log.backward(10)
      begin
        log.tail { |line| output_method.call(line) }
      rescue Interrupt => e
        exit 1
      end
    end
  end
end
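A sketch of tailing a local file; File::Tail is assumed to come from the file-tail gem, and the printed prefix is just an illustration of the output_method proc:

require 'tempfile'
require 'file/tail'

file = Tempfile.new('spark-demo')
thread = tail_file(file, Proc.new { |line| puts "log> #{line.chomp}" })

File.open(file.path, 'a') { |f| f.puts 'hello from the driver' }
sleep 2        # give the tailing thread a chance to pick the line up
thread.kill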
warn_missing(name)
# File bin/adstax-spark-job-manager, line 77
def warn_missing(name)
  puts "Missing required argument: #{name}"
  exit 1
end