class Fluent::Plugin::WebHDFSOutput
Constants
- CHUNK_ID_PLACE_HOLDER
- COMPRESSOR_REGISTRY
- HOSTNAME_PLACEHOLDERS_DEPRECATED
- SUPPORTED_COMPRESS
- UUID_OTHER_PLACEHOLDERS_OBSOLETED
- UUID_RANDOM_PLACEHOLDERS_DEPRECATED
Attributes
compressor[R]
formatter[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_webhdfs.rb, line 100
# Creates the plugin instance and pre-declares the ivars that #configure
# (and the legacy-parameter conversion it runs) fills in later.
# Every ivar assigned here starts out as nil.
def initialize
  super
  # Legacy plaintextformatter-style settings. TODO: deprecated
  @header_separator = nil
  @field_separator = nil
  @output_include_time = nil
  @output_include_tag = nil
  # Runtime objects built during configuration.
  @standby_namenode_host = nil
  @compressor = nil
end
register_compressor(name, compressor)
click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 537
# Adds +compressor+ to the plugin-wide COMPRESSOR_REGISTRY under +name+.
# #configure later resolves the configured `compress` value through
# COMPRESSOR_REGISTRY.lookup and instantiates the returned class.
#
# @param name [#to_s] registry key (the `compress` config value)
# @param compressor [Class] compressor implementation
def self.register_compressor(name, compressor)
  registry = COMPRESSOR_REGISTRY
  registry.register(name, compressor)
end
Public Instance Methods
compat_parameters_convert_plaintextformatter(conf)
click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 438
# Translates the legacy plaintextformatter-style parameters
# (output_data_type, field_separator, output_include_time/tag, utc/localtime,
# time_format) into an equivalent <format> section on +conf+.
#
# If +conf+ already has a <format> section, or has no output_data_type,
# nothing is converted and @using_formatter_config is set to true.
# Otherwise a <format> element is appended to +conf+. The ivars that #format
# uses for the legacy line layout are also set: @header_separator,
# @output_include_time, @output_include_tag, @time_formatter and
# @null_convert_keys.
#
# @param conf [Fluent::Config::Element] raw plugin configuration (mutated)
# @raise [Fluent::ConfigError] on an unknown output_data_type, or when both
#   'utc' and 'localtime' are given
def compat_parameters_convert_plaintextformatter(conf)
  if !conf.elements('format').empty? || !conf['output_data_type']
    @using_formatter_config = true
    @null_convert_keys = []
    return
  end

  log.warn "webhdfs output plugin is working with old configuration parameters. use <inject>/<format> sections instead for further releases."
  @using_formatter_config = false
  @null_convert_keys = []

  # Separator that #format puts after the time and tag columns.
  @header_separator = case conf['field_separator']
                      when nil then "\t"
                      when 'SPACE' then ' '
                      when 'TAB' then "\t"
                      when 'COMMA' then ','
                      when 'SOH' then "\x01"
                      else conf['field_separator']
                      end

  format_section = Fluent::Config::Element.new('format', '', {}, [])
  case conf['output_data_type']
  when '', 'json' # blank value is for compatibility reason (especially in testing)
    format_section['@type'] = 'json'
  when 'ltsv'
    format_section['@type'] = 'ltsv'
  else
    unless conf['output_data_type'].start_with?('attr:')
      raise Fluent::ConfigError, "output_data_type is invalid: #{conf['output_data_type']}"
    end
    # NOTE(review): this sets '@format' rather than '@type'; confirm the
    # <format> section really ends up with a tsv formatter from this key.
    format_section['@format'] = 'tsv'
    keys_part = conf['output_data_type'].sub(/^attr:/, '')
    # Keys listed after "attr:" also get NULL substitution in #format.
    @null_convert_keys = keys_part.split(',')
    format_section['keys'] = keys_part
    # NOTE(review): '\t' is single-quoted (backslash + t), presumably decoded
    # by the formatter's delimiter handling — confirm. 'SOH' is replaced with
    # "\x01" in #configure once the formatter exists.
    format_section['delimiter'] = case conf['field_separator']
                                  when nil then '\t'
                                  when 'SPACE' then ' '
                                  when 'TAB' then '\t'
                                  when 'COMMA' then ','
                                  when 'SOH' then 'SOH' # fixed later
                                  else conf['field_separator']
                                  end
  end
  conf.elements << format_section

  @output_include_time = conf.has_key?('output_include_time') ? Fluent::Config.bool_value(conf['output_include_time']) : true
  @output_include_tag = conf.has_key?('output_include_tag') ? Fluent::Config.bool_value(conf['output_include_tag']) : true

  if @output_include_time
    # default timezone is UTC
    using_localtime = if !conf.has_key?('utc') && !conf.has_key?('localtime')
                        false
                      elsif conf.has_key?('localtime') && conf.has_key?('utc')
                        raise Fluent::ConfigError, "specify either 'localtime' or 'utc'"
                      elsif conf.has_key?('localtime')
                        # Parse the configured value, not the key name.
                        Fluent::Config.bool_value(conf['localtime']) == true
                      else
                        # Only an explicit `utc false` selects local time; true, an
                        # empty value or anything unparsable keeps UTC.
                        Fluent::Config.bool_value(conf['utc']) == false
                      end
    @time_formatter = Fluent::TimeFormatter.new(conf['time_format'], using_localtime)
  else
    @time_formatter = nil
  end
end
compress_context(chunk) { |tmp| ... }
click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 355
# Compresses +chunk+ into a fresh tempfile with @compressor, rewinds it and
# yields it to the block. The tempfile is closed and unlinked afterwards on a
# best-effort basis (errors while closing are ignored).
#
# @return whatever the block returns
def compress_context(chunk, &block)
  tempfile = nil
  begin
    tempfile = Tempfile.new("webhdfs-")
    @compressor.compress(chunk, tempfile)
    tempfile.rewind
    yield tempfile
  ensure
    begin
      # tempfile is still nil if Tempfile.new itself failed.
      tempfile.close(true)
    rescue
      nil
    end
  end
end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_webhdfs.rb, line 108
# Validates and normalizes the plugin configuration, then builds the WebHDFS
# client(s). Steps, in order (later steps depend on earlier ones):
#   1. choose a buffer timekey from the finest time unit in conf["path"]
#      (unless a <buffer> section sets one), convert legacy buffer
#      parameters and make sure a time-keyed <buffer> section exists;
#   2. convert legacy formatter parameters and rewrite/validate deprecated
#      path placeholders;
#   3. call super, then create the formatter and the compressor;
#   4. resolve the active (and optional standby) namenode address;
#   5. validate path / kerberos settings and create the client(s).
#
# @param conf [Fluent::Config::Element] raw plugin configuration (mutated)
# @raise [Fluent::ConfigError] on any invalid or inconsistent setting
def configure(conf)
  # #compat_parameters_convert ignore time format in conf["path"],
  # so check conf["path"] and overwrite the default value later if needed
  timekey = case conf["path"]
            when /%S/ then 1
            when /%M/ then 60
            when /%H/ then 3600
            else 86400
            end
  if (buffer_config = conf.elements(name: "buffer").first)
    timekey = buffer_config["timekey"] || timekey
  end

  compat_parameters_convert(conf, :buffer, default_chunk_key: "time")

  if conf.elements(name: "buffer").empty?
    e = Fluent::Config::Element.new("buffer", "time", {}, [])
    conf.elements << e
  end
  buffer_config = conf.elements(name: "buffer").first
  # explicitly set timekey
  buffer_config["timekey"] = timekey

  compat_parameters_convert_plaintextformatter(conf)
  verify_config_placeholders_in_path!(conf)

  super

  @formatter = formatter_create
  if @using_formatter_config
    @null_value = nil
  else
    # The legacy conversion stores 'SOH' as a marker; swap in the real byte.
    @formatter.delimiter = "\x01" if @formatter.respond_to?(:delimiter) && @formatter.delimiter == 'SOH'
    @null_value ||= 'NULL'
  end

  if @default_tag.nil? && !@using_formatter_config && @output_include_tag
    @default_tag = "tag_missing"
  end
  if @remove_prefix
    @remove_prefix_actual = @remove_prefix + "."
    @remove_prefix_actual_length = @remove_prefix_actual.length
  end

  @replace_random_uuid = @path.include?('%{uuid}') || @path.include?('%{uuid_flush}')
  if @replace_random_uuid
    # to check SecureRandom.uuid is available or not (NotImplementedError raised in such environment)
    begin
      SecureRandom.uuid
    rescue NotImplementedError, StandardError
      # NotImplementedError is a ScriptError, not a StandardError, so a bare
      # `rescue` would let it escape; it has to be named explicitly.
      raise Fluent::ConfigError, "uuid feature (SecureRandom) is unavailable in this environment"
    end
  end

  @compressor = COMPRESSOR_REGISTRY.lookup(@compress.to_s).new
  @compressor.configure(conf)

  # NAMENODE_HOST:PORT
  host_port_pattern = /\A([a-zA-Z0-9][-a-zA-Z0-9.]*):(\d+)\Z/

  if @host
    @namenode_host = @host
    @namenode_port = @port
  elsif @namenode
    namenode_match = host_port_pattern.match(@namenode)
    unless namenode_match
      raise Fluent::ConfigError, "Invalid config value about namenode: '#{@namenode}', needs NAMENODE_HOST:PORT"
    end
    @namenode_host = namenode_match[1]
    @namenode_port = namenode_match[2].to_i
  else
    raise Fluent::ConfigError, "WebHDFS host or namenode missing"
  end

  if @standby_namenode
    standby_match = host_port_pattern.match(@standby_namenode)
    unless standby_match
      raise Fluent::ConfigError, "Invalid config value about standby namenode: '#{@standby_namenode}', needs STANDBY_NAMENODE_HOST:PORT"
    end
    if @httpfs
      raise Fluent::ConfigError, "Invalid configuration: specified to use both of standby_namenode and httpfs."
    end
    @standby_namenode_host = standby_match[1]
    @standby_namenode_port = standby_match[2].to_i
  end

  unless @path.start_with?('/')
    raise Fluent::ConfigError, "Path on hdfs MUST starts with '/', but '#{@path}'"
  end

  @renew_kerberos_delegation_token_interval_hour = nil
  @kerberos_delegation_token_max_lifetime_hour = nil
  if @renew_kerberos_delegation_token
    # The client is given these two values in hours (see #prepare_client).
    @kerberos_delegation_token_max_lifetime_hour = @kerberos_delegation_token_max_lifetime / 60 / 60
    unless @username
      raise Fluent::ConfigError, "username is missing. If you want to reuse delegation token, follow with kerberos accounts"
    end
    @renew_kerberos_delegation_token_interval_hour = @renew_kerberos_delegation_token_interval / 60 / 60
  end

  @client = prepare_client(@namenode_host, @namenode_port, @username)
  @client_standby = if @standby_namenode_host
                      prepare_client(@standby_namenode_host, @standby_namenode_port, @username)
                    end

  # Without append, each chunk is written to its own path, so the chunk id
  # placeholder is mandatory.
  if !@append && !@path.include?(CHUNK_ID_PLACE_HOLDER)
    raise Fluent::ConfigError, "path must contain ${chunk_id}, which is the placeholder for chunk_id, when append is set to false."
  end
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 366
# Serializes one event into the text line appended to the buffer chunk.
#
# With a <format> configuration, inject values are added to the record and
# the formatter output is used as-is. In legacy mode the line is
# "[time<sep>][tag<sep>]<formatted record>". Nil values (and the attr: keys
# that are missing from the record) are replaced with @null_value.
# A trailing "\n" is added when @end_with_newline is set and missing.
#
# @return [String] the formatted line, or '' when formatting raised
def format(tag, time, record)
  if @remove_prefix # TODO: remove when it's obsoleted
    if tag.start_with?(@remove_prefix_actual)
      if tag.length > @remove_prefix_actual_length
        tag = tag[@remove_prefix_actual_length..-1]
      else
        tag = @default_tag
      end
    elsif tag.start_with?(@remove_prefix)
      if tag == @remove_prefix
        tag = @default_tag
      else
        tag = tag.sub(@remove_prefix, '')
      end
    end
  end

  if @null_value # TODO: remove when it's obsoleted
    null_keys = (record.keys + @null_convert_keys).uniq.select { |key| record[key].nil? }
    unless null_keys.empty?
      # Substitute on a copy: the record object comes from the caller and may
      # be referenced elsewhere (presumably by other outputs), so it must not
      # be modified in place.
      record = record.dup
      null_keys.each { |key| record[key] = @null_value }
    end
  end

  if @using_formatter_config
    record = inject_values_to_record(tag, time, record)
    line = @formatter.format(tag, time, record)
  else # TODO: remove when it's obsoleted
    time_str = @output_include_time ? @time_formatter.call(time) + @header_separator : ''
    tag_str = @output_include_tag ? tag + @header_separator : ''
    record_str = @formatter.format(tag, time, record)
    line = time_str + tag_str + record_str
  end
  # Build a new string instead of appending in place: the formatter's return
  # value may be frozen (or reused by the formatter).
  line += "\n" if @end_with_newline && !line.end_with?("\n")
  line
rescue => e # remove this clause when @suppress_log_broken_string is obsoleted
  unless @suppress_log_broken_string
    log.info "unexpected error while formatting events, ignored", tag: tag, record: record, error: e
  end
  ''
end
generate_path(chunk)
click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 340
# Builds the HDFS destination path for +chunk+.
#
# When append is off, the ${chunk_id} placeholder is first replaced with the
# chunk's hex id (#configure requires the placeholder in that mode). Chunk
# placeholders are then expanded, and the extension (@extension or the
# compressor's default) is appended. If the path uses %{uuid} /
# %{uuid_flush}, both are replaced with the same freshly generated UUID.
#
# @return [String]
def generate_path(chunk)
  template = @path
  template = template.gsub(CHUNK_ID_PLACE_HOLDER, dump_unique_id_hex(chunk.unique_id)) unless @append
  expanded = extract_placeholders(template, chunk)
  suffix = @extension || @compressor.ext
  full_path = "#{expanded}#{suffix}"
  return full_path unless @replace_random_uuid

  uuid = SecureRandom.uuid
  full_path.gsub('%{uuid}', uuid).gsub('%{uuid_flush}', uuid)
end
is_standby_exception(e)
click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 277
# Tells whether +e+ is a WebHDFS I/O error whose message mentions
# org.apache.hadoop.ipc.StandbyException. #write uses this to decide whether
# to switch to the standby namenode immediately.
#
# @param e [Exception]
# @return [Boolean] a real boolean (previously a MatchData or nil)
def is_standby_exception(e)
  e.is_a?(WebHDFS::IOError) && e.message.match?(/org\.apache\.hadoop\.ipc\.StandbyException/)
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 216
# Always returns true. Fluentd presumably consults this to decide whether the
# plugin may run with multiple workers — confirm against the Output API.
def multi_workers_ready?
  true
end
namenode_available(client)
click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 245
# Probes +client+ by listing "/". Returns false for a nil client, and also
# (after logging a warning) when the request raises a StandardError.
#
# @return [Boolean]
def namenode_available(client)
  return false unless client

  client.list('/')
  true
rescue => e
  log.warn "webhdfs check request failed. (namenode: #{client.host}:#{client.port}, error: #{e.message})"
  false
end
namenode_failover()
click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 281
# Swaps the active and standby clients when a standby namenode is configured,
# and logs which host is now active. Does nothing otherwise.
def namenode_failover
  return unless @standby_namenode

  previously_active = @client
  @client = @client_standby
  @client_standby = previously_active
  log.warn "Namenode failovered, now using #{@client.host}:#{@client.port}."
end
path_exists?(path)
click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 298
# Checks whether +path+ exists on HDFS by stat-ing it with the active client.
# Only WebHDFS::FileNotFoundError means "absent"; any other error propagates.
#
# @return [Boolean]
def path_exists?(path)
  begin
    @client.stat(path)
  rescue WebHDFS::FileNotFoundError
    return false
  end
  true
end
prepare_client(host, port, username)
click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 220
# Builds a WebHDFS::Client for +host+:+port+ acting as +username+, and applies
# the plugin's httpfs, timeout, retry, SSL and kerberos settings. Optional
# values are assigned only when configured.
#
# @return [WebHDFS::Client]
def prepare_client(host, port, username)
  WebHDFS::Client.new(host, port, username, nil, nil, nil, {},
                      @renew_kerberos_delegation_token_interval_hour,
                      @kerberos_delegation_token_max_lifetime_hour).tap do |webhdfs|
    webhdfs.httpfs_mode = true if @httpfs
    webhdfs.open_timeout = @open_timeout
    webhdfs.read_timeout = @read_timeout

    if @retry_known_errors
      webhdfs.retry_known_errors = true
      webhdfs.retry_interval = @retry_interval if @retry_interval
      webhdfs.retry_times = @retry_times if @retry_times
    end

    if @ssl
      webhdfs.ssl = true
      webhdfs.ssl_ca_file = @ssl_ca_file if @ssl_ca_file
      webhdfs.ssl_verify_mode = @ssl_verify_mode if @ssl_verify_mode
    end

    if @kerberos
      webhdfs.kerberos = true
      webhdfs.kerberos_keytab = @kerberos_keytab if @kerberos_keytab
    end
  end
end
send_data(path, data)
click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 288
# Uploads +data+ to +path+ with the active client.
# In append mode, an existing file is appended to and a missing one is
# created. Otherwise the file is created with overwrite enabled.
#
# @return the client call's return value
def send_data(path, data)
  if @append
    path_exists?(path) ? @client.append(path, data) : @client.create(path, data)
  else
    @client.create(path, data, {'overwrite' => 'true'})
  end
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_webhdfs.rb, line 260
# Starts the plugin and checks that a namenode answers: first the active
# client, then the standby (if any). If neither answers, startup fails with
# RuntimeError unless ignore_start_check_error is set.
#
# @return [nil]
def start
  super
  if namenode_available(@client)
    log.info "webhdfs connection confirmed: #{@namenode_host}:#{@namenode_port}"
  elsif @client_standby && namenode_available(@client_standby)
    log.info "webhdfs connection confirmed: #{@standby_namenode_host}:#{@standby_namenode_port}"
  elsif !@ignore_start_check_error
    raise RuntimeError, "webhdfs is not available now."
  end
  nil
end
verify_config_placeholders_in_path!(conf)
click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 309
# Rewrites deprecated placeholders in conf['path'] and rejects obsoleted ones:
# * hostname placeholders (HOSTNAME_PLACEHOLDERS_DEPRECATED) are replaced by
#   conf['hostname'] or Socket.gethostname, with a deprecation warning;
# * random-uuid placeholders (UUID_RANDOM_PLACEHOLDERS_DEPRECATED) are
#   rewritten to %{uuid}, with a deprecation warning;
# * each UUID_OTHER_PLACEHOLDERS_OBSOLETED placeholder present is logged as
#   an error and the configuration is rejected.
# The string stored in conf['path'] is modified in place (gsub!), so the
# caller's config sees the rewritten path.
#
# @param conf [Fluent::Config::Element] raw plugin configuration
# @raise [Fluent::ConfigError] when an obsoleted placeholder is present
def verify_config_placeholders_in_path!(conf)
  return unless conf.has_key?('path')

  path = conf['path']

  # check @path for ${hostname}, %{hostname} and __HOSTNAME__ to warn to use #{Socket.gethostname}
  if HOSTNAME_PLACEHOLDERS_DEPRECATED.any? { |ph| path.include?(ph) }
    log.warn "hostname placeholder is now deprecated. use '\#\{Socket.gethostname\}' instead."
    hostname = conf['hostname'] || Socket.gethostname
    HOSTNAME_PLACEHOLDERS_DEPRECATED.each do |ph|
      path.gsub!(ph, hostname)
    end
  end

  if UUID_RANDOM_PLACEHOLDERS_DEPRECATED.any? { |ph| path.include?(ph) }
    log.warn "random uuid placeholders are now deprecated. use %{uuid} (or %{uuid_flush}) instead."
    UUID_RANDOM_PLACEHOLDERS_DEPRECATED.each do |ph|
      path.gsub!(ph, '%{uuid}')
    end
  end

  obsoleted = UUID_OTHER_PLACEHOLDERS_OBSOLETED.select { |ph| path.include?(ph) }
  return if obsoleted.empty?

  obsoleted.each do |ph|
    log.error "configuration placeholder #{ph} is now unsupported by webhdfs output plugin."
  end
  # Fully qualified, like every other raise in this plugin: a bare ConfigError
  # is not guaranteed to resolve to Fluent::ConfigError from this class
  # (it raised NameError instead).
  raise Fluent::ConfigError, "there are unsupported placeholders in path."
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_webhdfs.rb, line 408 def write(chunk) hdfs_path = generate_path(chunk) failovered = false begin compress_context(chunk) do |data| send_data(hdfs_path, data) end rescue => e log.warn "failed to communicate hdfs cluster, path: #{hdfs_path}" raise e if !@client_standby || failovered if is_standby_exception(e) && namenode_available(@client_standby) log.warn "Seems the connected host status is not active (maybe due to failovers). Gonna try another namenode immediately." namenode_failover failovered = true retry end if @num_errors && ((@num_errors + 1) >= @failures_before_use_standby) && namenode_available(@client_standby) log.warn "Too many failures. Try to use the standby namenode instead." namenode_failover failovered = true retry end raise e end hdfs_path end