class WebHDFS::ClientV1

Constants

KNOWN_ERRORS
OPT_TABLE

This hash table holds command options.

REDIRECTED_OPERATIONS
SSL_VERIFY_MODES

Attributes

doas[RW]
host[RW]
http_headers[RW]
httpfs_mode[RW]
kerberos[RW]
kerberos_delegation_token[RW]
kerberos_keytab[RW]
open_timeout[RW]
port[RW]
proxy_address[RW]
proxy_pass[RW]
proxy_port[RW]
proxy_user[RW]
read_timeout[RW]
retry_interval[RW]
retry_known_errors[RW]
retry_times[RW]
ssl[RW]
ssl_ca_file[RW]
ssl_cert[RW]
ssl_key[RW]
ssl_verify_mode[R]
ssl_version[RW]
username[RW]

Public Class Methods

new(host='localhost', port=50070, username=nil, doas=nil, proxy_address=nil, proxy_port=nil, http_headers={}, renew_kerberos_delegation_token_time_hour=nil) click to toggle source
# File lib/webhdfs/client_v1.rb, line 41
# Builds a client for the WebHDFS REST API served at +host+:+port+.
#
# username / doas are sent as the user.name / doas query parameters when set;
# proxy_address / proxy_port select an HTTP proxy; http_headers are merged into
# every request made without Kerberos; renew_kerberos_delegation_token_time_hour
# is the age (in hours) after which a cached delegation token is refreshed.
#
# All other settings start with defaults and can be changed through the
# attribute writers: plain HTTP, Kerberos off, HttpFS mode off, and retrying of
# known errors disabled (1 retry, 1 second apart once enabled).
def initialize(host='localhost', port=50070, username=nil, doas=nil, proxy_address=nil, proxy_port=nil, http_headers={}, renew_kerberos_delegation_token_time_hour=nil)
  # endpoint, identity and proxy
  @host, @port = host, port
  @username, @doas = username, doas
  @proxy_address, @proxy_port = proxy_address, proxy_port

  # retry policy
  @retry_known_errors = false
  @retry_times = @retry_interval = 1

  @httpfs_mode = false

  # TLS settings (all unset until configured)
  @ssl = false
  @ssl_ca_file = @ssl_verify_mode = @ssl_cert = @ssl_key = @ssl_version = nil

  # Kerberos / delegation-token state
  @kerberos = false
  @kerberos_keytab = nil
  @renew_kerberos_delegation_token_time_hour = renew_kerberos_delegation_token_time_hour
  @kerberos_delegation_token = nil
  @kerberos_token_updated_at = Time.now

  @http_headers = http_headers
end

Public Instance Methods

api_path(path) click to toggle source
# File lib/webhdfs/client_v1.rb, line 284
# Maps an HDFS path onto the WebHDFS v1 REST prefix, inserting the slash
# between prefix and path only when +path+ does not already start with one.
def api_path(path)
  separator = path.start_with?('/') ? '' : '/'
  "/webhdfs/v1#{separator}#{path}"
end
build_path(path, op, params) click to toggle source
# File lib/webhdfs/client_v1.rb, line 292
# Builds the request path plus query string for a WebHDFS operation.
#
# The caller's +params+ come first. Then op, user.name (when @username is set)
# and doas (when @doas is set) are merged in. Those override any same-named
# keys in +params+.
def build_path(path, op, params)
  identity = {'op' => op}
  identity['user.name'] = @username if @username
  identity['doas'] = @doas if @doas
  "#{api_path(path)}?#{URI.encode_www_form(params.merge(identity))}"
end
check_options(options, optdecl=[]) click to toggle source

# CANCELDELEGATIONTOKEN operation -- not implemented by this client;
# every call raises NotImplementedError regardless of arguments.
def cancel_delegation_token(token, options={})
  raise NotImplementedError
end

# File lib/webhdfs/client_v1.rb, line 275
# Validates that every key of +options+ (compared as a String) appears in
# +optdecl+ (a list of permitted option names; nil permits nothing).
# Returns nil on success; raises ArgumentError listing the unknown options.
def check_options(options, optdecl=[])
  permitted = optdecl || []
  unknown = options.keys.map(&:to_s) - permitted
  return if unknown.empty?

  raise ArgumentError, "no such option: #{unknown.join(' ')}"
