class Wukong::Hadoop::HadoopRunner

The HadoopRunner class examines its arguments and builds the command lines it executes to produce the desired behavior.

The HadoopRunner introspects on its arguments to guess the processors to use as the mapper and reducer of a map/reduce job, when these are not given explicitly. It also decides whether to run the job in local or Hadoop mode. These decisions determine the command it ultimately executes.

Public Instance Methods

command_prefix()

The prefix to insert before all invocations of the wu-local runner.

@return [String]

# File lib/wukong-hadoop/runner.rb, line 171
def command_prefix
  settings[:command_prefix]
end
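
For illustration, here is a minimal sketch of joining such a prefix (for example `bundle exec`) onto a wu-local invocation. The helper `wu_local_invocation` is hypothetical. This page does not show how the runner actually assembles its command lines.

```ruby
# Hypothetical helper: join an optional prefix (e.g. the value of
# settings[:command_prefix]) with a wu-local invocation for one processor.
def wu_local_invocation(command_prefix, processor)
  # compact drops a nil prefix; reject drops an empty-string prefix
  [command_prefix, "wu-local", processor].compact.reject(&:empty?).join(" ")
end

puts wu_local_invocation("bundle exec", "tokenizer") # prints: bundle exec wu-local tokenizer
puts wu_local_invocation(nil, "tokenizer")           # prints: wu-local tokenizer
```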

file_is_processor?(path)

Is a processor named after the file at `path` (its basename without `.rb`) registered?

@param [String] path
@return [true, false]

# File lib/wukong-hadoop/runner.rb, line 162
def file_is_processor?(path)
  return false unless path
  processor_registered?(processor_name_from_file(path))
end
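
Below is a self-contained sketch of the same check. It uses a plain `Set` of symbols as a stand-in for `Wukong.registry`. The `REGISTERED` constant and the example paths are hypothetical.

```ruby
require 'set'

# Stand-in for Wukong.registry (assumption): registered processor names as symbols.
REGISTERED = Set[:tokenizer, :counter]

# Same logic as processor_name_from_file: basename without the '.rb' suffix.
def processor_name_from_file(path)
  File.basename(path, '.rb')
end

# Same normalization as processor_registered?, checked against the stub set.
def processor_registered?(name)
  REGISTERED.include?(name.to_s.to_sym)
end

# Mirrors file_is_processor?: a nil path is never a processor.
def file_is_processor?(path)
  return false unless path
  processor_registered?(processor_name_from_file(path))
end

p file_is_processor?("jobs/tokenizer.rb") # prints: true
p file_is_processor?("jobs/my_job.rb")    # prints: false
p file_is_processor?(nil)                 # prints: false
```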

mode()

What mode is this runner in?

@return [:hadoop, :local]

# File lib/wukong-hadoop/runner.rb, line 124
def mode
  settings[:mode].to_s == 'local' ? :local : :hadoop
end
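
`#mode` compares the string form of `settings[:mode]` against `'local'`. Any other value falls back to `:hadoop`, including an unset mode or a differently cased value. The standalone `mode_for` below is a hypothetical helper that applies the same expression to a plain value:

```ruby
# Hypothetical helper: the same expression as #mode, applied to a plain value
# instead of settings[:mode].
def mode_for(value)
  value.to_s == 'local' ? :local : :hadoop
end

p mode_for('local')  # prints: :local
p mode_for(:local)   # prints: :local  (symbols work via to_s)
p mode_for(nil)      # prints: :hadoop (unset mode)
p mode_for('LOCAL')  # prints: :hadoop (comparison is case-sensitive)
```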

non_wukong_hadoop_params_string()

Returns parameters to pass to an invocation of wu-local.

Parameters defined by Wukong-Hadoop itself, such as --reduce_tasks, are interpreted here and not passed on. All others are passed through as --name=value pairs, with each value shell-escaped.

@return [String]

# File lib/wukong-hadoop/runner.rb, line 183
def non_wukong_hadoop_params_string
  params_to_pass.reject do |param, val|
    params_to_pass.definition_of(param, :wukong_hadoop)
  end.map do |param,val|
    "--#{param}=#{Shellwords.escape(val.to_s)}"
  end.join(" ")
end
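
The sketch below applies the same filtering to a plain `Hash`. It assumes a hypothetical fixed list, `WUKONG_HADOOP_PARAMS`, in place of the settings-definition lookup `params_to_pass.definition_of(param, :wukong_hadoop)`. The parameter names `limit` and `label` are likewise made up for the example.

```ruby
require 'shellwords'

# Hypothetical: parameters Wukong-Hadoop would consume itself. The real method
# asks each parameter's definition instead of consulting a fixed list.
WUKONG_HADOOP_PARAMS = [:reduce_tasks]

def pass_through_params(params)
  params.reject { |param, _val| WUKONG_HADOOP_PARAMS.include?(param) }
        .map { |param, val| "--#{param}=#{Shellwords.escape(val.to_s)}" }
        .join(" ")
end

puts pass_through_params({ reduce_tasks: 3, limit: 10, label: "a b" })
# prints: --limit=10 --label=a\ b
```

Shell-escaping each value keeps arguments with spaces intact when the resulting string is spliced into a command line.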

processor_name_from_file(path)

Return the guessed name of a processor at the given `path`.

@param [String] path
@return [String]

# File lib/wukong-hadoop/runner.rb, line 154
def processor_name_from_file(path)
  File.basename(path, '.rb')
end
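
Here are some worked examples of the guessed name. `File.basename` drops the directory part. It strips the `.rb` suffix only when the path actually ends in it, so other extensions survive:

```ruby
# File.basename(path, '.rb') removes the directory and a trailing '.rb' only.
p File.basename("lib/jobs/tokenizer.rb", '.rb') # prints: "tokenizer"
p File.basename("tokenizer", '.rb')             # prints: "tokenizer"
p File.basename("jobs/tokenizer.py", '.rb')     # prints: "tokenizer.py"
```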

processor_registered?(name)

Is there a processor registered with the given `name`?

@param [#to_s] name
@return [true, false]

# File lib/wukong-hadoop/runner.rb, line 146
def processor_registered? name
  Wukong.registry.registered?(name.to_s.to_sym)
end
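
This minimal sketch shows why the `to_s.to_sym` conversion matters. The stand-in registry below stores names as symbols, so normalizing lets callers pass either a `String` or a `Symbol`. `FakeRegistry` is hypothetical and is not Wukong's registry class.

```ruby
require 'set'

# Hypothetical stand-in for Wukong.registry: stores processor names as symbols.
class FakeRegistry
  def initialize(*names)
    @names = Set.new(names.map(&:to_sym))
  end

  def registered?(name)
    @names.include?(name)
  end
end

REGISTRY = FakeRegistry.new(:tokenizer)

# Same normalization as processor_registered?: accept anything with #to_s.
def processor_registered?(name)
  REGISTRY.registered?(name.to_s.to_sym)
end

p processor_registered?("tokenizer") # prints: true
p processor_registered?(:tokenizer)  # prints: true
p processor_registered?("counter")   # prints: false
```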

run()

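Run this command. In local mode, executes the local command line. In Hadoop mode, first removes the output path if the `rm` or `overwrite` setting is set, then executes the Hadoop command line.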

# File lib/wukong-hadoop/runner.rb, line 109
def run
  if mode == :local
    log.info "Launching local!"
    execute_command!(local_commandline)
  else
    remove_output_path if settings[:rm] || settings[:overwrite]
    hadoop_commandline
    log.info "Launching Hadoop!"
    execute_command!(hadoop_commandline)
  end
end
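
The branching in `#run` can be summarized as a plan of steps. `plan_run` below is a hypothetical helper that returns those steps instead of executing any commands. Note that `rm` and `overwrite` only take effect in Hadoop mode.

```ruby
# Hypothetical helper: the steps #run would take for the given settings.
def plan_run(settings)
  if settings[:mode].to_s == 'local'
    [:launch_local]
  else
    steps = []
    # rm or overwrite clears the output path before a Hadoop launch
    steps << :remove_output_path if settings[:rm] || settings[:overwrite]
    steps << :launch_hadoop
  end
end

p plan_run({ mode: 'local', rm: true })        # prints: [:launch_local]
p plan_run({ mode: 'hadoop', overwrite: true }) # prints: [:remove_output_path, :launch_hadoop]
p plan_run({})                                  # prints: [:launch_hadoop]
```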

separate_map_and_reduce_args?()

Were the mapper and reducer named by two separate arguments?

@return [true, false]

# File lib/wukong-hadoop/runner.rb, line 138
def separate_map_and_reduce_args?
  args.size == 2
end

single_job_arg?()

Were the mapper and/or reducer named by a single argument?

@return [true, false]

# File lib/wukong-hadoop/runner.rb, line 131
def single_job_arg?
  args.size == 1
end
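
The two predicates `single_job_arg?` and `separate_map_and_reduce_args?` partition jobs by argument count. A hypothetical classifier that combines them might look like this:

```ruby
# Hypothetical helper combining single_job_arg? and separate_map_and_reduce_args?.
def job_arg_shape(args)
  case args.size
  when 1 then :single_job_arg          # one argument names the job
  when 2 then :separate_map_and_reduce # first names the mapper, second the reducer
  else        :other                   # zero args, or more than two (rejected by #validate)
  end
end

p job_arg_shape(["my_job.rb"])                # prints: :single_job_arg
p job_arg_shape(["mapper.rb", "reducer.rb"])  # prints: :separate_map_and_reduce
p job_arg_shape([])                           # prints: :other
```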

validate()

Validate that at most two arguments were given and, in Hadoop mode, that explicit --input and --output paths were given.

@raise [Wukong::Error] if validations fail
@return [true]

# File lib/wukong-hadoop/runner.rb, line 100
def validate
  raise Error.new("Cannot provide more than two arguments") if args.length > 2
  if mode == :hadoop && (input_paths.nil? || input_paths.empty? || output_path.nil? || output_path.empty?)
    raise Error.new("Explicit --input and --output paths are required to run a job in Hadoop mode.")
  end
  true
end
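
Here is a standalone sketch of the same checks, with the runner's state passed in as arguments. `validate_job` and `ValidationError` are hypothetical stand-ins for `#validate` and `Wukong::Error`.

```ruby
# Hypothetical stand-in for Wukong::Error.
class ValidationError < StandardError; end

# Hypothetical standalone version of #validate.
def validate_job(args, mode, input_paths, output_path)
  raise ValidationError, "Cannot provide more than two arguments" if args.length > 2
  if mode == :hadoop && (input_paths.nil? || input_paths.empty? ||
                         output_path.nil? || output_path.empty?)
    raise ValidationError, "Explicit --input and --output paths are required to run a job in Hadoop mode."
  end
  true
end

p validate_job(["job.rb"], :local, nil, nil) # prints: true (local mode needs no paths)

begin
  validate_job(["job.rb"], :hadoop, "/in", nil)
rescue ValidationError => e
  puts e.message # explains that --input and --output are required
end
```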