class Wukong::Hadoop::HadoopRunner
The Hadoop::Runner class contains the logic to examine arguments and construct the command lines it will execute to create the desired behavior.
The Hadoop::Runner will introspect on its arguments to guess (if not given) the processors to use as mapper and reducer in a map/reduce job. It will also decide whether to run that job in local or Hadoop mode. These decisions result in a command which it will ultimately execute.
Public Instance Methods
The prefix to insert before all invocations of the wu-local runner.
@return [String]
# File lib/wukong-hadoop/runner.rb, line 171
def command_prefix
  settings[:command_prefix]
end
Does the given `path` contain a processor named after itself?
@param [String] path
@return [true, false]
# File lib/wukong-hadoop/runner.rb, line 162
def file_is_processor?(path)
  return false unless path
  processor_registered?(processor_name_from_file(path))
end
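The lookup chain above (path, then guessed name, then registry check) can be sketched in isolation. This is a minimal illustration, not the library's implementation: `file_names_processor?` is a hypothetical name, and the `registry` array of symbols is an assumed stand-in for `Wukong.registry`.

```ruby
# Hypothetical stand-in for file_is_processor?. The registry default is an
# assumed list of processor names, not the real Wukong.registry.
def file_names_processor?(path, registry = [:tokenizer, :counter])
  return false unless path                 # a nil path can never name a processor
  name = File.basename(path, '.rb')        # same guess as processor_name_from_file
  registry.include?(name.to_s.to_sym)      # registry is keyed by symbol
end

file_names_processor?('lib/tokenizer.rb')  # => true
file_names_processor?('lib/unknown.rb')    # => false
file_names_processor?(nil)                 # => false
```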
What mode is this runner in?
@return [:hadoop, :local]
# File lib/wukong-hadoop/runner.rb, line 124
def mode
  settings[:mode].to_s == 'local' ? :local : :hadoop
end
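One consequence of the comparison in `mode` is that anything other than the exact string `local` (after `to_s`) falls through to `:hadoop`, including a missing setting. A small sketch, assuming a plain Hash in place of the runner's `settings` and a hypothetical helper name `resolve_mode`:

```ruby
# Hypothetical helper mirroring the ternary in `mode`; `settings` is a plain Hash here.
def resolve_mode(settings)
  settings[:mode].to_s == 'local' ? :local : :hadoop
end

resolve_mode({ mode: 'local' })  # => :local
resolve_mode({ mode: :local })   # => :local  (symbols are stringified first)
resolve_mode({ mode: 'Local' })  # => :hadoop (the comparison is case-sensitive)
resolve_mode({})                 # => :hadoop (nil.to_s is "")
```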
Returns parameters to pass to an invocation of wu-local.
Parameters like --reduce_tasks, which are relevant to Wukong-Hadoop, will be interpreted and not passed. Others will be passed unmodified.
@return [String]
# File lib/wukong-hadoop/runner.rb, line 183
def non_wukong_hadoop_params_string
  params_to_pass.reject do |param, val|
    params_to_pass.definition_of(param, :wukong_hadoop)
  end.map do |param,val|
    "--#{param}=#{Shellwords.escape(val.to_s)}"
  end.join(" ")
end
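The filter-then-escape step can be tried outside the runner. The sketch below is an approximation under stated assumptions: a plain Hash replaces `params_to_pass`, and the `wukong_hadoop_params` list is a hypothetical substitute for the `definition_of(param, :wukong_hadoop)` lookup. Only `Shellwords.escape` is the real call used by the method.

```ruby
require 'shellwords'

# Hypothetical stand-in for non_wukong_hadoop_params_string.
# `wukong_hadoop_params` is an assumed list of names to withhold from wu-local.
def passthrough_params_string(params, wukong_hadoop_params: [:reduce_tasks])
  params.reject { |param, _val| wukong_hadoop_params.include?(param) }
        .map    { |param, val| "--#{param}=#{Shellwords.escape(val.to_s)}" }
        .join(' ')
end

puts passthrough_params_string({ reduce_tasks: 4, delimiter: 'a b', count: 3 })
# prints: --delimiter=a\ b --count=3
```

Escaping each value keeps whitespace and shell metacharacters intact when the string is later embedded in a command line.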
Return the guessed name of a processor at the given `path`.
@param [String] path
@return [String]
# File lib/wukong-hadoop/runner.rb, line 154
def processor_name_from_file(path)
  File.basename(path, '.rb')
end
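Because the guess relies on `File.basename` with a `.rb` suffix, directories are stripped but any other extension is left in place. A short illustration using only the standard library (the paths are made up for the example):

```ruby
# File.basename drops the directory part and the given suffix, if present.
File.basename('/jobs/word_count.rb', '.rb')  # => "word_count"
File.basename('lib/tokenizer.rb', '.rb')     # => "tokenizer"
File.basename('word_count.py', '.rb')        # => "word_count.py" (suffix does not match)
```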
Is there a processor registered with the given `name`?
@param [#to_s] name
@return [true, false]
# File lib/wukong-hadoop/runner.rb, line 146
def processor_registered? name
  Wukong.registry.registered?(name.to_s.to_sym)
end
Run this command.
# File lib/wukong-hadoop/runner.rb, line 109
def run
  if mode == :local
    log.info "Launching local!"
    execute_command!(local_commandline)
  else
    remove_output_path if settings[:rm] || settings[:overwrite]
    hadoop_commandline
    log.info "Launching Hadoop!"
    execute_command!(hadoop_commandline)
  end
end
Were mapper and/or reducer named by separate arguments?
@return [true, false]
# File lib/wukong-hadoop/runner.rb, line 138
def separate_map_and_reduce_args?
  args.size == 2
end
Were mapper and/or reducer named by a single argument?
@return [true, false]
# File lib/wukong-hadoop/runner.rb, line 131
def single_job_arg?
  args.size == 1
end
Validate that no more than two arguments were given and that explicit input & output arguments were given if we're in Hadoop mode.
@raise [Wukong::Error] if validations fail
@return [true]
# File lib/wukong-hadoop/runner.rb, line 100
def validate
  raise Error.new("Cannot provide more than two arguments") if args.length > 2
  if mode == :hadoop && (input_paths.nil? || input_paths.empty? || output_path.nil? || output_path.empty?)
    raise Error.new("Explicit --input and --output paths are required to run a job in Hadoop mode.")
  end
  true
end