module Mobilize::Hdfs
Constants
- VERSION
Public Class Methods
copy(source_url, target_url, user_name)
Copies a file from one url to another. The source cluster must be able to issue the copy command to the target cluster.
  # File lib/mobilize-hdfs/handlers/hdfs.rb, line 78
  def Hdfs.copy(source_url, target_url, user_name)
    #convert aliases
    source_hdfs_url = Hdfs.hdfs_url(source_url)
    target_hdfs_url = Hdfs.hdfs_url(target_url)
    #get cluster names
    source_cluster = source_url.split("://").last.split("/").first
    target_cluster = target_url.split("://").last.split("/").first
    #delete target
    delete_command = "dfs -rm '#{target_hdfs_url}'"
    Hadoop.run(target_cluster,delete_command,user_name)
    #copy source to target
    copy_command = "dfs -cp '#{source_hdfs_url}' '#{target_hdfs_url}'"
    response = Hadoop.run(source_cluster,copy_command,user_name)
    if response['exit_code'] != 0
      raise "Unable to copy #{source_url} to #{target_url} with error: #{response['stderr']}"
    else
      return target_url
    end
  end
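A hypothetical invocation, assuming clusters aliased alpha and beta are configured in Hadoop.config and a user etl_user may issue copy commands across both (all names illustrative):

  # returns the target url on success, raises on a nonzero exit code
  Hdfs.copy('hdfs://alpha/user/etl_user/data.tsv',
            'hdfs://beta/user/etl_user/data.tsv',
            'etl_user')
  #=> "hdfs://beta/user/etl_user/data.tsv"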
file_size(url,user_name)
Returns the size in bytes for an Hdfs file.
  # File lib/mobilize-hdfs/handlers/hdfs.rb, line 23
  def Hdfs.file_size(url,user_name)
    cluster = url.split("://").last.split("/").first
    hdfs_url = Hdfs.hdfs_url(url)
    response = Hadoop.run(cluster, "dfs -du '#{hdfs_url}'", user_name)
    if response['exit_code'] != 0
      raise "Unable to get file size for #{url} with error: #{response['stderr']}"
    else
      #parse out response
      return response['stdout'].split("\n")[1].split(" ")[1].to_i
    end
  end
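A minimal sketch of a call, assuming a configured cluster alias alpha (path and user are illustrative):

  Hdfs.file_size('hdfs://alpha/user/etl_user/data.tsv','etl_user')
  #=> 1048576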
hdfs_url(url)
Replaces the cluster alias with a proper namenode path.
  # File lib/mobilize-hdfs/handlers/hdfs.rb, line 10
  def Hdfs.hdfs_url(url)
    cluster = url.split("hdfs://").last.split("/").first
    #replace first instance
    url.sub("hdfs://#{cluster}",Hdfs.root(cluster))
  end
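For example, assuming alpha is a cluster alias whose configured namenode is nn1.example.com on port 8020 (hypothetical values):

  Hdfs.hdfs_url('hdfs://alpha/user/etl_user/data.tsv')
  #=> "hdfs://nn1.example.com:8020/user/etl_user/data.tsv"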
home_dir()
  # File lib/mobilize-hdfs.rb, line 6
  def Hdfs.home_dir
    File.expand_path('..',File.dirname(__FILE__))
  end
path_to_dst(path,stage_path,gdrive_slot)
Converts a source or target path to a dst (Dataset) in the context of the handler and stage.
  # File lib/mobilize-hdfs/handlers/hdfs.rb, line 99
  def Hdfs.path_to_dst(path,stage_path,gdrive_slot)
    has_handler = true if path.index("://")
    s = Stage.where(:path=>stage_path).first
    params = s.params
    target_path = params['target']
    is_target = true if path == target_path
    red_path = path.split("://").last
    cluster = red_path.split("/").first
    #if the user has a handler, is specifying a target,
    #has more than 1 slash,
    #or their first path node is a cluster name,
    #assume it's an hdfs pointer
    if is_target or has_handler or Hadoop.clusters.include?(cluster) or red_path.split("/").length>2
      user_name = Hdfs.user_name_by_stage_path(stage_path)
      hdfs_url = Hdfs.url_by_path(red_path,user_name,is_target)
      return Dataset.find_or_create_by_url(hdfs_url)
    end
    #otherwise, use ssh convention
    return Ssh.path_to_dst(path,stage_path,gdrive_slot)
  end
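A sketch of both branches, assuming stage_path names an existing Stage and alpha is a configured cluster (all values illustrative):

  # first path node is a known cluster, so an hdfs Dataset is returned
  Hdfs.path_to_dst('alpha/user/etl_user/data.tsv',stage_path,gdrive_slot)
  #=> Dataset with url "hdfs://alpha/user/etl_user/data.tsv"
  # a bare single-node path falls through to the ssh convention
  Hdfs.path_to_dst('some_file.txt',stage_path,gdrive_slot)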
read_by_dataset_path(dst_path,user_name,*args)
  # File lib/mobilize-hdfs/handlers/hdfs.rb, line 35
  def Hdfs.read_by_dataset_path(dst_path,user_name,*args)
    cluster = dst_path.split("/").first
    url = Hdfs.url_by_path(dst_path,user_name)
    #make sure file is not too big
    if Hdfs.file_size(url,user_name) >= Hadoop.read_limit
      raise "Hadoop read limit reached -- please reduce query size"
    end
    hdfs_url = Hdfs.hdfs_url(url)
    #need to direct stderr to dev null since hdfs throws errors at being headed off
    read_command = "dfs -cat '#{hdfs_url}'"
    response = Hadoop.run(cluster,read_command,user_name)
    if response['exit_code'] != 0
      raise "Unable to read from #{url} with error: #{response['stderr']}"
    else
      return response['stdout']
    end
  end
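A hedged example, assuming alpha is a configured cluster and the file is under Hadoop.read_limit (names illustrative):

  # note dst_path leads with the cluster name, not hdfs://
  Hdfs.read_by_dataset_path('alpha/user/etl_user/data.tsv','etl_user')
  #=> "col_a\tcol_b\n..."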
root(cluster)
Returns the hdfs path to the root of the cluster.
  # File lib/mobilize-hdfs/handlers/hdfs.rb, line 4
  def Hdfs.root(cluster)
    namenode = Hadoop.config['clusters'][cluster]['namenode']
    "hdfs://#{namenode['name']}:#{namenode['port']}"
  end
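Assuming a Hadoop.config entry like the following (host and port are hypothetical):

  # clusters:
  #   alpha:
  #     namenode:
  #       name: nn1.example.com
  #       port: 8020
  Hdfs.root('alpha')
  #=> "hdfs://nn1.example.com:8020"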
run(cluster,command,user)
  # File lib/mobilize-hdfs/handlers/hdfs.rb, line 16
  def Hdfs.run(cluster,command,user)
    command = ["-",command].join unless command.starts_with?("-")
    command = "dfs -fs #{Hdfs.root(cluster)}/ #{command}"
    Hadoop.run(cluster,command,user)
  end
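Because the leading dash is added when missing and the namenode is filled in via -fs, these two calls are equivalent (cluster, namenode, and user are the hypothetical values from the root example above):

  Hdfs.run('alpha','ls /user/etl_user','etl_user')
  Hdfs.run('alpha','-ls /user/etl_user','etl_user')
  # both execute: dfs -fs hdfs://nn1.example.com:8020/ -ls /user/etl_user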
url_by_path(path,user_name,is_target=false)
  # File lib/mobilize-hdfs/handlers/hdfs.rb, line 120
  def Hdfs.url_by_path(path,user_name,is_target=false)
    cluster = path.split("/").first.to_s
    if Hadoop.clusters.include?(cluster)
      #cut node out of path
      path = "/" + path.split("/")[1..-1].join("/")
    else
      cluster = Hadoop.default_cluster
      path = path.starts_with?("/") ? path : "/#{path}"
    end
    url = "hdfs://#{cluster}#{path}"
    return url
  end
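Two illustrative calls, assuming alpha is in Hadoop.clusters and beta is the default cluster (all names hypothetical):

  # a known cluster in the first path node is honored
  Hdfs.url_by_path('alpha/user/etl_user/data.tsv','etl_user')
  #=> "hdfs://alpha/user/etl_user/data.tsv"
  # an unrecognized first node falls back to the default cluster
  Hdfs.url_by_path('user/etl_user/data.tsv','etl_user')
  #=> "hdfs://beta/user/etl_user/data.tsv"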
user_name_by_stage_path(stage_path,cluster=nil)
  # File lib/mobilize-hdfs/handlers/hdfs.rb, line 133
  def Hdfs.user_name_by_stage_path(stage_path,cluster=nil)
    s = Stage.where(:path=>stage_path).first
    u = s.job.runner.user
    user_name = s.params['user']
    cluster ||= s.params['cluster']
    cluster = Hadoop.default_cluster unless Hadoop.clusters.include?(cluster)
    node = Hadoop.gateway_node(cluster)
    if user_name and !Ssh.sudoers(node).include?(u.name)
      raise "#{u.name} does not have su permissions for node #{node}"
    elsif user_name.nil?
      user_name = u.name
    end
    return user_name
  end
write(cluster,hdfs_url,string,user_name)
  # File lib/mobilize-hdfs/handlers/hdfs.rb, line 66
  def Hdfs.write(cluster,hdfs_url,string,user_name)
    file_hash = {'file.txt'=>string}
    #make sure path is clear
    delete_command = "dfs -rm '#{hdfs_url}'"
    Hadoop.run(cluster,delete_command,user_name)
    write_command = "dfs -copyFromLocal file.txt '#{hdfs_url}'"
    response = Hadoop.run(cluster,write_command,user_name,file_hash)
    response
  end
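A minimal sketch, assuming the full namenode url has already been resolved (e.g. via Hdfs.hdfs_url); all names are illustrative:

  # the string is shipped as file.txt alongside the command, then copied in
  response = Hdfs.write('alpha',
                        'hdfs://nn1.example.com:8020/user/etl_user/out.txt',
                        "a\tb\n",
                        'etl_user')
  response['exit_code'] #=> 0 on success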
write_by_dataset_path(dst_path,string,user_name)
Writes a string directly to hdfs.
  # File lib/mobilize-hdfs/handlers/hdfs.rb, line 54
  def Hdfs.write_by_dataset_path(dst_path,string,user_name)
    cluster = dst_path.split("/").first
    url = Hdfs.url_by_path(dst_path,user_name)
    hdfs_url = Hdfs.hdfs_url(url)
    response = Hdfs.write(cluster,hdfs_url,string,user_name)
    if response['exit_code'] != 0
      raise "Unable to write to #{url} with error: #{response['stderr']}"
    else
      return response
    end
  end
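A hypothetical call, using the cluster-prefixed dst_path convention (names illustrative):

  Hdfs.write_by_dataset_path('alpha/user/etl_user/out.txt',"a\tb\n",'etl_user')
  #=> the Hadoop.run response hash; raises if 'exit_code' is nonzero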
write_by_stage_path(stage_path)
  # File lib/mobilize-hdfs/handlers/hdfs.rb, line 148
  def Hdfs.write_by_stage_path(stage_path)
    gdrive_slot = Gdrive.slot_worker_by_path(stage_path)
    #return blank response if there are no slots available
    return nil unless gdrive_slot
    s = Stage.where(:path=>stage_path).first
    source = s.sources(gdrive_slot).first
    Gdrive.unslot_worker_by_path(stage_path)
    target = s.target
    cluster = target.url.split("://").last.split("/").first
    user_name = Hdfs.user_name_by_stage_path(stage_path,cluster)
    stdout = if source.handler == 'hdfs'
               Hdfs.copy(source.url,target.url,user_name)
             elsif ["gsheet","gfile","ssh"].include?(source.handler)
               in_string = source.read(user_name,gdrive_slot)
               Dataset.write_by_url(target.url, in_string, user_name)
             end
    return {'out_str'=>stdout, 'signal' => 0}
  end
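A sketch of a successful run's return value, assuming stage_path names a Stage whose target is an hdfs url (value illustrative):

  Hdfs.write_by_stage_path(stage_path)
  #=> {'out_str'=>"hdfs://alpha/user/etl_user/out.txt", 'signal'=>0}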