class WebHDFS::ClientV1
Constants
- KNOWN_ERRORS
- OPT_TABLE
This hash table holds command options.
- REDIRECTED_OPERATIONS
- SSL_VERIFY_MODES
Attributes
doas[RW]
host[RW]
http_headers[RW]
httpfs_mode[RW]
kerberos[RW]
kerberos_delegation_token[RW]
kerberos_keytab[RW]
open_timeout[RW]
port[RW]
proxy_address[RW]
proxy_pass[RW]
proxy_port[RW]
proxy_user[RW]
read_timeout[RW]
retry_interval[RW]
retry_known_errors[RW]
retry_times[RW]
ssl[RW]
ssl_ca_file[RW]
ssl_cert[RW]
ssl_key[RW]
ssl_verify_mode[R]
ssl_version[RW]
username[RW]
Public Class Methods
new(host='localhost', port=50070, username=nil, doas=nil, proxy_address=nil, proxy_port=nil, http_headers={}, renew_kerberos_delegation_token_time_hour=nil)
click to toggle source
# File lib/webhdfs/client_v1.rb, line 41
#
# Build a client for the WebHDFS (or HttpFS) endpoint at +host+:+port+.
#
# Only the connection target, identity, proxy, extra headers and the token
# renewal interval come from the arguments; every other setting starts from
# a fixed default and is meant to be changed through the attribute writers.
#
# @param host [String] service host name
# @param port [Integer] service HTTP port
# @param username [String, nil] sent as the +user.name+ query parameter
# @param doas [String, nil] sent as the +doas+ query parameter
# @param proxy_address [String, nil] HTTP proxy host
# @param proxy_port [Integer, nil] HTTP proxy port
# @param http_headers [Hash] headers merged into every non-Kerberos request
# @param renew_kerberos_delegation_token_time_hour [Numeric, nil] hours after
#   which the cached delegation token is considered stale
def initialize(host='localhost', port=50070, username=nil, doas=nil, proxy_address=nil, proxy_port=nil, http_headers={}, renew_kerberos_delegation_token_time_hour=nil)
  # Where to connect, as whom, and through which proxy.
  @host, @port = host, port
  @username, @doas = username, doas
  @proxy_address, @proxy_port = proxy_address, proxy_port
  @http_headers = http_headers

  # Retry policy: known server errors are not retried unless enabled.
  @retry_known_errors = false
  @retry_times = 1
  @retry_interval = 1

  @httpfs_mode = false

  # TLS is off and every TLS knob is unset.
  @ssl = false
  @ssl_ca_file = @ssl_verify_mode = @ssl_cert = @ssl_key = @ssl_version = nil

  # Kerberos is off; no delegation token has been fetched yet.
  @kerberos = false
  @kerberos_keytab = nil
  @kerberos_delegation_token = nil
  @renew_kerberos_delegation_token_time_hour = renew_kerberos_delegation_token_time_hour
  @kerberos_token_updated_at = Time.now
end
Public Instance Methods
api_path(path)
click to toggle source
# File lib/webhdfs/client_v1.rb, line 284
#
# Prefix +path+ with the WebHDFS REST root, adding a '/' separator only
# when +path+ does not already start with one.
#
# @param path [String] HDFS path, absolute or relative
# @return [String] e.g. '/webhdfs/v1/tmp' for both '/tmp' and 'tmp'
def api_path(path)
  root = '/webhdfs/v1'
  root += '/' unless path.start_with?('/')
  root + path
end
append(path, body, options={})
click to toggle source
build_path(path, op, params)
click to toggle source
# File lib/webhdfs/client_v1.rb, line 292
#
# Build the request path (REST root + path + query string) for operation +op+.
#
# The +op+ parameter is merged over +params+, together with +user.name+ and
# +doas+ when those attributes are set, so these keys win on collisions.
#
# @param path [String] HDFS path (passed through api_path)
# @param op [String] WebHDFS operation name, e.g. 'LISTSTATUS'
# @param params [Hash] additional query parameters
# @return [String] REST path followed by '?' and the URL-encoded query
def build_path(path, op, params)
  identity = { 'op' => op }
  identity['user.name'] = @username if @username
  identity['doas'] = @doas if @doas

  query = URI.encode_www_form(params.merge(identity))
  "#{api_path(path)}?#{query}"
