class LogStash::Outputs::WebHdfs

This plugin sends Logstash events into files in HDFS via the https://hadoop.apache.org/docs/r1.0.4/webhdfs.html[webhdfs] REST API.

Dependencies

This plugin has no dependency on jars from hadoop, thus reducing configuration and compatibility problems. It uses the webhdfs gem from Kazuki Ohta and TAGOMORI Satoshi (@see: github.com/kzk/webhdfs). Optional dependencies are the zlib and snappy gems if you use the compression functionality.

Operational Notes

If you get an error like:

Max write retries reached. Exception: initialize: name or service not known {:level=>:error}

make sure that the hostname of your namenode is resolvable on the host running Logstash. When creating/appending to a file, webhdfs sometimes sends a `307 TEMPORARY_REDIRECT` with the `HOSTNAME` of the machine it's running on.

Usage

This is an example of Logstash config:

source,ruby

input {

...

} filter {

...

} output {

webhdfs {
  host => "127.0.0.1"                 # (required)
  port => 50070                       # (optional, default: 50070)
  path => "/user/logstash/dt=%{+YYYY-MM-dd}/logstash-%{+HH}.log"  # (required)
  user => "hue"                       # (required)
}

}


Constants

DEFAULT_VERSION
MAGIC
MINIMUM_COMPATIBLE_VERSION

Public Instance Methods

close() click to toggle source
# File lib/logstash/outputs/webhdfs.rb, line 258
def close
  # Plugin shutdown hook: push any events still sitting in the Stud::Buffer
  # out to HDFS before the pipeline terminates.
  buffer_flush(final: true)
end
do_failover() click to toggle source
# File lib/logstash/outputs/webhdfs.rb, line 250
def do_failover
  # Swap the active and standby webhdfs clients so subsequent writes target
  # the other namenode. No-op when no standby client is configured.
  return unless @standby_client
  @logger.warn("Failing over from #{@client.host}:#{@client.port} to #{@standby_client.host}:#{@standby_client.port}.")
  @client, @standby_client = @standby_client, @client
end
flush(events=nil, close=false) click to toggle source
# File lib/logstash/outputs/webhdfs.rb, line 185
def flush(events=nil, close=false)
  # Stud::Buffer callback: group the buffered events by their interpolated
  # HDFS path, apply optional compression, and write each group in a single
  # request via #write_data.
  #
  # @param events [Array, nil] events handed over by buffer_flush; nil is a no-op.
  # @param close  [Boolean] part of the Stud::Buffer contract; unused here.
  return if not events
  newline = "\n"
  output_files = Hash.new { |hash, key| hash[key] = "" }
  # Side-effecting iteration, so use each (was collect, whose result was discarded).
  events.each do |event|
    # Add thread_id to event metadata to be used as format value in path configuration.
    if @single_file_per_thread
      event.set("[@metadata][thread_id]", Thread.current.object_id.to_s)
    end
    path = event.sprintf(@path)
    event_as_string = @codec.encode(event)
    event_as_string += newline unless event_as_string.end_with? newline
    output_files[path] << event_as_string
  end
  output_files.each do |path, output|
    if @compression == "gzip"
      path += ".gz"
      output = compress_gzip(output)
    elsif @compression == "snappy"
      path += ".snappy"
      if @snappy_format == "file"
        output = compress_snappy_file(output)
      else
        # Fixed: this branch was a bare `elsif`, which parsed the assignment
        # below as the condition of an empty branch and only worked by accident.
        output = compress_snappy_stream(output)
      end
    end
    write_data(path, output)
  end
end
receive(event) click to toggle source
# File lib/logstash/outputs/webhdfs.rb, line 181
def receive(event)
  # Per-event entry point: hand the event to Stud::Buffer, which invokes
  # #flush once :max_items or :max_interval (configured in #register) is hit.
  buffer_receive(event)
