class LogStash::Inputs::Ganglia
Read ganglia packets from the network via udp
Public Class Methods
new(params)
click to toggle source
Calls superclass method
# File lib/logstash/inputs/ganglia.rb, line 24 def initialize(params) super BasicSocket.do_not_reverse_lookup = true end
Public Instance Methods
parse_packet(packet)
click to toggle source
# File lib/logstash/inputs/ganglia.rb, line 98 def parse_packet(packet) gmonpacket=GmonPacket.new(packet) if gmonpacket.meta? # Extract the metadata from the packet meta=gmonpacket.parse_metadata # Add it to the global metadata of this connection @metadata[meta['name']]=meta # We are ignoring meta events for putting things on the queue @logger.debug("received a meta packet", @metadata) return nil elsif gmonpacket.data? data=gmonpacket.parse_data(@metadata) # Check if it was a valid data request return nil unless data props={ "program" => "ganglia", "log_host" => data["hostname"], "val" => data["val"] } %w{dmax tmax slope type units name}.each do |info| props[info] = @metadata[data["name"]][info] end return LogStash::Event.new(props) else # Skipping unknown packet types return nil end end
register()
click to toggle source
# File lib/logstash/inputs/ganglia.rb, line 30 def register end
run(output_queue)
click to toggle source
# File lib/logstash/inputs/ganglia.rb, line 34 def run(output_queue) begin udp_listener(output_queue) rescue => e if !stop? @logger.warn("ganglia udp listener died", :address => "#{@host}:#{@port}", :exception => e, :backtrace => e.backtrace) Stud.stoppable_sleep(5) { stop? } retry end end # begin end
stop()
click to toggle source
# File lib/logstash/inputs/ganglia.rb, line 85 def stop close_udp end
Private Instance Methods
close_udp()
click to toggle source
# File lib/logstash/inputs/ganglia.rb, line 90 def close_udp if @udp @udp.close @udp = nil end end
udp_listener(output_queue)
click to toggle source
# File lib/logstash/inputs/ganglia.rb, line 49 def udp_listener(output_queue) @logger.info("Starting ganglia udp listener", :address => "#{@host}:#{@port}") @udp.close if @udp @udp = UDPSocket.new(Socket::AF_INET) @udp.bind(@host, @port) @metadata = Hash.new if @metadata.nil? while !stop? packet = "" begin packet, client = @udp.recvfrom_nonblock(9000) rescue IO::WaitReadable # The socket is still not active and readable so the # read operation fails packet = "" end # recvfrom_nonblock return packet == String.empty? when no data is in the buffers, # we reused this same error code for IO:WaitReadable exception handler. next if packet.empty? # TODO(sissel): make this a codec... e = parse_packet(packet) unless e.nil? decorate(e) e.set("host", client[3]) # the IP address output_queue << e end end ensure close_udp end