end
check_options(options, optdecl=[])
click to toggle source
# Cancel a delegation token (WebHDFS CANCELDELEGATIONTOKEN).
#
# Not supported by this client; every call raises. The message names the
# missing operation so the failure is identifiable in logs.
#
# @param token [String] the delegation token to cancel (currently unused)
# @param options [Hash] extra request options (currently unused)
# @raise [NotImplementedError] always
def cancel_delegation_token(token, options={}) # CANCELDELEGATIONTOKEN
  raise NotImplementedError, 'CANCELDELEGATIONTOKEN is not implemented by WebHDFS::ClientV1'
end
# File lib/webhdfs/client_v1.rb, line 275
#
# Reject option keys that are not declared for an operation.
#
# Keys are compared by their string form, so symbol and string keys are
# treated alike.
#
# @param options [Hash] caller-supplied options
# @param optdecl [Array<String>, nil] permitted option names; nil permits none
# @return [nil] when every key is permitted
# @raise [ArgumentError] listing every undeclared option name
def check_options(options, optdecl=[])
  permitted = optdecl || []
  unknown = options.keys.map(&:to_s).reject { |name| permitted.include?(name) }
  return if unknown.empty?

  raise ArgumentError, "no such option: #{unknown.join(' ')}"
end
check_success_json(res, attr=nil)
click to toggle source
# File lib/webhdfs/client_v1.rb, line 280
#
# Check that +res+ is a 200 response with a JSON content type and,
# optionally, pull one top-level attribute out of its body.
#
# @param res [#code, #content_type, #body] HTTP response
# @param attr [String, nil] top-level JSON key to return
# @return [false] when the status is not '200' or the content type is not
#   'application/json'
# @return [true] when those checks pass and +attr+ is nil (body not parsed)
# @return [Object] otherwise the parsed body's value for +attr+ (may be nil or false)
def check_success_json(res, attr=nil)
  return false unless res.code == '200'
  return false unless res.content_type == 'application/json'
  return true if attr.nil?

  JSON.parse(res.body)[attr]
end
checksum(path, options={})
click to toggle source
Also aliased as: getfilechecksum
chmod(path, mode, options={})
click to toggle source
Also aliased as: setpermission
chown(path, options={})
click to toggle source
Also aliased as: setowner
content_summary(path, options={})
click to toggle source
Also aliased as: getcontentsummary
create(path, body, options={})
click to toggle source
delete(path, options={})
click to toggle source
get_cached_kerberos_delegation_token(force_renew=nil)
click to toggle source
# File lib/webhdfs/client_v1.rb, line 74
#
# Return the Kerberos delegation token, fetching or renewing it when needed.
#
# * With no cached token, or when +force_renew+ is truthy, a new token is
#   requested for +@username+ and cached.
# * When the cached token is stale (should_kerberos_token_updated?), it is
#   renewed in place; the cached value itself is kept.
# * Otherwise the cached token is returned untouched.
#
# +force_renew+ is tested before the staleness predicate so a forced
# refresh never evaluates (and never depends on) the renewal interval.
#
# @param force_renew [Boolean, nil] discard the cached token and fetch a new one
# @return [String] the cached delegation token
def get_cached_kerberos_delegation_token(force_renew=nil)
  if force_renew || !@kerberos_delegation_token
    @kerberos_delegation_token = get_kerberos_delegation_token(@username)
    @kerberos_token_updated_at = Time.now
  elsif should_kerberos_token_updated?
    renew_kerberos_delegation_token(@kerberos_delegation_token)
    @kerberos_token_updated_at = Time.now
  end
  @kerberos_delegation_token
end
get_kerberos_delegation_token(user, options={})
click to toggle source
curl -i "http://<HOST>:<PORT>/webhdfs/v1/?op=GETDELEGATIONTOKEN&renewer=<USER>"
# File lib/webhdfs/client_v1.rb, line 253
#
# Request a new delegation token (WebHDFS GETDELEGATIONTOKEN).
#
#   curl -i "http://<HOST>:<PORT>/webhdfs/v1/?op=GETDELEGATIONTOKEN&renewer=<USER>"
#
# @param user [String] sent as the +renewer+ parameter
# @param options [Hash] extra options permitted by OPT_TABLE['GETDELEGATIONTOKEN']
# @return [String] the token's +urlString+
# @raise [ArgumentError] for options not declared for GETDELEGATIONTOKEN
# @raise [WebHDFS::RequestFailedError] when the reply is not a 200 JSON
#   document containing a +Token+ object
def get_kerberos_delegation_token(user, options={})
  options = options.merge({ 'renewer' => user })
  check_options(options, OPT_TABLE['GETDELEGATIONTOKEN'])
  res = operate_requests('GET', '/', 'GETDELEGATIONTOKEN', options)
  # Validate before digging into the body, so a non-JSON or token-less reply
  # fails with a descriptive error instead of JSON::ParserError/NoMethodError.
  token = check_success_json(res, 'Token')
  unless token
    raise WebHDFS::RequestFailedError, "failed to get delegation token, code:#{res.code}, body:#{res.body}"
  end
  token['urlString']
