class Fluent::Plugin::WebHDFSOutput

Constants

CHUNK_ID_PLACE_HOLDER
COMPRESSOR_REGISTRY
HOSTNAME_PLACEHOLDERS_DEPRECATED
SUPPORTED_COMPRESS
UUID_OTHER_PLACEHOLDERS_OBSOLETED
UUID_RANDOM_PLACEHOLDERS_DEPRECATED

Attributes

compressor[R]
formatter[R]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_webhdfs.rb, line 100
# Initializes plugin state: no compressor resolved yet, no standby
# namenode, and all deprecated plain-text-formatter compatibility knobs
# unset until #configure fills them in.
def initialize
  super
  @compressor = nil
  @standby_namenode_host = nil
  # TODO: deprecated — kept for old plain-text formatter configurations
  @output_include_tag = nil
  @output_include_time = nil
  # TODO: deprecated — kept for old plain-text formatter configurations
  @header_separator = nil
  @field_separator = nil
end
register_compressor(name, compressor) click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 537
# Registers a compressor class under +name+ in COMPRESSOR_REGISTRY so
# #configure can later resolve the `compress` parameter into an instance.
def self.register_compressor(name, compressor)
  COMPRESSOR_REGISTRY.register(name, compressor)
end

Public Instance Methods

compat_parameters_convert_plaintextformatter(conf) click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 438
# Converts deprecated plain-text formatter parameters (output_data_type,
# field_separator, output_include_time/tag, time_format) into an
# injected <format> section, for configurations predating the v1
# formatter API. Sets @using_formatter_config, @null_convert_keys,
# @header_separator, @output_include_time/tag and @time_formatter.
#
# Raises Fluent::ConfigError for an invalid output_data_type or when
# both 'localtime' and 'utc' are specified.
def compat_parameters_convert_plaintextformatter(conf)
  # A present <format> section, or no output_data_type at all, means the
  # modern configuration style is in use — nothing to convert.
  if !conf.elements('format').empty? || !conf['output_data_type']
    @using_formatter_config = true
    @null_convert_keys = []
    return
  end

  log.warn "webhdfs output plugin is working with old configuration parameters. use <inject>/<format> sections instead for further releases."
  @using_formatter_config = false
  @null_convert_keys = []

  @header_separator = case conf['field_separator']
                      when nil     then "\t"
                      when 'SPACE' then ' '
                      when 'TAB'   then "\t"
                      when 'COMMA' then ','
                      when 'SOH'   then "\x01"
                      else conf['field_separator']
                      end

  format_section = Fluent::Config::Element.new('format', '', {}, [])
  case conf['output_data_type']
  when '', 'json' # blank value is for compatibility reason (especially in testing)
    format_section['@type'] = 'json'
  when 'ltsv'
    format_section['@type'] = 'ltsv'
  else
    unless conf['output_data_type'].start_with?('attr:')
      raise Fluent::ConfigError, "output_data_type is invalid: #{conf['output_data_type']}"
    end
    # FIX: was format_section['@format'] — the formatter type of a
    # <format> section is selected via '@type' (as in the branches above).
    format_section['@type'] = 'tsv'
    keys_part = conf['output_data_type'].sub(/^attr:/, '')
    @null_convert_keys = keys_part.split(',')
    format_section['keys'] = keys_part
    format_section['delimiter'] = case conf['field_separator']
                                  when nil then '\t'
                                  when 'SPACE' then ' '
                                  when 'TAB'   then '\t'
                                  when 'COMMA' then ','
                                  when 'SOH' then 'SOH' # fixed later
                                  else conf['field_separator']
                                  end
  end

  conf.elements << format_section

  @output_include_time = conf.has_key?('output_include_time') ? Fluent::Config.bool_value(conf['output_include_time']) : true
  @output_include_tag = conf.has_key?('output_include_tag') ? Fluent::Config.bool_value(conf['output_include_tag']) : true

  if @output_include_time
    # default timezone is UTC
    using_localtime = if !conf.has_key?('utc') && !conf.has_key?('localtime')
                        false
                      elsif conf.has_key?('localtime') && conf.has_key?('utc')
                        raise Fluent::ConfigError, "specify either 'localtime' or 'utc'"
                      elsif conf.has_key?('localtime')
                        # FIX: was bool_value('localtime') — evaluate the
                        # configured value, not the literal key name.
                        Fluent::Config.bool_value(conf['localtime'])
                      else
                        # FIX: 'utc true' means NOT localtime, so negate
                        # the configured value (was bool_value('utc')).
                        !Fluent::Config.bool_value(conf['utc'])
                      end
    @time_formatter = Fluent::TimeFormatter.new(conf['time_format'], using_localtime)
  else
    @time_formatter = nil
  end