end
check_success_json(res, attr=nil) click to toggle source
# File lib/webhdfs/client_v1.rb, line 280
# Checks that +res+ is a 200 response with an application/json content type.
# Returns false when it is not. Otherwise returns true when +attr+ is nil, or
# the value stored under +attr+ in the parsed JSON body.
def check_success_json(res, attr=nil)
  json_ok = res.code == '200' && res.content_type == 'application/json'
  return json_ok unless json_ok

  attr.nil? || JSON.parse(res.body)[attr]
end
get_cached_kerberos_delegation_token(force_renew=nil) click to toggle source
# File lib/webhdfs/client_v1.rb, line 74
# Returns the Kerberos delegation token, fetching or refreshing it as needed.
#
# - A cached token that is still fresh (should_kerberos_token_updated? is
#   false) is returned as-is unless +force_renew+ is truthy.
# - With no cached token, or when +force_renew+ is truthy, a new token is
#   fetched for @username.
# - Otherwise the cached token is renewed in place.
# In the last two cases @kerberos_token_updated_at is reset to now.
def get_cached_kerberos_delegation_token(force_renew=nil)
  still_fresh = @kerberos_delegation_token && !should_kerberos_token_updated? && !force_renew
  return @kerberos_delegation_token if still_fresh

  if @kerberos_delegation_token && !force_renew
    renew_kerberos_delegation_token(@kerberos_delegation_token)
  else
    @kerberos_delegation_token = get_kerberos_delegation_token(@username)
  end
  @kerberos_token_updated_at = Time.now
  @kerberos_delegation_token
end
getfilechecksum(path, options={})
Alias for: checksum
getfilestatus(path, options={})
Alias for: stat
gethomedirectory(options={})
Alias for: homedir
liststatus(path, options={})
Alias for: list
mkdirs(path, options={})
Alias for: mkdir
open(path, options={})
Alias for: read
operate_requests(method, path, op, params={}, payload=nil) click to toggle source
# File lib/webhdfs/client_v1.rb, line 307
# Dispatches a WebHDFS operation and returns the final response.
#
# Operations in REDIRECTED_OPERATIONS (only when not in HttpFS mode) take two
# steps. First the NameNode is asked without a body. Then the request is replayed,
# with the payload, against the host/port/path of its Location header. All other
# operations go straight to @host:@port. In HttpFS mode a non-nil payload is sent
# labelled application/octet-stream.
#
# Raises WebHDFS::RequestFailedError when a redirected operation does not get a
# redirect response carrying a Location header.
def operate_requests(method, path, op, params={}, payload=nil)
  two_step = !@httpfs_mode && REDIRECTED_OPERATIONS.include?(op)

  unless two_step
    if @httpfs_mode && !payload.nil?
      return request(@host, @port, method, path, op, params, payload, {'Content-Type' => 'application/octet-stream'})
    end
    return request(@host, @port, method, path, op, params, payload)
  end

  # step 1: NameNode, no body
  redirect = request(@host, @port, method, path, op, params, nil)
  has_location = redirect.is_a?(Net::HTTPRedirection) && redirect['location']
  unless has_location
    msg = "NameNode returns non-redirection (or without location header), code:#{redirect.code}, body:#{redirect.body}."
    raise WebHDFS::RequestFailedError, msg
  end

  # step 2: redirect target, keeping its query string, with the body
  target = URI.parse(redirect['location'])
  target_path = target.query ? "#{target.path}?#{target.query}" : target.path
  request(target.host, target.port, method, target_path, nil, {}, payload, {'Content-Type' => 'application/octet-stream'})
end
rename(path, dest, options={}) click to toggle source

curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME&destination=<PATH>"

# File lib/webhdfs/client_v1.rb, line 139
# Renames +path+ to +dest+ (RENAME). A relative +dest+ is made absolute by
# prefixing '/'. Options are validated against OPT_TABLE['RENAME'].
# Returns the 'boolean' field of the JSON response, or false when the
# response is not a 200 JSON response.
def rename(path, dest, options={})
  check_options(options, OPT_TABLE['RENAME'])
  absolute_dest = dest.start_with?('/') ? dest : "/#{dest}"
  response = operate_requests('PUT', path, 'RENAME', options.merge('destination' => absolute_dest))
  check_success_json(response, 'boolean')
