class LogStash::Outputs::HDFS

HDFS output.

Write events to files in HDFS. You can use fields from the event as parts of the filename.
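
The path is expanded per event via Event#sprintf, so %{field} references (and %{+...} time patterns) become part of the filename. A minimal stand-alone sketch of the plain %{field} substitution, with hypothetical field values; Event#sprintf is Logstash's own, this stand-in only mimics the syntax:

event = { "host" => "web-01", "type" => "apache" }
template = "/logstash/%{type}/%{host}.log"
path = template.gsub(/%\{(\w+)\}/) { event[Regexp.last_match(1)] }
puts path  # => /logstash/apache/web-01.log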

Public Instance Methods

receive(event)
# File lib/logstash/outputs/hdfs.rb, line 64
def receive(event)
  return unless output?(event)
  out = get_output_stream(event.sprintf(@path))

  if @message_format
    output = event.sprintf(@message_format)
  else
    output = event.to_json
  end
  output += "\n" unless output.end_with? "\n"

  out.write(output)

  flush(out)
  close_stale_files
end
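
Each event can therefore land in a different file, since the path is sprintf-expanded per event. The serialized form is controlled by message_format: when set, it is expanded via Event#sprintf; otherwise the whole event is written as JSON. A sketch with a plain hash standing in for the event (hypothetical data; real events go through Event#sprintf and Event#to_json):

require "json"

event = { "host" => "web-01", "message" => "hello" }

# with message_format => "%{host} %{message}":
formatted = "#{event["host"]} #{event["message"]}"  # => "web-01 hello"

# without message_format:
as_json = event.to_json  # => {"host":"web-01","message":"hello"}

A trailing newline is appended either way before the line is written.
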
register()
# File lib/logstash/outputs/hdfs.rb, line 39
def register
  require "java"
  java_import "org.apache.hadoop.fs.Path"
  java_import "org.apache.hadoop.fs.FileSystem"
  java_import "org.apache.hadoop.conf.Configuration"

  @files = {}
  now = Time.now
  @last_flush_cycle = now
  @last_stale_cleanup_cycle = now
  @flush_interval = @flush_interval.to_i
  @stale_cleanup_interval = 10
  conf = Configuration.new

  if @hadoop_config_resources
    @hadoop_config_resources.each { |resource|
      conf.addResource(resource)
    }
  end

  @logger.info "Using Hadoop configuration: #{conf.get("fs.defaultFS")}"
  @hdfs = FileSystem.get(conf)
end
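
Configuration.new picks up the default Hadoop client configuration (core-default.xml, core-site.xml) from the JVM classpath; hadoop_config_resources adds further resources explicitly. A JRuby sketch of the same calls outside the plugin (the resource name and the resulting URL are examples, not requirements):

require "java"
java_import "org.apache.hadoop.conf.Configuration"
java_import "org.apache.hadoop.fs.FileSystem"

conf = Configuration.new
conf.addResource("hdfs-site.xml")  # a String resource is resolved on the classpath
puts conf.get("fs.defaultFS")      # e.g. "hdfs://namenode:8020" once resolved
hdfs = FileSystem.get(conf)        # a DistributedFileSystem for an hdfs:// defaultFS
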
teardown()
# File lib/logstash/outputs/hdfs.rb, line 81
def teardown
  @logger.debug("Teardown: closing files")
  @files.each do |path, fd|
    begin
      fd.close
      @logger.debug("Closed file #{path}", :fd => fd)
    rescue Exception => e
      @logger.error("Excpetion while flushing and closing files.", :exception => e)
    end
  end
  finished
end

Private Instance Methods

close_stale_files()

Closes files that have not been written to since the last cleanup cycle. Runs every 10 seconds or so (triggered by events; if there are no events, there is no point in closing files anyway).

# File lib/logstash/outputs/hdfs.rb, line 139
def close_stale_files
  now = Time.now
  return unless now - @last_stale_cleanup_cycle >= @stale_cleanup_interval
  @logger.info("Starting stale files cleanup cycle", :files => @files)
  inactive_files = @files.select { |path, file| not file.active }
  @logger.debug("%d stale files found" % inactive_files.count, :inactive_files => inactive_files)
  inactive_files.each do |path, file|
    @logger.info("Closing file %s" % path)
    file.close
    @files.delete(path)
  end
  # mark all files as inactive, a call to write will mark them as active again
  @files.each { |path, fd| fd.active = false }
  @last_stale_cleanup_cycle = now
end
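
DFSOutputStreamWrapper itself is not documented on this page. Judging from how receive and the flush/cleanup cycles use it, its interface is roughly the following; this is a sketch inferred from those call sites, not the actual class:

class DFSOutputStreamWrapper
  attr_accessor :active

  def initialize(stream)
    @stream = stream  # an org.apache.hadoop.fs.FSDataOutputStream
    @active = true
  end

  def write(data)
    @active = true             # a write marks the file active, sparing it from cleanup
    @stream.write_bytes(data)  # DataOutputStream#writeBytes, via JRuby's snake_case aliasing
  end

  def flush
    @stream.flush
  end

  def close
    @stream.close
  end
end
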
flush(fd)
# File lib/logstash/outputs/hdfs.rb, line 119
def flush(fd)
  if @flush_interval > 0
    # batched mode: flush every open file once per @flush_interval seconds
    flush_pending_files
  else
    # unbatched mode: flush this file on every write
    fd.flush
  end
end
flush_pending_files()

Flushes all open files. Runs every flush_interval seconds or so (triggered by events; if there are no events, there is no point in flushing files anyway).

# File lib/logstash/outputs/hdfs.rb, line 128
def flush_pending_files
  return unless Time.now - @last_flush_cycle >= @flush_interval
  @logger.debug("Starting flush cycle")
  @files.each do |path, fd|
    @logger.debug("Flushing file", :path => path, :fd => fd)
    fd.flush
  end
  @last_flush_cycle = Time.now
end
get_output_stream(path_string)
# File lib/logstash/outputs/hdfs.rb, line 95
def get_output_stream(path_string)
  return @files[path_string] if @files.has_key?(path_string)
  path = Path.new(path_string)
  if @hdfs.exists(path)
    if @enable_append
      begin
        dfs_data_output_stream = @hdfs.append(path)
      rescue java.io.IOException => e
        @logger.error("Error opening path for append, trying to recover lease", :exception => e)
        recover_lease(path)  # may block while HDFS releases the previous writer's lease
        retry
      end
    elsif @enable_reopen
      @logger.warn("Overwriting HDFS file", :path => path_string)
      dfs_data_output_stream = @hdfs.create(path, true)
    else
      raise IOError, "Cowardly refusing to open pre-existing file (#{path_string}) because HDFS will truncate the file!"
    end
  else
    dfs_data_output_stream = @hdfs.create(path)
  end
  @files[path_string] = DFSOutputStreamWrapper.new(dfs_data_output_stream)
end
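
The three branches for a pre-existing file map to the enable_append and enable_reopen options: append to it, truncate and rewrite it, or refuse to touch it. Stripped of the plugin plumbing, the underlying FileSystem calls look like this (a sketch; the path is hypothetical):

require "java"
java_import "org.apache.hadoop.conf.Configuration"
java_import "org.apache.hadoop.fs.FileSystem"
java_import "org.apache.hadoop.fs.Path"

hdfs = FileSystem.get(Configuration.new)  # as in register
path = Path.new("/logstash/app.log")
if hdfs.exists(path)
  out = hdfs.append(path)  # raises java.io.IOException while another writer holds the lease
else
  out = hdfs.create(path)  # fresh file; create(path, true) is the overwrite used above
end
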
recover_lease(path)
# File lib/logstash/outputs/hdfs.rb, line 155
def recover_lease(path)
  is_file_closed_available = @hdfs.respond_to? :isFileClosed

  # Not all Hadoop file systems support recover lease (e.g. LocalFileSystem)
  return true unless @hdfs.respond_to? :recoverLease

  start = Time.now
  first_retry = true

  until Time.now - start > 900 # 15 minute timeout
    recovered = @hdfs.recoverLease(path)
    return true if recovered
    # first retry is fast
    if first_retry
      sleep 4
      first_retry = false
      next
    end

    # on further retries we back off and spin on isFileClosed in hopes of catching an early break
    61.times do
      return true if is_file_closed_available and @hdfs.isFileClosed(path)
      sleep 1
    end
  end
  false
end
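
recoverLease and isFileClosed are DistributedFileSystem methods, hence the respond_to? guards (a LocalFileSystem has neither, as noted above). The net retry schedule is: one quick recoverLease retry after 4 seconds, then roughly one attempt per minute with a 1-second isFileClosed poll in between, capped at 15 minutes. The direct call looks like this (a sketch; the path is hypothetical):

require "java"
java_import "org.apache.hadoop.conf.Configuration"
java_import "org.apache.hadoop.fs.FileSystem"
java_import "org.apache.hadoop.fs.Path"

hdfs = FileSystem.get(Configuration.new)  # must resolve to an hdfs:// DistributedFileSystem
if hdfs.recoverLease(Path.new("/logstash/app.log"))
  # the previous writer's lease is released and the file is closed; append can proceed
end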