end
compress_context(chunk) { |tmp| ... } click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 355
# Compresses +chunk+ into a temporary file using the configured
# @compressor, rewinds it, and yields the readable Tempfile to the
# block. The temp file is closed and unlinked afterwards, even when the
# compressor or the block raises.
def compress_context(chunk)
  tmpfile = nil
  begin
    tmpfile = Tempfile.new("webhdfs-")
    @compressor.compress(chunk, tmpfile)
    tmpfile.rewind
    yield tmpfile
  ensure
    # close(true) also unlinks the file; ignore errors (e.g. tmpfile is
    # nil because Tempfile.new itself failed).
    tmpfile.close(true) rescue nil
  end
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_webhdfs.rb, line 108
# Configures the plugin: derives the buffer timekey from time-format
# placeholders in the path, converts deprecated parameters, resolves the
# compressor and namenode endpoints, and builds the WebHDFS client(s).
# Raises Fluent::ConfigError for invalid namenode/path settings.
def configure(conf)
  # #compat_parameters_convert ignore time format in conf["path"],
  # so check conf["path"] and overwrite the default value later if needed
  timekey = case conf["path"]
            when /%S/ then 1
            when /%M/ then 60
            when /%H/ then 3600
            else 86400
            end
  if buffer_config = conf.elements(name: "buffer").first
    # an explicit buffer timekey wins over the path-derived default
    timekey = buffer_config["timekey"] || timekey
  end

  compat_parameters_convert(conf, :buffer, default_chunk_key: "time")

  # ensure a <buffer time> section exists so the timekey can be set
  if conf.elements(name: "buffer").empty?
    e = Fluent::Config::Element.new("buffer", "time", {}, [])
    conf.elements << e
  end
  buffer_config = conf.elements(name: "buffer").first
  # explicitly set timekey
  buffer_config["timekey"] = timekey

  compat_parameters_convert_plaintextformatter(conf)
  verify_config_placeholders_in_path!(conf)

  super

  @formatter = formatter_create

  if @using_formatter_config
    @null_value = nil
  else
    # legacy path: 'SOH' placeholder becomes the real \x01 separator,
    # and nil field values are rendered as 'NULL' by default
    @formatter.delimiter = "\x01" if @formatter.respond_to?(:delimiter) && @formatter.delimiter == 'SOH'
    @null_value ||= 'NULL'
  end

  if @default_tag.nil? && !@using_formatter_config && @output_include_tag
    @default_tag = "tag_missing"
  end
  if @remove_prefix
    # precompute "prefix." and its length for fast tag trimming in #format
    @remove_prefix_actual = @remove_prefix + "."
    @remove_prefix_actual_length = @remove_prefix_actual.length
  end

  @replace_random_uuid = @path.include?('%{uuid}') || @path.include?('%{uuid_flush}')
  if @replace_random_uuid
    # to check SecureRandom.uuid is available or not (NotImplementedError raised in such environment)
    begin
      SecureRandom.uuid
    rescue
      raise Fluent::ConfigError, "uuid feature (SecureRandom) is unavailable in this environment"
    end
  end

  # resolve the configured compression name into a compressor instance
  @compressor = COMPRESSOR_REGISTRY.lookup(@compress.to_s).new
  @compressor.configure(conf)

  if @host
    @namenode_host = @host
    @namenode_port = @port
  elsif @namenode
    # namenode must look like HOST:PORT
    unless /\A([a-zA-Z0-9][-a-zA-Z0-9.]*):(\d+)\Z/ =~ @namenode
      raise Fluent::ConfigError, "Invalid config value about namenode: '#{@namenode}', needs NAMENODE_HOST:PORT"
    end
    @namenode_host = $1
    @namenode_port = $2.to_i
  else
    raise Fluent::ConfigError, "WebHDFS host or namenode missing"
  end
  if @standby_namenode
    unless /\A([a-zA-Z0-9][-a-zA-Z0-9.]*):(\d+)\Z/ =~ @standby_namenode
      raise Fluent::ConfigError, "Invalid config value about standby namenode: '#{@standby_namenode}', needs STANDBY_NAMENODE_HOST:PORT"
    end
    # standby failover and httpfs are mutually exclusive
    if @httpfs
      raise Fluent::ConfigError, "Invalid configuration: specified to use both of standby_namenode and httpfs."
    end
    @standby_namenode_host = $1
    @standby_namenode_port = $2.to_i
  end
  unless @path.index('/') == 0
    raise Fluent::ConfigError, "Path on hdfs MUST starts with '/', but '#{@path}'"
  end

  @renew_kerberos_delegation_token_interval_hour = nil
  @kerberos_delegation_token_max_lifetime_hour = nil
  if @renew_kerberos_delegation_token
    # WebHDFS::Client expects these intervals in hours
    @kerberos_delegation_token_max_lifetime_hour = @kerberos_delegation_token_max_lifetime / 60 / 60
    unless @username
      raise Fluent::ConfigError, "username is missing. If you want to reuse delegation token, follow with kerberos accounts"
    end
    @renew_kerberos_delegation_token_interval_hour = @renew_kerberos_delegation_token_interval / 60 / 60
  end

  @client = prepare_client(@namenode_host, @namenode_port, @username)
  if @standby_namenode_host
    @client_standby = prepare_client(@standby_namenode_host, @standby_namenode_port, @username)
  else
    @client_standby = nil
  end

  # without append, each chunk must produce a unique file name
  unless @append
    if @path.index(CHUNK_ID_PLACE_HOLDER).nil?
      raise Fluent::ConfigError, "path must contain ${chunk_id}, which is the placeholder for chunk_id, when append is set to false."
    end
  end
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 366
# Formats one event into a line for the HDFS file. Handles the
# deprecated remove_prefix / null_value / include_time / include_tag
# options before delegating to the configured formatter. Returns an
# empty string when formatting fails (unless that logging is suppressed,
# the failure is logged at info level).
def format(tag, time, record)
  if @remove_prefix # TODO: remove when it's obsoleted
    if tag.start_with?(@remove_prefix_actual)
      if tag.length > @remove_prefix_actual_length
        tag = tag[@remove_prefix_actual_length..-1]
      else
        # tag was exactly "prefix." — nothing left after trimming
        tag = @default_tag
      end
    elsif tag.start_with?(@remove_prefix)
      if tag == @remove_prefix
        tag = @default_tag
      else
        tag = tag.sub(@remove_prefix, '')
      end
    end
  end

  if @null_value # TODO: remove when it's obsoleted
    # replace nils with the configured null marker, including keys that
    # are expected by attr: output but absent from the record
    check_keys = (record.keys + @null_convert_keys).uniq
    check_keys.each do |key|
      record[key] = @null_value if record[key].nil?
    end
  end

  if @using_formatter_config
    record = inject_values_to_record(tag, time, record)
    line = @formatter.format(tag, time, record)
  else # TODO: remove when it's obsoleted
    # legacy layout: [time][sep][tag][sep]record
    time_str = @output_include_time ? @time_formatter.call(time) + @header_separator : ''
    tag_str = @output_include_tag ? tag + @header_separator : ''
    record_str = @formatter.format(tag, time, record)
    line = time_str + tag_str + record_str
  end
  line << "\n" if @end_with_newline && !line.end_with?("\n")
  line
