class Hodor::Hdfs

HDFS Api wrapper

Public Instance Methods

env() click to toggle source
# File lib/hodor/api/hdfs.rb, line 11
def env
  Hodor::Environment.instance
end
get_file(file, options = {}) click to toggle source

get

Gets a file from HDFS and copies it to a local file
# File lib/hodor/api/hdfs.rb, line 197
def get_file(file, options = {})
  disc_path = env.path_on_disc(file)
  hdfs_path = path_on_hdfs(file)
  git_path = env.path_on_github(file)
  dest_path = "#{file}.hdfs_copy"

  logger.info "\tgetting '#{git_path}'. Writing to '#{dest_path}'."

  get_script = %Q["rm -f #{dest_path}; HADOOP_USER_NAME=#{user} hadoop fs -get #{hdfs_path} #{dest_path}"]
  env.ssh get_script, echo: true, echo_cmd: true
  if options[:clobber]
    FileUtils.rm_f dest_path
  end
  env.run_local %Q[scp #{env.ssh_user}@#{env[:ssh_host]}:#{dest_path} .],
    echo: true, echo_cmd: true
rescue StandardError => ex
  raise FailedToGetFile.new ex,
    msg: "Unable to get file from HDFS.",
    ssh_user: env.ssh_user,
    path_on_disc: disc_path,
    path_on_github: git_path,
    path_on_hdfs: hdfs_path,
    dest_file: dest_file
end
hdfs_root() click to toggle source
# File lib/hodor/api/hdfs.rb, line 19
def hdfs_root
  env.settings[:hdfs_root]
end
logger() click to toggle source
# File lib/hodor/api/hdfs.rb, line 15
def logger
  env.logger
end
ls() click to toggle source
# File lib/hodor/api/hdfs.rb, line 75
def ls
  dest_path = path_on_hdfs(".")
  ls_script = %Q[HADOOP_USER_NAME=#{user} hadoop fs -ls #{dest_path}]
  env.ssh ls_script, echo:true
rescue StandardError => ex
  raise FailedToRemovePath.new ex,
    msg: "Unable to list HDFS path.",
    ssh_user: env.ssh_user,
    path_to_list: dest_path
end
path_on_hdfs(file) click to toggle source
# File lib/hodor/api/hdfs.rb, line 27
def path_on_hdfs(file)
  git_path = env.path_on_github(file)
  "#{hdfs_root}/#{git_path}".sub(/\/\/\//, '/').sub(/\/\//, '/').sub(/\/\.\//, '/').sub(/\/\.$/, '')
end
put_dir(path, options) click to toggle source
# File lib/hodor/api/hdfs.rb, line 132
def put_dir(path, options)
  if env.dryrun? and env.verbose?
    logger.info ""
    logger.info "        ********************* Dry Run *********************"
    logger.info ""
  end

  disc_path = env.path_on_disc(path)
  git_path = env.path_on_github(path)
  hdfs_path = path_on_hdfs(path)

  sync_file = "#{disc_path}/.hdfs-#{target}.sync"

  logger.info "Deploying: #{git_path}" unless env.silent?

  fail "Path '#{disc_path}' not found." unless File.exists?(disc_path)
  fail "Path '#{disc_path}' exists but is not a directory." unless File.directory?(disc_path)

  if env.clean?
    logger.info "  cleaning: #{git_path}"
    FileUtils.rm_f sync_file unless env.dryrun?
    rm_rf(git_path)
    clean_done = true
  end

  fargs = if sync_file && File.exists?(sync_file) && !env.clean?
            "-newer '#{sync_file}'"
          else
            ""
          end
  fargs << " -maxdepth #{options[:maxdepth]}" unless options[:maxdepth].nil?
  mod_files = env.run_local %Q[find #{disc_path} #{fargs} -type f]
  mod_files.split("\n").each { |file|
    basename = File.basename(file)
    next if basename.start_with?('job.properties') ||
      basename.eql?("run.properties") ||
      basename.eql?(".DS_Store") ||
      basename.eql?(".bak") ||
      basename.eql?(".tmp") ||
      basename.eql?(".hdfs") ||
      basename.eql?("Rakefile") ||
      basename.end_with?(".sync") ||
      file.include?("migrations/") ||
      file.include?(".bak/") ||
      file.include?(".tmp/")
    put_file(file, already_cleaned: clean_done)
  }
rescue StandardError => ex
  raise FailedToPutDir.new ex,
    msg: "Unable to write directory to HDFS.",
    ssh_user: env.ssh_user,
    path_on_disc: disc_path,
    path_on_github: git_path,
    path_on_hdfs: hdfs_path,
    sync_file: sync_file,
    max_depth: options[:maxdepth],
    clean: env.clean? ? "true" : "false"
else
  env.run_local %Q[touch '#{sync_file}'] unless env.dryrun?
end
put_file(file, options = {}) click to toggle source

put_file

Puts a local file on HDFS, preserving path and replacing if necessary. Files
with .erb extensions are ERB expanded before deployment.
# File lib/hodor/api/hdfs.rb, line 91
def put_file(file, options = {})

  disc_path = env.path_on_disc(file)
  hdfs_path = path_on_hdfs(file)
  git_path = env.path_on_github(file)

  raise "File '#{disc_path}' not found." if !File.exists?(disc_path)

  logger.info "\tdeploying '#{git_path}'"

  src_file = file
  if disc_path.end_with?('.erb')
    erb_expanded = env.erb_load(disc_path)
    src_file = "/tmp/#{File.basename(disc_path.sub(/\.erb$/,''))}"
    hdfs_path.sub!(/\.erb$/, '')
    puts "ends with erb srcfile = #{src_file}"
    File.open(src_file, 'w') { |f| f.write(erb_expanded) }
  end

  raise "File '#{src_file}' not found." if !File.exists?(src_file)

  put_script = "HADOOP_USER_NAME=#{user} hadoop fs -put - #{hdfs_path}"
  unless options[:already_cleaned]
    rm_script = "HADOOP_USER_NAME=#{user} hadoop fs -rm -f #{hdfs_path}; "
    put_script = rm_script + put_script
  end

  env.run_local %Q[cat #{src_file} | ssh #{env.ssh_addr} "#{put_script}"],
    echo: true, echo_cmd: true
rescue StandardError => ex
  raise FailedToPutFile.new ex,
    msg: "Unable to write file to HDFS.",
    ssh_user: env.ssh_user,
    path_on_disc: disc_path,
    path_on_github: git_path,
    path_on_hdfs: hdfs_path,
    src_file: src_file
end
pwd() click to toggle source
# File lib/hodor/api/hdfs.rb, line 23
def pwd
  "#{hdfs_root}#{env.pwd}"
end
rm(path) click to toggle source
# File lib/hodor/api/hdfs.rb, line 42
def rm(path)
  dest_path = path_on_hdfs(path||".")
  rm_path_script = %Q[HADOOP_USER_NAME=#{user} hadoop fs -rm -skipTrash #{dest_path}]
  env.ssh rm_path_script
rescue StandardError => ex
  raise FailedToRemovePath.new ex,
    msg: "Unable to remove HDFS path.",
    ssh_user: env.ssh_user,
    path_to_remove: dest_path
end
rm_f(path) click to toggle source
# File lib/hodor/api/hdfs.rb, line 53
def rm_f(path)
  dest_path = path_on_hdfs(path||".")
  rm_path_script = %Q[HADOOP_USER_NAME=#{user} hadoop fs -rm -f -skipTrash #{dest_path}]
  env.ssh rm_path_script
rescue StandardError => ex
  raise FailedToRemovePath.new ex,
    msg: "Unable to remove HDFS path.",
    ssh_user: env.ssh_user,
    path_to_remove: dest_path
end
rm_rf(path) click to toggle source
# File lib/hodor/api/hdfs.rb, line 64
def rm_rf(path)
  hdfs_path = path_on_hdfs(path||".")
  rm_path_script = %Q[HADOOP_USER_NAME=#{user} hadoop fs -rm -f -R -skipTrash #{hdfs_path}]
  env.ssh rm_path_script
rescue StandardError => ex
  raise FailedToRemovePath.new ex,
    msg: "Unable to remove HDFS path.",
    ssh_user: env.ssh_user,
    path_to_remove: dest_path
end
target() click to toggle source
# File lib/hodor/api/hdfs.rb, line 36
def target
  env.settings[:target]
end
user() click to toggle source
# File lib/hodor/api/hdfs.rb, line 32
def user
  env.settings[:hdfs_user]
end