end
gettrashroot(options={})
click to toggle source
homedir(options={})
click to toggle source
Also aliased as: gethomedirectory
list(path, options={})
click to toggle source
Also aliased as: liststatus
mkdir(path, options={})
click to toggle source
Also aliased as: mkdirs
operate_requests(method, path, op, params={}, payload=nil)
click to toggle source
# File lib/webhdfs/client_v1.rb, line 307
#
# Dispatch a WebHDFS operation, following the NameNode redirect when needed.
#
# Outside HttpFS mode, operations listed in REDIRECTED_OPERATIONS are first
# sent to @host:@port without a payload. That response must be a
# redirection carrying a Location header. The real request (with +payload+
# and an octet-stream Content-Type) then goes to the host, port and path
# named there.
#
# Every other operation is sent directly to @host:@port. In HttpFS mode a
# non-nil payload is sent with an octet-stream Content-Type.
#
# @param method [String] HTTP verb
# @param path [String] HDFS path
# @param op [String] WebHDFS operation name
# @param params [Hash] query parameters
# @param payload [String, IO, nil] request body
# @return the response returned by +request+
# @raise [WebHDFS::RequestFailedError] when a redirected operation does not
#   receive a redirection with a Location header
def operate_requests(method, path, op, params={}, payload=nil)
  octet_stream = { 'Content-Type' => 'application/octet-stream' }

  if @httpfs_mode || !REDIRECTED_OPERATIONS.include?(op)
    # Direct request; only HttpFS uploads carry an explicit content type.
    return request(@host, @port, method, path, op, params, payload) if !@httpfs_mode || payload.nil?

    return request(@host, @port, method, path, op, params, payload, octet_stream)
  end

  # Two-step request: ask where the data lives, then go there.
  first = request(@host, @port, method, path, op, params, nil)
  location = first['location'] if first.is_a?(Net::HTTPRedirection)
  unless location
    raise WebHDFS::RequestFailedError,
          "NameNode returns non-redirection (or without location header), code:#{first.code}, body:#{first.body}."
  end

  target = URI.parse(location)
  target_path = target.query ? "#{target.path}?#{target.query}" : target.path
  request(target.host, target.port, method, target_path, nil, {}, payload, octet_stream)
end
read(path, options={})
click to toggle source
Also aliased as: open
rename(path, dest, options={})
click to toggle source
curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME&destination=<PATH>"
# File lib/webhdfs/client_v1.rb, line 139
#
# Rename +path+ to +dest+ (WebHDFS RENAME).
#
#   curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME&destination=<PATH>"
#
# A relative +dest+ is made absolute by prefixing '/'.
#
# @param path [String] source path
# @param dest [String] destination path
# @param options [Hash] extra options permitted by OPT_TABLE['RENAME']
# @return whatever check_success_json(res, 'boolean') yields for the reply
# @raise [ArgumentError] for options not declared for RENAME
def rename(path, dest, options={})
  check_options(options, OPT_TABLE['RENAME'])
  destination = dest.start_with?('/') ? dest : '/' + dest
  res = operate_requests('PUT', path, 'RENAME', options.merge({ 'destination' => destination }))
  check_success_json(res, 'boolean')
end
renew_kerberos_delegation_token(token, options={})
click to toggle source
curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/?op=RENEWDELEGATIONTOKEN&token=<TOKEN>"
# File lib/webhdfs/client_v1.rb, line 263
#
# Renew a delegation token (WebHDFS RENEWDELEGATIONTOKEN).
#
#   curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/?op=RENEWDELEGATIONTOKEN&token=<TOKEN>"
#
# @param token [String] the token to renew, sent as the +token+ parameter
# @param options [Hash] extra options permitted by OPT_TABLE['RENEWDELEGATIONTOKEN']
# @return whatever check_success_json(res, 'long') yields for the reply
# @raise [ArgumentError] for options not declared for RENEWDELEGATIONTOKEN
def renew_kerberos_delegation_token(token, options={})
  request_options = options.merge({ 'token' => token })
  check_options(request_options, OPT_TABLE['RENEWDELEGATIONTOKEN'])
  response = operate_requests('PUT', '/', 'RENEWDELEGATIONTOKEN', request_options)
  check_success_json(response, 'long')