rescue => e # remove this clause when @suppress_log_broken_string is obsoleted
  unless @suppress_log_broken_string
    log.info "unexpected error while formatting events, ignored", tag: tag, record: record, error: e
  end
  ''
end
generate_path(chunk) click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 340
# Builds the final HDFS path for +chunk+: substitutes the chunk id when
# append mode is off, expands buffer placeholders, appends the
# configured (or compressor-derived) extension, and fills random uuid
# placeholders when the path uses them.
def generate_path(chunk)
  path_template = @path
  unless @append
    # unique file per chunk: replace ${chunk_id} with the chunk's hex id
    path_template = path_template.gsub(CHUNK_ID_PLACE_HOLDER, dump_unique_id_hex(chunk.unique_id))
  end
  extension = @extension || @compressor.ext
  hdfs_path = "#{extract_placeholders(path_template, chunk)}#{extension}"
  if @replace_random_uuid
    # one uuid per write; %{uuid} and %{uuid_flush} share the same value
    generated_uuid = SecureRandom.uuid
    hdfs_path = hdfs_path.gsub('%{uuid}', generated_uuid).gsub('%{uuid_flush}', generated_uuid)
  end
  hdfs_path
end
is_standby_exception(e) click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 277
# True when +e+ is a WebHDFS::IOError whose message carries Hadoop's
# StandbyException marker, i.e. the namenode we talked to is currently
# the HA standby and a failover should be attempted.
def is_standby_exception(e)
  return false unless e.is_a?(WebHDFS::IOError)
  e.message.match(/org\.apache\.hadoop\.ipc\.StandbyException/) ? true : false
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 216
# This output can run under Fluentd's multi-worker mode; chunk-unique
# path placeholders keep workers from clobbering each other's files.
def multi_workers_ready?
  true