end
request(host, port, method, path, op=nil, params={}, payload=nil, header=nil, retries=0) click to toggle source

Server-side exceptions map to HTTP status codes as follows: IllegalArgumentException -> 400 Bad Request; UnsupportedOperationException -> 400 Bad Request; SecurityException -> 401 Unauthorized; IOException -> 403 Forbidden; FileNotFoundException -> 404 Not Found; RuntimeException -> 500 Internal Server Error.

# File lib/webhdfs/client_v1.rb, line 336
# Sends one HTTP request to +host+:+port+ and maps error responses onto
# WebHDFS exceptions.
#
# method  - HTTP verb string ('GET', 'PUT', 'POST', ...).
# path    - path to request. It is escaped here. When +op+ is given it is turned
#           into a WebHDFS URL by build_path; otherwise it is sent as-is.
# op      - WebHDFS operation name, or nil for a pre-built path.
# params  - extra query parameters passed to build_path.
# payload - request body. An object responding to read and size is streamed,
#           and only PUT/POST accept one.
# header  - optional request headers. With Kerberos on, an Authorization header
#           is written into this hash.
# retries - internal retry counter.
#
# Returns the response for 2xx and 3xx codes. Otherwise raises:
# - WebHDFS::ClientError (400)
# - WebHDFS::SecurityError (401)
# - WebHDFS::IOError (403)
# - WebHDFS::FileNotFoundError (404)
# - WebHDFS::ServerError (500 or connection failure)
# - WebHDFS::RequestFailedError (any other code)
# - WebHDFS::KerberosError (GSSAPI failures)
# Delegation-token errors and, when enabled, KNOWN_ERRORS are retried up to
# @retry_times times, @retry_interval seconds apart.
def request(host, port, method, path, op=nil, params={}, payload=nil, header=nil, retries=0)
  conn = Net::HTTP.new(host, port, @proxy_address, @proxy_port)
  conn.proxy_user = @proxy_user if @proxy_user
  conn.proxy_pass = @proxy_pass if @proxy_pass
  conn.open_timeout = @open_timeout if @open_timeout
  conn.read_timeout = @read_timeout if @read_timeout

  # Escape into a separate variable. The retry calls below must re-enter with
  # the caller's original path; escaping `path` in place would escape it twice.
  escaped_path = Addressable::URI.escape(path) # make path safe for transmission via HTTP
  request_path = op ? build_path(escaped_path, op, params) : escaped_path

  if @ssl
    conn.use_ssl = true
    conn.ca_file = @ssl_ca_file if @ssl_ca_file
    if @ssl_verify_mode
      require 'openssl'
      conn.verify_mode = case @ssl_verify_mode
                         when :none then OpenSSL::SSL::VERIFY_NONE
                         when :peer then OpenSSL::SSL::VERIFY_PEER
                         end
    end
    conn.cert = @ssl_cert if @ssl_cert
    conn.key = @ssl_key if @ssl_key
    conn.ssl_version = @ssl_version if @ssl_version
  end

  gsscli = nil
  if @kerberos
    require 'base64'
    require 'gssapi'
    # NOTE(review): the GSSAPI context is built for @host even when +host+ is a
    # redirect target -- confirm this is intended.
    gsscli = GSSAPI::Simple.new(@host, 'HTTP', @kerberos_keytab)
    token = nil
    begin
      token = gsscli.init_context
    rescue => e
      raise WebHDFS::KerberosError, e.message
    end
    if header
      header['Authorization'] = "Negotiate #{Base64.strict_encode64(token)}"
    else
      header = {'Authorization' => "Negotiate #{Base64.strict_encode64(token)}"}
    end
  else
    header = {} if header.nil?
    header = @http_headers.merge(header)
  end

  res = nil
  if !payload.nil? && payload.respond_to?(:read) && payload.respond_to?(:size)
    # IO-like payloads are streamed; validate the verb before building the request.
    unless method == 'PUT' || method == 'POST'
      raise WebHDFS::ClientError, 'Error accepting given IO resource as data payload, Not valid in methods other than PUT and POST'
    end
    req = Net::HTTPGenericRequest.new(method, true, true, request_path, header)
    req.body_stream = payload
    req.content_length = payload.size
    begin
      res = conn.request(req)
    rescue => e
      raise WebHDFS::ServerError, "Failed to connect to host #{host}:#{port}, #{e.message}"
    end
  else
    begin
      res = conn.send_request(method, request_path, payload, header)
    rescue => e
      raise WebHDFS::ServerError, "Failed to connect to host #{host}:#{port}, #{e.message}"
    end
  end

  # When a delegation token param is set, the NameNode does not send a
  # WWW-Authenticate header, so the GSSAPI handshake is only completed otherwise.
  if @kerberos && res.code == '307' && !params.key?('delegation')
    itok = (res.header.get_fields('WWW-Authenticate') || ['']).pop.split(/\s+/).last
    unless itok
      raise WebHDFS::KerberosError, 'Server does not return WWW-Authenticate header'
    end

    begin
      gsscli.init_context(Base64.strict_decode64(itok))
    rescue => e
      raise WebHDFS::KerberosError, e.message
    end
  end

  case res
  when Net::HTTPSuccess, Net::HTTPRedirection
    res
  else
    message = if res.body && !res.body.empty?
                res.body.gsub(/\n/, '')
              else
                'Response body is empty...'
              end

    # when delegation token is invalid: fetch a fresh one and retry
    if res.code == '403' && @renew_kerberos_delegation_token_time_hour && retries < @retry_times
      if message.include?('{"RemoteException":{')
        detail = begin
                   JSON.parse(message)
                 rescue StandardError
                   nil
                 end
        if detail&.dig('RemoteException', 'message')&.include?('HDFS_DELEGATION_TOKEN')
          # Replace the token under whichever key carried it. Adding a second
          # key would leave a stale `delegation` param in place and fail again.
          token_key = params.key?('delegation') ? 'delegation' : 'token'
          params = params.merge(token_key => get_cached_kerberos_delegation_token(true))
          sleep @retry_interval if @retry_interval > 0
          return request(host, port, method, path, op, params, payload, header, retries + 1)
        end
      end
    end

    if @retry_known_errors && retries < @retry_times
      detail = nil
      if message.start_with?('{"RemoteException":{')
        begin
          detail = JSON.parse(message)
        rescue StandardError
          # ignore broken json response body
        end
      end
      if detail && detail['RemoteException'] && KNOWN_ERRORS.include?(detail['RemoteException']['exception'])
        sleep @retry_interval if @retry_interval > 0
        return request(host, port, method, path, op, params, payload, header, retries + 1)
      end
    end

    case res.code
    when '400' then raise WebHDFS::ClientError, message
    when '401' then raise WebHDFS::SecurityError, message
    when '403' then raise WebHDFS::IOError, message
    when '404' then raise WebHDFS::FileNotFoundError, message
    when '500' then raise WebHDFS::ServerError, message
    else raise WebHDFS::RequestFailedError, "response code:#{res.code}, message:#{message}"
    end
  end