end
replication(path, replnum, options={})
click to toggle source
Also aliased as: setreplication
request(host, port, method, path, op=nil, params={}, payload=nil, header=nil, retries=0)
click to toggle source
Server-side exceptions map to HTTP status codes as follows: IllegalArgumentException: 400 Bad Request; UnsupportedOperationException: 400 Bad Request; SecurityException: 401 Unauthorized; IOException: 403 Forbidden; FileNotFoundException: 404 Not Found; RuntimeException: 500 Internal Server Error.
# File lib/webhdfs/client_v1.rb, line 336
#
# Perform one HTTP request against host:port and map failures to WebHDFS errors.
#
# Server-side Java exceptions are reported with these status codes:
#
#   IllegalArgumentException      400 Bad Request
#   UnsupportedOperationException 400 Bad Request
#   SecurityException             401 Unauthorized
#   IOException                   403 Forbidden
#   FileNotFoundException         404 Not Found
#   RuntimeException              500 Internal Server Error
#
# @param host [String] target host
# @param port [Integer] target port
# @param method [String] HTTP verb
# @param path [String] unescaped request path; escaped here before use
# @param op [String, nil] WebHDFS operation; when nil the escaped +path+ is
#   sent as-is (no query string is built)
# @param params [Hash] query parameters passed to build_path
# @param payload [String, #read, nil] request body; an object responding to
#   +read+ and +size+ is streamed (PUT/POST only)
# @param header [Hash, nil] request headers
# @param retries [Integer] number of retries already performed (internal)
# @return [Net::HTTPResponse] a success or redirection response
# @raise [WebHDFS::ClientError] on 400, or a streamed payload with a verb other than PUT/POST
# @raise [WebHDFS::SecurityError] on 401
# @raise [WebHDFS::IOError] on 403
# @raise [WebHDFS::FileNotFoundError] on 404
# @raise [WebHDFS::ServerError] on 500, or when the connection fails
# @raise [WebHDFS::KerberosError] on GSSAPI failures or a missing WWW-Authenticate header
# @raise [WebHDFS::RequestFailedError] on any other status
def request(host, port, method, path, op=nil, params={}, payload=nil, header=nil, retries=0)
  conn = Net::HTTP.new(host, port, @proxy_address, @proxy_port)
  conn.proxy_user = @proxy_user if @proxy_user
  conn.proxy_pass = @proxy_pass if @proxy_pass
  conn.open_timeout = @open_timeout if @open_timeout
  conn.read_timeout = @read_timeout if @read_timeout

  # Escape into a separate local and keep +path+ untouched: the retries below
  # recurse with the caller's original +path+, so it is escaped once per
  # attempt. Previously the already-escaped path was passed to the retry and
  # escaped again ('%20' became '%2520').
  escaped_path = Addressable::URI.escape(path) # make path safe for transmission via HTTP
  request_path = if op
                   build_path(escaped_path, op, params)
                 else
                   escaped_path
                 end

  if @ssl
    conn.use_ssl = true
    conn.ca_file = @ssl_ca_file if @ssl_ca_file
    if @ssl_verify_mode
      require 'openssl'
      conn.verify_mode = case @ssl_verify_mode
                         when :none then OpenSSL::SSL::VERIFY_NONE
                         when :peer then OpenSSL::SSL::VERIFY_PEER
                         end
    end
    conn.cert = @ssl_cert if @ssl_cert
    conn.key = @ssl_key if @ssl_key
    conn.ssl_version = @ssl_version if @ssl_version
  end

  gsscli = nil
  if @kerberos
    require 'base64'
    require 'gssapi'
    gsscli = GSSAPI::Simple.new(@host, 'HTTP', @kerberos_keytab)
    token = nil
    begin
      token = gsscli.init_context
    rescue => e
      raise WebHDFS::KerberosError, e.message
    end
    if header
      header['Authorization'] = "Negotiate #{Base64.strict_encode64(token)}"
    else
      header = {'Authorization' => "Negotiate #{Base64.strict_encode64(token)}"}
    end
  else
    header = {} if header.nil?
    header = @http_headers.merge(header)
  end

  res = nil
  if !payload.nil? and payload.respond_to? :read and payload.respond_to? :size
    # payload is non-nil and IO-like on this branch, so the request always has a body.
    req = Net::HTTPGenericRequest.new(method, true, true, request_path, header)
    raise WebHDFS::ClientError, 'Error accepting given IO resource as data payload, Not valid in methods other than PUT and POST' unless (method == 'PUT' or method == 'POST')
    req.body_stream = payload
    req.content_length = payload.size
    begin
      res = conn.request(req)
    rescue => e
      raise WebHDFS::ServerError, "Failed to connect to host #{host}:#{port}, #{e.message}"
    end
  else
    begin
      res = conn.send_request(method, request_path, payload, header)
    rescue => e
      raise WebHDFS::ServerError, "Failed to connect to host #{host}:#{port}, #{e.message}"
    end
  end

  # When a delegation token is already in the query parameters, the NameNode
  # does not send a WWW-Authenticate header, so only check it otherwise.
  if @kerberos and res.code == '307' and not params.key?('delegation')
    itok = (res.header.get_fields('WWW-Authenticate') || ['']).pop.split(/\s+/).last
    unless itok
      raise WebHDFS::KerberosError, 'Server does not return WWW-Authenticate header'
    end
    begin
      gsscli.init_context(Base64.strict_decode64(itok))
    rescue => e
      raise WebHDFS::KerberosError, e.message
    end
  end

  case res
  when Net::HTTPSuccess
    res
  when Net::HTTPRedirection
    res
  else
    message = if res.body and not res.body.empty?
                res.body.gsub(/\n/, '')
              else
                'Response body is empty...'
              end

    # NOTE(review): a streamed payload (#read) has already been consumed by
    # the attempt above; the retries below reuse the same object — confirm
    # whether it needs rewinding.

    # Retry once with a fresh delegation token when the server rejects the current one.
    if res.code == '403' and @renew_kerberos_delegation_token_time_hour && retries < @retry_times
      if message.include?('{"RemoteException":{')
        detail = begin
                   JSON.parse(message)
                 rescue StandardError
                   nil # broken JSON body: not a delegation-token error we can act on
                 end
        if detail&.dig('RemoteException', 'message')&.include?('HDFS_DELEGATION_TOKEN')
          # NOTE(review): the 307 check above looks for a 'delegation' query
          # parameter, while this retry adds 'token' — confirm which one the
          # server expects for ordinary operations.
          params = params.merge('token' => get_cached_kerberos_delegation_token(true))
          sleep @retry_interval if @retry_interval > 0
          return request(host, port, method, path, op, params, payload, header, retries+1)
        end
      end
    end

    # Optionally retry server exceptions listed in KNOWN_ERRORS.
    if @retry_known_errors && retries < @retry_times
      detail = nil
      if message =~ /^\{"RemoteException":\{/
        begin
          detail = JSON.parse(message)
        rescue
          # ignore broken json response body
        end
      end
      if detail && detail['RemoteException'] && KNOWN_ERRORS.include?(detail['RemoteException']['exception'])
        sleep @retry_interval if @retry_interval > 0
        return request(host, port, method, path, op, params, payload, header, retries+1)
      end
    end

    case res.code
    when '400'
      raise WebHDFS::ClientError, message
    when '401'
      raise WebHDFS::SecurityError, message
    when '403'
      raise WebHDFS::IOError, message
    when '404'
      raise WebHDFS::FileNotFoundError, message
    when '500'
      raise WebHDFS::ServerError, message
    else
      raise WebHDFS::RequestFailedError, "response code:#{res.code}, message:#{message}"
    end
  end
end
should_kerberos_token_updated?()
click to toggle source
# File lib/webhdfs/client_v1.rb, line 69
#
# Whether the cached delegation token is due for renewal.
#
# True once +@renew_kerberos_delegation_token_time_hour+ hours have elapsed
# since +@kerberos_token_updated_at+. When no renewal interval is configured
# (nil is the constructor default), scheduled renewal is disabled. In that
# case this returns false instead of raising NoMethodError on nil arithmetic.
#
# @return [Boolean]
def should_kerberos_token_updated?
  return false unless @renew_kerberos_delegation_token_time_hour

  @kerberos_token_updated_at + (@renew_kerberos_delegation_token_time_hour * 60 * 60) <= Time.now
end
ssl_verify_mode=(mode)
click to toggle source
# File lib/webhdfs/client_v1.rb, line 34
#
# Set the TLS peer-verification mode used when +ssl+ is enabled.
#
# @param mode [Object] must be a member of SSL_VERIFY_MODES
# @raise [ArgumentError] for any other value; the current mode is left unchanged
def ssl_verify_mode=(mode)
  raise ArgumentError, "Invalid SSL verify mode #{mode.inspect}" unless SSL_VERIFY_MODES.include?(mode)

  @ssl_verify_mode = mode
end
stat(path, options={})
click to toggle source
Also aliased as: getfilestatus
touch(path, options={})
click to toggle source
Also aliased as: settimes