end
namenode_available(client) click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 245
# Health-checks a namenode by listing '/' through +client+. Returns
# false for a nil client or when the request raises; logs a warning on
# failure.
def namenode_available(client)
  return false unless client
  client.list('/')
  true
rescue => e
  log.warn "webhdfs check request failed. (namenode: #{client.host}:#{client.port}, error: #{e.message})"
  false
end
namenode_failover() click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 281
# Swaps the active and standby WebHDFS clients when a standby namenode
# is configured, so subsequent requests go to the other node.
def namenode_failover
  return unless @standby_namenode
  @client, @client_standby = @client_standby, @client
  log.warn "Namenode failovered, now using #{@client.host}:#{@client.port}."
end
path_exists?(path) click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 298
# Checks whether +path+ exists on the cluster via a stat request.
# WebHDFS raises FileNotFoundError for missing paths; any other error
# propagates to the caller.
def path_exists?(path)
  begin
    @client.stat(path)
    true
  rescue WebHDFS::FileNotFoundError
    false
  end
end
prepare_client(host, port, username) click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 220
# Builds a WebHDFS::Client for host:port acting as +username+, applying
# the plugin's httpfs, timeout, retry, TLS and kerberos settings.
def prepare_client(host, port, username)
  client = WebHDFS::Client.new(host, port, username, nil, nil, nil, {},
                               @renew_kerberos_delegation_token_interval_hour,
                               @kerberos_delegation_token_max_lifetime_hour)
  client.httpfs_mode = true if @httpfs
  client.open_timeout = @open_timeout
  client.read_timeout = @read_timeout

  if @retry_known_errors
    client.retry_known_errors = true
    client.retry_interval = @retry_interval if @retry_interval
    client.retry_times = @retry_times if @retry_times
  end

  if @ssl
    client.ssl = true
    client.ssl_ca_file = @ssl_ca_file if @ssl_ca_file
    client.ssl_verify_mode = @ssl_verify_mode if @ssl_verify_mode
  end

  if @kerberos
    client.kerberos = true
    client.kerberos_keytab = @kerberos_keytab if @kerberos_keytab
  end

  client
