package org.apache.spark.api.ruby
import java.io.{File, FileOutputStream, InputStreamReader, BufferedReader}
import scala.collection.JavaConversions._
import org.apache.spark.{SparkEnv, Logging} import org.apache.spark.util._
/* =================================================================================================
* class FileCommand * ================================================================================================= * * Save command to file and than execute him because from Scala you cannot simply run * something like "bash --norc -i -c 'source .zshrc; ruby master.rb'" */
class FileCommand(command: String) extends Logging {
var pb: ProcessBuilder = null var file: File = null // Command is complete. def this(command: String, env: SparkEnv) = { this(command) create(env) } // Template must contains %s which will be replaced for command def this(template: String, command: String, env: SparkEnv, envVars: Map[String, String]) = { this(template.format(command), env) setEnvVars(envVars) } private def create(env: SparkEnv) { val dir = new File(env.sparkFilesDir) val ext = if(Utils.isWindows) ".cmd" else ".sh" val shell = if(Utils.isWindows) "cmd" else "bash" file = File.createTempFile("command", ext, dir) val out = new FileOutputStream(file) out.write(command.getBytes) out.close logInfo(s"New FileCommand at ${file.getAbsolutePath}") pb = new ProcessBuilder(shell, file.getAbsolutePath) } def setEnvVars(vars: Map[String, String]) { pb.environment().putAll(vars) } def run = { new ExecutedFileCommand(pb.start) }
}
/* =================================================================================================
* class ExecutedFileCommand * ================================================================================================= * * Represent process executed from file. */
class ExecutedFileCommand(process: Process) {
var reader: BufferedReader = null def readLine = { openInput reader.readLine.toString.trim } def openInput { if(reader != null){ return } val input = new InputStreamReader(process.getInputStream) reader = new BufferedReader(input) } // Delegation def destroy = process.destroy def getInputStream = process.getInputStream def getErrorStream = process.getErrorStream
}