module LogStash::Outputs::WebHdfsHelper

Public Instance Methods

compress_gzip(data) click to toggle source

Compress data using the gzip methods. @param data [String] stream of data to be compressed @return [String] the compressed stream of data

# File lib/logstash/outputs/webhdfs_helper.rb, line 59
# Gzip-compress a string entirely in memory.
#
# @param data [String] stream of data to be compressed
# @return [String] the compressed stream of data
def compress_gzip(data)
  sink = StringIO.new('', 'w')
  # The block form closes the writer (which flushes the gzip trailer into
  # the sink) even when the write raises.
  Zlib::GzipWriter.wrap(sink) { |gz| gz.write(data) }
  sink.string
end
compress_snappy_file(data) click to toggle source

Compress snappy file. @param data [binary] stream of data to be compressed @return [String] the compressed stream of data

# File lib/logstash/outputs/webhdfs_helper.rb, line 73
# Snappy-compress the whole payload as a single length-prefixed record.
#
# The result is the compressed size packed as a 32-bit big-endian unsigned
# integer ("N"), followed by the compressed bytes.
#
# @param data [String] stream of data to be compressed
# @return [String] the length-prefixed compressed stream of data
def compress_snappy_file(data)
  # Re-tag the payload as binary before handing it to Snappy.
  binary = data.encode(Encoding::ASCII_8BIT, "binary", :undef => :replace)
  deflated = Snappy.deflate(binary)
  out = StringIO.new('', 'w')
  out.set_encoding(Encoding::ASCII_8BIT)
  out << [deflated.size, deflated].pack("Na*")
  out.string
end
compress_snappy_stream(data) click to toggle source
# File lib/logstash/outputs/webhdfs_helper.rb, line 83
# Snappy-compress the payload as a sequence of framed chunks.
#
# The payload is cut into pieces of at most @snappy_bufsize characters of
# the binary-encoded string. Each piece is written as: uncompressed size,
# compressed size (both 32-bit big-endian unsigned, "N"), then the
# compressed bytes.
#
# @param data [String] stream of data to be compressed
# @return [String] the concatenated framed chunks
def compress_snappy_stream(data)
  # Re-tag the payload as binary before chunking it.
  raw = data.encode(Encoding::ASCII_8BIT, "binary", :undef => :replace)
  out = StringIO.new
  out.set_encoding(Encoding::ASCII_8BIT)
  # /m lets "." match newlines so chunk boundaries ignore line breaks.
  raw.scan(/.{1,#{@snappy_bufsize}}/m) do |piece|
    deflated = Snappy.deflate(piece)
    out << [piece.size, deflated.size, deflated].pack("NNa*")
  end
  out.string
end
get_snappy_header!() click to toggle source
# File lib/logstash/outputs/webhdfs_helper.rb, line 96
# Build the snappy stream header.
#
# Packs MAGIC as an 8-byte null-padded string ("a8") followed by
# DEFAULT_VERSION and MINIMUM_COMPATIBLE_VERSION as 32-bit big-endian
# unsigned integers ("N").
#
# @return [String] the packed header bytes
def get_snappy_header!
  header_fields = [MAGIC, DEFAULT_VERSION, MINIMUM_COMPATIBLE_VERSION]
  header_fields.pack("a8NN")
end
load_module(module_name) click to toggle source

Load a module @param module_name [String] A module name @raise [LoadError] If the module could not be loaded

# File lib/logstash/outputs/webhdfs_helper.rb, line 10
# Require a module by name, logging before re-raising when it is missing.
#
# @param module_name [String] A module name
# @raise [LoadError] If the module could not be loaded
def load_module(module_name)
  require module_name
rescue LoadError
  @logger.error("Module #{module_name} could not be loaded.")
  raise
end
prepare_client(host, port, username) click to toggle source

Set up a WebHDFS client @param host [String] The WebHDFS location @param port [Number] The port used for communication @param username [String] A valid HDFS user @return [WebHDFS] A configured client instance

# File lib/logstash/outputs/webhdfs_helper.rb, line 24
# Build a WebHDFS client and configure it from this instance's settings.
#
# Kerberos and SSL are only configured (and their libraries only required)
# when @use_kerberos_auth / @use_ssl_auth are set. The retry interval and
# retry count are only overridden when a value is configured.
#
# @param host [String] The WebHDFS location
# @param port [Number] The port used for communication
# @param username [String] A valid HDFS user
# @return [WebHDFS::Client] the configured client instance
def prepare_client(host, port, username)
  client = WebHDFS::Client.new(host, port, username)
  if @use_kerberos_auth
    # Loaded lazily so the dependency is only needed when Kerberos is enabled.
    require 'gssapi'
    client.kerberos = true
    client.kerberos_keytab = @kerberos_keytab
  end
  if @use_ssl_auth
    require 'openssl'
    client.ssl = true
    # File.read closes the file once read and, unlike Kernel#open, never
    # interprets a path starting with "|" as a command to execute.
    client.ssl_key = OpenSSL::PKey::RSA.new(File.read(@ssl_key))
    client.ssl_cert = OpenSSL::X509::Certificate.new(File.read(@ssl_cert))
  end
  client.httpfs_mode = @use_httpfs
  client.open_timeout = @open_timeout
  client.read_timeout = @read_timeout
  client.retry_known_errors = @retry_known_errors
  client.retry_interval = @retry_interval if @retry_interval
  client.retry_times = @retry_times if @retry_times
  client
end
test_client(client) click to toggle source
Test client connection.

@param client [WebHDFS] webhdfs client object.

# File lib/logstash/outputs/webhdfs_helper.rb, line 47
# Check the client connection by listing the root directory.
#
# Any StandardError from the listing is logged together with the client's
# host and port, then re-raised unchanged.
#
# @param client [WebHDFS] webhdfs client object.
# @return the result of client.list('/')
def test_client(client)
  client.list('/')
rescue => error
  failure = "Webhdfs check request failed. (namenode: #{client.host}:#{client.port}, Exception: #{error.message})"
  @logger.error(failure)
  raise
end