end
send_data(path, data) click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 288
# Writes +data+ to +path+ on HDFS. Without append mode the file is
# (re)created with overwrite enabled; with append mode the data is
# appended when the file already exists, created otherwise.
def send_data(path, data)
  unless @append
    return @client.create(path, data, {'overwrite' => 'true'})
  end

  operation = path_exists?(path) ? :append : :create
  @client.public_send(operation, path, data)
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_webhdfs.rb, line 260
# Starts the plugin and verifies connectivity: tries the primary
# namenode first, then the standby (when configured). Raises unless a
# namenode answered or ignore_start_check_error is set.
def start
  super

  if namenode_available(@client)
    log.info "webhdfs connection confirmed: #{@namenode_host}:#{@namenode_port}"
    return
  end
  if @client_standby && namenode_available(@client_standby)
    log.info "webhdfs connection confirmed: #{@standby_namenode_host}:#{@standby_namenode_port}"
    return
  end

  # neither namenode responded; fail fast unless explicitly tolerated
  unless @ignore_start_check_error
    raise RuntimeError, "webhdfs is not available now."
  end
end
verify_config_placeholders_in_path!(conf) click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 309
# Validates and migrates legacy placeholders in conf['path'] (mutating
# the path in place): deprecated hostname placeholders are substituted
# immediately, deprecated random-uuid placeholders are rewritten to
# %{uuid}, and obsoleted uuid placeholders abort configuration with
# Fluent::ConfigError.
def verify_config_placeholders_in_path!(conf)
  return unless conf.has_key?('path')

  path = conf['path']

  # check @path for ${hostname}, %{hostname} and __HOSTNAME__ to warn to use #{Socket.gethostbyname}
  if HOSTNAME_PLACEHOLDERS_DEPRECATED.any?{|ph| path.include?(ph) }
    log.warn "hostname placeholder is now deprecated. use '\#\{Socket.gethostname\}' instead."
    hostname = conf['hostname'] || Socket.gethostname
    HOSTNAME_PLACEHOLDERS_DEPRECATED.each do |ph|
      path.gsub!(ph, hostname)
    end
  end

  if UUID_RANDOM_PLACEHOLDERS_DEPRECATED.any?{|ph| path.include?(ph) }
    log.warn "random uuid placeholders are now deprecated. use %{uuid} (or %{uuid_flush}) instead."
    UUID_RANDOM_PLACEHOLDERS_DEPRECATED.each do |ph|
      path.gsub!(ph, '%{uuid}')
    end
  end

  if UUID_OTHER_PLACEHOLDERS_OBSOLETED.any?{|ph| path.include?(ph) }
    UUID_OTHER_PLACEHOLDERS_OBSOLETED.each do |ph|
      if path.include?(ph)
        log.error "configuration placeholder #{ph} is now unsupported by webhdfs output plugin."
      end
    end
    # FIX: was bare ConfigError, which does not resolve under the compact
    # `class Fluent::Plugin::WebHDFSOutput` lexical scope; use the fully
    # qualified name as every other raise in this plugin does.
    raise Fluent::ConfigError, "there are unsupported placeholders in path."
  end
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 408
# Flushes one buffer chunk to HDFS. Compresses the chunk to a temp file
# and sends it; on failure, fails over to the standby namenode once
# (either immediately on a StandbyException, or after enough prior
# errors) and retries. Returns the HDFS path written.
def write(chunk)
  hdfs_path = generate_path(chunk)

  # guard so the rescue below retries at most once after a failover
  failovered = false
  begin
    compress_context(chunk) do |data|
      send_data(hdfs_path, data)
    end
  rescue => e
    log.warn "failed to communicate hdfs cluster, path: #{hdfs_path}"

    # no standby configured, or we already failed over once: give up
    raise e if !@client_standby || failovered

    if is_standby_exception(e) && namenode_available(@client_standby)
      log.warn "Seems the connected host status is not active (maybe due to failovers). Gonna try another namenode immediately."
      namenode_failover
      failovered = true
      retry
    end
    # repeated generic failures also trigger a failover attempt
    if @num_errors && ((@num_errors + 1) >= @failures_before_use_standby) && namenode_available(@client_standby)
      log.warn "Too many failures. Try to use the standby namenode instead."
      namenode_failover
      failovered = true
      retry
    end
    raise e
  end
  hdfs_path
end