class LogStash::Outputs::WebHdfs
This plugin sends Logstash events into files in HDFS via the hadoop.apache.org/docs/r1.0.4/webhdfs.html[webhdfs] REST API.
Dependencies¶ ↑
This plugin has no dependency on jars from hadoop, thus reducing configuration and compatibility problems. It uses the webhdfs gem from Kazuki Ohta and TAGOMORI Satoshi (@see: github.com/kzk/webhdfs). Optional dependencies are the zlib and snappy gems if you use the compression functionality.
Operational Notes¶ ↑
If you get an error like:
Max write retries reached. Exception: initialize: name or service not known {:level=>:error}
make sure that the hostname of your namenode is resolvable on the host running Logstash. When creating/appending to a file, webhdfs sometimes sends a `307 TEMPORARY_REDIRECT` with the `HOSTNAME` of the machine it is running on.
Usage¶ ↑
This is an example of Logstash config:
[source,ruby]
input {
...
} filter {
...
} output {
webhdfs { host => "127.0.0.1" # (required) port => 50070 # (optional, default: 50070) path => "/user/logstash/dt=%{+YYYY-MM-dd}/logstash-%{+HH}.log" # (required) user => "hue" # (required) }
}
Constants
- DEFAULT_VERSION
- MAGIC
- MINIMUM_COMPATIBLE_VERSION
Public Instance Methods
close()
click to toggle source
# File lib/logstash/outputs/webhdfs.rb, line 258 def close buffer_flush(:final => true) end
do_failover()
click to toggle source
# File lib/logstash/outputs/webhdfs.rb, line 250 def do_failover if not @standby_client return end @logger.warn("Failing over from #{@client.host}:#{@client.port} to #{@standby_client.host}:#{@standby_client.port}.") @client, @standby_client = @standby_client, @client end
flush(events=nil, close=false)
click to toggle source
# File lib/logstash/outputs/webhdfs.rb, line 185 def flush(events=nil, close=false) return if not events newline = "\n" output_files = Hash.new { |hash, key| hash[key] = "" } events.collect do |event| # Add thread_id to event metadata to be used as format value in path configuration. if @single_file_per_thread event.set("[@metadata][thread_id]", Thread.current.object_id.to_s) end path = event.sprintf(@path) event_as_string = @codec.encode(event) event_as_string += newline unless event_as_string.end_with? newline output_files[path] << event_as_string end output_files.each do |path, output| if @compression == "gzip" path += ".gz" output = compress_gzip(output) elsif @compression == "snappy" path += ".snappy" if @snappy_format == "file" output = compress_snappy_file(output) elsif output = compress_snappy_stream(output) end end write_data(path, output) end end
receive(event)
click to toggle source
# File lib/logstash/outputs/webhdfs.rb, line 181 def receive(event) buffer_receive(event) end
register()
click to toggle source
# File lib/logstash/outputs/webhdfs.rb, line 134 def register load_module('webhdfs') if @compression == "gzip" load_module('zlib') elsif @compression == "snappy" load_module('snappy') end @main_namenode_failed = false @standby_client = false @files = {} # Create and test standby client if configured. if @standby_host @standby_client = prepare_client(@standby_host, @standby_port, @user) begin test_client(@standby_client) rescue => e logger.warn("Could not connect to standby namenode #{@standby_client.host}. Error: #{e.message}. Trying main webhdfs namenode.") end end @client = prepare_client(@host, @port, @user) begin test_client(@client) rescue => e # If no standy host is configured, we need to exit here. if not @standby_host raise else # If a standby host is configured, try this before giving up. logger.error("Could not connect to #{@client.host}:#{@client.port}. Error: #{e.message}") do_failover end end # Make sure @path contains %{[@metadata][thread_id]} format value if @single_file_per_thread is set to true. if @single_file_per_thread and !@path.include? "%{[@metadata][thread_id]}" @logger.error("Please set %{[@metadata][thread_id]} format value in @path if @single_file_per_thread is active.") raise LogStash::ConfigurationError end buffer_initialize( :max_items => @flush_size, :max_interval => @idle_flush_time, :logger => @logger ) @codec.on_event do |event, encoded_event| encoded_event end end
write_data(path, data)
click to toggle source
# File lib/logstash/outputs/webhdfs.rb, line 215 def write_data(path, data) # Retry max_retry times. This can solve problems like leases being hold by another process. Sadly this is no # KNOWN_ERROR in rubys webhdfs client. write_tries = 0 begin # Try to append to already existing file, which will work most of the times. @client.append(path, data) # File does not exist, so create it. rescue WebHDFS::FileNotFoundError # Add snappy header if format is "file". if @compression == "snappy" and @snappy_format == "file" @client.create(path, get_snappy_header! + data) elsif @client.create(path, data) end # Handle other write errors and retry to write max. @retry_times. rescue => e # Handle StandbyException and do failover. Still we want to exit if write_tries >= @retry_times. if @standby_client && (e.message.match(/Failed to connect to host/) || e.message.match(/StandbyException/)) do_failover write_tries += 1 retry end if write_tries < @retry_times @logger.warn("webhdfs write caused an exception: #{e.message}. Maybe you should increase retry_interval or reduce number of workers. Retrying...") sleep(@retry_interval * write_tries) write_tries += 1 retry else # Issue error after max retries. @logger.error("Max write retries reached. Events will be discarded. Exception: #{e.message}") end end end