class LogStash::Outputs::Pipe
Pipe output.
Pipe events to stdin of another program. You can use fields from the event as parts of the command. WARNING: This feature can cause Logstash to fork off multiple child processes if you are not careful with the per-event command line.
Public Instance Methods
close()
click to toggle source
# File lib/logstash/outputs/pipe.rb, line 57
# Closes every pipe currently tracked in @pipes.
#
# Each pipe is closed through drop_pipe. A failure while closing one pipe is
# logged at error level and does not prevent the remaining pipes from being
# closed.
def close
  @logger.info("close: closing pipes")
  # Walk a snapshot of the entries: drop_pipe removes keys from @pipes, and
  # the hash should not be modified while it is being traversed.
  @pipes.to_a.each do |command, pipe|
    begin
      drop_pipe(command)
      @logger.debug("Closed pipe #{command}", :pipe => pipe)
    rescue StandardError => e
      # Rescue StandardError rather than Exception so that signals and exit
      # requests raised during shutdown are not swallowed here.
      @logger.error("Exception while closing pipes.", :exception => e)
    end
  end
end
receive(event)
click to toggle source
# File lib/logstash/outputs/pipe.rb, line 34
# Writes one event to the pipe of the command rendered from @command.
#
# The command line is built with event.sprintf(@command). The payload is
# event.sprintf(@message_format) followed by a newline when @message_format
# is set, and event.to_json otherwise.
#
# On IOError, Errno::EPIPE or Errno::EBADF the failure is logged (including
# the exception itself), the pipe is dropped, and the write is retried with
# whatever get_pipe returns next. After a successful write,
# close_stale_pipes is invoked.
#
# NOTE(review): the retry is unbounded and has no back-off; a command that
# keeps failing will keep this method looping. Left as-is because bounding
# it would change event-delivery semantics -- confirm the intended policy.
def receive(event)
  command = event.sprintf(@command)

  output = if @message_format
             event.sprintf(@message_format) + "\n"
           else
             event.to_json
           end

  begin
    pipe = get_pipe(command)
    pipe.puts(output)
  rescue IOError, Errno::EPIPE, Errno::EBADF => e
    # Include the exception so the log shows why the write failed.
    @logger.error("Error writing to pipe, closing pipe.",
                  :command => command, :pipe => pipe, :exception => e)
    drop_pipe(command)
    retry
  end

  close_stale_pipes
end
register()
click to toggle source
# File lib/logstash/outputs/pipe.rb, line 28
# Sets up the state used by the other methods: an empty command => pipe
# table in @pipes, and the current time in @last_stale_cleanup_cycle as the
# starting point of the stale-pipe cleanup cycle.
def register
  @pipes = Hash.new
  @last_stale_cleanup_cycle = Time.now
end
Private Instance Methods
close_stale_pipes()
click to toggle source
Runs every 10 seconds or so (triggered by events; if there are no events, there is no point in closing pipes anyway).
# File lib/logstash/outputs/pipe.rb, line 71
# Drops pipes that have not been marked active since the previous cleanup.
#
# Does nothing when @ttl is zero or negative, or when less than @ttl seconds
# have elapsed since @last_stale_cleanup_cycle. Otherwise every pipe whose
# `active` flag is falsy is passed to drop_pipe, the flag of each remaining
# pipe is reset to false, and @last_stale_cleanup_cycle is set to the time
# captured at the start of this call.
def close_stale_pipes
  return if @ttl <= 0

  started_at = Time.now
  elapsed = started_at - @last_stale_cleanup_cycle
  return unless elapsed >= @ttl

  @logger.info("Starting stale pipes cleanup cycle", :pipes => @pipes)

  stale = @pipes.reject { |_command, pipe| pipe.active }
  @logger.debug("%d stale pipes found" % stale.count, :inactive_pipes => stale)
  stale.each_key { |command| drop_pipe(command) }

  # Reset the flag on the survivors; a call to write will mark them as
  # active again.
  @pipes.each_value { |pipe| pipe.active = false }
  @last_stale_cleanup_cycle = started_at
end
drop_pipe(command)
click to toggle source
# File lib/logstash/outputs/pipe.rb, line 86
# Closes the pipe registered for +command+ and removes it from @pipes.
#
# Returns nil without logging when no pipe is registered for +command+;
# otherwise returns the removed pipe object. A StandardError raised by the
# pipe's #close is logged at warn level and not re-raised.
def drop_pipe(command)
  return unless @pipes.include?(command)

  @logger.info("Closing pipe \"%s\"" % command)

  # Remove the entry before closing so @pipes never keeps a reference to a
  # pipe whose close was attempted, even if close raises something that is
  # not rescued below.
  pipe = @pipes.delete(command)
  begin
    pipe.close
  rescue StandardError => e
    # StandardError rather than Exception: signals and exit requests must
    # not be swallowed by a best-effort close.
    @logger.warn("Failed to close pipe.", :error => e, :command => command)
  end
  pipe
end
get_pipe(command)
click to toggle source
# File lib/logstash/outputs/pipe.rb, line 97
# Returns the pipe registered for +command+, opening one if none exists.
#
# An existing entry in @pipes is returned as-is. Otherwise the opening is
# logged at info level and a new PipeWrapper, constructed with the command
# and the mode string "a+", is stored in @pipes and returned.
# NOTE(review): PipeWrapper is defined outside this excerpt; what it does
# with the mode argument cannot be confirmed from here.
def get_pipe(command)
  return @pipes[command] if @pipes.include?(command)

  @logger.info("Opening pipe", :command => command)
  # Pass the mode as a plain positional argument; the previous `mode="a+"`
  # form only created an unused local variable.
  @pipes[command] = PipeWrapper.new(command, "a+")
end