class LogStash::Outputs::HDFS
HDFS output. Write events to files in HDFS. You can use fields from the event as parts of the filename.
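For example, a path that contains event field references routes each event to its own file. The sketch below is illustrative only: it uses a plain Ruby hash and a gsub to mimic the %{field} substitution that the plugin performs via event.sprintf(@path) in receive.

# Illustrative only: a plain-Ruby mimic of %{field} expansion in the path.
event = { "type" => "apache", "host" => "web01" }
path_template = "/logs/%{type}/%{host}.log"

expanded = path_template.gsub(/%\{(\w+)\}/) { event[Regexp.last_match(1)] }
puts expanded   # => /logs/apache/web01.log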
Public Instance Methods
receive(event)
# File lib/logstash/outputs/hdfs.rb, line 64
def receive(event)
  return unless output?(event)
  out = get_output_stream(event.sprintf(@path))
  if @message_format
    output = event.sprintf(@message_format)
  else
    output = event.to_json
  end
  output += "\n" unless output.end_with? "\n"
  out.write(output)
  flush(out)
  close_stale_files
end
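For illustration, this is roughly what the JSON branch above produces for one event; the field names are made up, and Ruby's JSON standard library stands in for Event#to_json.

# Sketch: the default (no message_format) output line for one event.
require "json"

event = { "@timestamp" => "2014-01-01T00:00:00Z", "message" => "hello" }

line = event.to_json
line += "\n" unless line.end_with?("\n")   # a trailing newline is always ensured
puts line   # => {"@timestamp":"2014-01-01T00:00:00Z","message":"hello"}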
register()
# File lib/logstash/outputs/hdfs.rb, line 39
def register
  require "java"
  java_import "org.apache.hadoop.fs.Path"
  java_import "org.apache.hadoop.fs.FileSystem"
  java_import "org.apache.hadoop.conf.Configuration"
  @files = {}
  now = Time.now
  @last_flush_cycle = now
  @last_stale_cleanup_cycle = now
  flush_interval = @flush_interval.to_i
  @stale_cleanup_interval = 10
  conf = Configuration.new
  if @hadoop_config_resources
    @hadoop_config_resources.each { |resource| conf.addResource(resource) }
  end
  @logger.info "Using Hadoop configuration: #{conf.get("fs.defaultFS")}"
  @hdfs = FileSystem.get(conf)
end
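A minimal standalone sketch of the same setup, assuming a JRuby process with the Hadoop client jars on the classpath; the resource file names are only examples of what hadoop_config_resources might contain.

require "java"
java_import "org.apache.hadoop.conf.Configuration"
java_import "org.apache.hadoop.fs.FileSystem"

conf = Configuration.new
# Extra resources are layered onto the default configuration.
["core-site.xml", "hdfs-site.xml"].each { |resource| conf.addResource(resource) }

puts conf.get("fs.defaultFS")   # e.g. hdfs://namenode:8020
hdfs = FileSystem.get(conf)     # the handle used for all subsequent writes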
teardown()
# File lib/logstash/outputs/hdfs.rb, line 81
def teardown
  @logger.debug("Teardown: closing files")
  @files.each do |path, fd|
    begin
      fd.close
      @logger.debug("Closed file #{path}", :fd => fd)
    rescue Exception => e
      @logger.error("Exception while flushing and closing files.", :exception => e)
    end
  end
  finished
end
Private Instance Methods
close_stale_files()
Runs every 10 seconds or so (triggered by incoming events; if there are no events, there is no point in closing files anyway).
# File lib/logstash/outputs/hdfs.rb, line 139
def close_stale_files
  now = Time.now
  return unless now - @last_stale_cleanup_cycle >= @stale_cleanup_interval
  @logger.info("Starting stale files cleanup cycle", :files => @files)
  inactive_files = @files.select { |path, file| not file.active }
  @logger.debug("%d stale files found" % inactive_files.count, :inactive_files => inactive_files)
  inactive_files.each do |path, file|
    @logger.info("Closing file %s" % path)
    file.close
    @files.delete(path)
  end
  # mark all files as inactive, a call to write will mark them as active again
  @files.each { |path, fd| fd.active = false }
  @last_stale_cleanup_cycle = now
end
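DFSOutputStreamWrapper itself is not shown on this page. A minimal sketch of the shape this cleanup logic assumes (a wrapper exposing an active flag and delegating write, flush and close) could look like the following; the class name and stream type are placeholders.

class StreamWrapperSketch
  attr_accessor :active

  def initialize(stream)
    @stream = stream
    @active = true
  end

  def write(data)
    @active = true       # any write marks the file as active again
    @stream.write(data)
  end

  def flush
    @stream.flush
  end

  def close
    @stream.close
  end
end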
flush(fd)
# File lib/logstash/outputs/hdfs.rb, line 119
def flush(fd)
  if flush_interval > 0
    flush_pending_files
  else
    fd.flush
  end
end
flush_pending_files()
Runs every flush_interval seconds or so (triggered by incoming events; if there are no events, there is no point in flushing files anyway).
# File lib/logstash/outputs/hdfs.rb, line 128
def flush_pending_files
  return unless Time.now - @last_flush_cycle >= flush_interval
  @logger.debug("Starting flush cycle")
  @files.each do |path, fd|
    @logger.debug("Flushing file", :path => path, :fd => fd)
    fd.flush
  end
  @last_flush_cycle = Time.now
end
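Reduced to a standalone sketch, the pattern is: work is triggered by incoming events, but only performed once per interval. The class and method names below are illustrative.

class ThrottledFlusher
  def initialize(interval)
    @interval = interval
    @last_flush_cycle = Time.now
  end

  # files is expected to map paths to objects that respond to flush.
  def maybe_flush(files)
    return unless Time.now - @last_flush_cycle >= @interval
    files.each_value(&:flush)
    @last_flush_cycle = Time.now
  end
end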
get_output_stream(path_string)
# File lib/logstash/outputs/hdfs.rb, line 95
def get_output_stream(path_string)
  return @files[path_string] if @files.has_key?(path_string)
  path = Path.new(path_string)
  if @hdfs.exists(path)
    if enable_append
      begin
        dfs_data_output_stream = @hdfs.append(path)
      rescue java.io.IOException => e
        logger.error("Error opening path for append, trying to recover lease", :exception => e)
        recover_lease(path)
        retry
      end
    elsif enable_reopen
      logger.warn "Overwriting HDFS file", :path => path_string
      dfs_data_output_stream = @hdfs.create(path, true)
    else
      raise IOError, "Cowardly refusing to open pre-existing file (#{path_string}) because HDFS will truncate the file!"
    end
  else
    dfs_data_output_stream = @hdfs.create(path)
  end
  @files[path_string] = DFSOutputStreamWrapper.new(dfs_data_output_stream)
end
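The branch above boils down to a three-way policy for paths that already exist, controlled by the enable_append and enable_reopen options. A condensed sketch (the helper name is made up):

def open_mode_for_existing_file(enable_append, enable_reopen)
  if enable_append
    :append       # hdfs.append(path), with lease recovery retried on IOException
  elsif enable_reopen
    :overwrite    # hdfs.create(path, true) truncates and rewrites the file
  else
    raise IOError, "refusing to open a pre-existing file, HDFS would truncate it"
  end
end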
recover_lease(path)
# File lib/logstash/outputs/hdfs.rb, line 155
def recover_lease(path)
  is_file_closed_available = @hdfs.respond_to? :isFileClosed
  # Not all Hadoop file systems support lease recovery (e.g. LocalFileSystem)
  return true unless @hdfs.respond_to? :recoverLease
  start = Time.now
  first_retry = true
  until Time.now - start > 900 # 15 minute timeout
    recovered = @hdfs.recoverLease(path)
    return true if recovered
    # first retry is fast
    if first_retry
      sleep 4
      first_retry = false
      next
    end
    # on further retries we back off and spin on isFileClosed, hoping to catch an early release
    61.times do
      return true if is_file_closed_available and @hdfs.isFileClosed(path)
      sleep 1
    end
  end
  false
end
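The retry schedule, reduced to a standalone sketch with the timings taken from the method above; the two lambdas are stand-ins for @hdfs.recoverLease(path) and @hdfs.isFileClosed(path) and are not part of the plugin.

attempts = 0
recover_lease = -> { (attempts += 1) > 1 }   # stand-in: succeeds on the second call
file_closed   = -> { true }                  # stand-in for isFileClosed polling

start = Time.now
first_retry = true
recovered = false

until Time.now - start > 900                 # 15 minute deadline
  if recover_lease.call
    recovered = true
    break
  end
  if first_retry
    sleep 4                                  # one quick retry first
    first_retry = false
    next
  end
  61.times do                                # afterwards roughly one attempt per minute,
    break if file_closed.call                # polling isFileClosed once per second
    sleep 1
  end
end
puts recovered                               # => true (after about 4 seconds here)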