end
register() click to toggle source
# File lib/logstash/outputs/webhdfs.rb, line 134
def register
  # Plugin lifecycle hook. In order: load the webhdfs gem (plus the optional
  # compression gem), build and smoke-test the standby and main namenode
  # clients, validate the @path template, and wire up the Stud::Buffer and
  # codec callback. Raises if the main namenode is unreachable and no standby
  # is configured, or LogStash::ConfigurationError on an invalid @path.
  load_module('webhdfs')
  # Only the selected compression backend is loaded lazily here.
  if @compression == "gzip"
    load_module('zlib')
  elsif @compression == "snappy"
    load_module('snappy')
  end
  @main_namenode_failed = false
  @standby_client = false
  @files = {}
  # Create and test standby client if configured.
  if @standby_host
    @standby_client = prepare_client(@standby_host, @standby_port, @user)
    begin
      test_client(@standby_client)
    rescue => e
      # A dead standby is not fatal: the main namenode is tried next.
      logger.warn("Could not connect to standby namenode #{@standby_client.host}. Error: #{e.message}. Trying main webhdfs namenode.")
    end
  end
  @client = prepare_client(@host, @port, @user)
  begin
    test_client(@client)
  rescue => e
    # If no standby host is configured, we need to exit here.
    if not @standby_host
      raise
    else
      # If a standby host is configured, try this before giving up.
      logger.error("Could not connect to #{@client.host}:#{@client.port}. Error: #{e.message}")
      do_failover
    end
  end
  # Make sure @path contains %{[@metadata][thread_id]} format value if @single_file_per_thread is set to true.
  if @single_file_per_thread and !@path.include? "%{[@metadata][thread_id]}"
    @logger.error("Please set %{[@metadata][thread_id]} format value in @path if @single_file_per_thread is active.")
    raise LogStash::ConfigurationError
  end
  # Stud::Buffer: flush when @flush_size events are queued or @idle_flush_time
  # seconds have elapsed, whichever comes first.
  buffer_initialize(
    :max_items => @flush_size,
    :max_interval => @idle_flush_time,
    :logger => @logger
  )
  # The codec hands back the encoded event unchanged; #flush collects it.
  @codec.on_event do |event, encoded_event|
    encoded_event
  end
end
write_data(path, data) click to toggle source
# File lib/logstash/outputs/webhdfs.rb, line 215
def write_data(path, data)
  # Append `data` to `path` on HDFS, creating the file on the first write.
  # Transient errors are retried up to @retry_times with a linear backoff of
  # @retry_interval * attempt; namenode standby errors trigger an immediate
  # failover retry (no sleep) instead, bounded by the same counter.
  #
  # @param path [String] absolute HDFS file path.
  # @param data [String] raw (possibly compressed) payload to write.
  #
  # Retry max_retry times. This can solve problems like leases being held by
  # another process. Sadly this is no KNOWN_ERROR in ruby's webhdfs client.
  write_tries = 0
  begin
    # Try to append to already existing file, which will work most of the times.
    @client.append(path, data)
  rescue WebHDFS::FileNotFoundError
    # File does not exist yet, so create it.
    if @compression == "snappy" and @snappy_format == "file"
      # Snappy "file" format needs the magic header on a fresh file.
      @client.create(path, get_snappy_header! + data)
    else
      # Fixed: was a bare `elsif`, which parsed the create call below as the
      # condition of an empty branch and only worked by accident.
      @client.create(path, data)
    end
  # Handle other write errors and retry to write max. @retry_times.
  rescue => e
    # Handle StandbyException and do failover. Fixed: the retry here was
    # unbounded, so two flapping namenodes could ping-pong forever; it is now
    # bounded by @retry_times as the original comment already promised.
    if @standby_client && write_tries < @retry_times &&
       (e.message.match(/Failed to connect to host/) || e.message.match(/StandbyException/))
      do_failover
      write_tries += 1
      retry
    end
    if write_tries < @retry_times
      @logger.warn("webhdfs write caused an exception: #{e.message}. Maybe you should increase retry_interval or reduce number of workers. Retrying...")
      sleep(@retry_interval * write_tries)
      write_tries += 1
      retry
    else
      # Issue error after max retries.
      @logger.error("Max write retries reached. Events will be discarded. Exception: #{e.message}")
    end
  end
end