end
setowner(path, options={})
Alias for: chown
setpermission(path, mode, options={})
Alias for: chmod
setreplication(path, replnum, options={})
Alias for: replication
settimes(path, options={})
Alias for: touch
should_kerberos_token_updated?() click to toggle source
# File lib/webhdfs/client_v1.rb, line 69
# Reports whether the cached Kerberos delegation token is due for refresh:
# true once @renew_kerberos_delegation_token_time_hour hours have elapsed
# since @kerberos_token_updated_at.
#
# The renewal interval defaults to nil in the constructor. With no interval
# configured the token is never considered stale, so this returns false
# instead of raising NoMethodError on nil arithmetic.
def should_kerberos_token_updated?
  return false unless @renew_kerberos_delegation_token_time_hour

  @kerberos_token_updated_at + (@renew_kerberos_delegation_token_time_hour * 60 * 60) <= Time.now
end
ssl_verify_mode=(mode) click to toggle source
# File lib/webhdfs/client_v1.rb, line 34
# Sets the TLS peer-verification mode. The value must be one of
# SSL_VERIFY_MODES; anything else raises ArgumentError and leaves the
# current setting unchanged.
def ssl_verify_mode=(mode)
  raise ArgumentError, "Invalid SSL verify mode #{mode.inspect}" unless SSL_VERIFY_MODES.include?(mode)

  @ssl_verify_mode = mode
end