class LogStash::Inputs::Ganglia

Read ganglia packets from the network via udp

Public Class Methods

new(params) click to toggle source
Calls superclass method
# File lib/logstash/inputs/ganglia.rb, line 24
def initialize(params)
  super
  BasicSocket.do_not_reverse_lookup = true
end

Public Instance Methods

parse_packet(packet) click to toggle source
# File lib/logstash/inputs/ganglia.rb, line 98
def parse_packet(packet)
  gmonpacket=GmonPacket.new(packet)
  if gmonpacket.meta?
    # Extract the metadata from the packet
    meta=gmonpacket.parse_metadata
    # Add it to the global metadata of this connection
    @metadata[meta['name']]=meta

    # We are ignoring meta events for putting things on the queue
    @logger.debug("received a meta packet", @metadata)
    return nil
  elsif gmonpacket.data?
    data=gmonpacket.parse_data(@metadata)

    # Check if it was a valid data request
    return nil unless data
    props={ "program" => "ganglia", "log_host" => data["hostname"],
            "val" => data["val"] }
    %w{dmax tmax slope type units name}.each do |info|
      props[info] = @metadata[data["name"]][info]
    end
    return LogStash::Event.new(props)
  else
    # Skipping unknown packet types
    return nil
  end
end
register() click to toggle source
# File lib/logstash/inputs/ganglia.rb, line 30
def register
end
run(output_queue) click to toggle source
# File lib/logstash/inputs/ganglia.rb, line 34
def run(output_queue)
  begin
    udp_listener(output_queue)
  rescue => e
    if !stop?
      @logger.warn("ganglia udp listener died",
                   :address => "#{@host}:#{@port}", :exception => e,
      :backtrace => e.backtrace)
      Stud.stoppable_sleep(5) { stop? }
      retry
    end
  end # begin
end
stop() click to toggle source
# File lib/logstash/inputs/ganglia.rb, line 85
def stop
  close_udp
end

Private Instance Methods

close_udp() click to toggle source
# File lib/logstash/inputs/ganglia.rb, line 90
def close_udp
  if @udp
    @udp.close
    @udp = nil
  end
end
udp_listener(output_queue) click to toggle source
# File lib/logstash/inputs/ganglia.rb, line 49
def udp_listener(output_queue)
  @logger.info("Starting ganglia udp listener", :address => "#{@host}:#{@port}")

  @udp.close if @udp

  @udp = UDPSocket.new(Socket::AF_INET)
  @udp.bind(@host, @port)

  @metadata = Hash.new if @metadata.nil?
  while !stop?
    packet = ""
    begin
      packet, client = @udp.recvfrom_nonblock(9000)
    rescue IO::WaitReadable
     # The socket is still not active and readable so the
     # read operation fails
     packet = ""
    end
    # recvfrom_nonblock return packet == String.empty? when no data is in the buffers,
    # we reused this same error code for IO:WaitReadable exception handler.
    next if packet.empty?
    # TODO(sissel): make this a codec...
    e = parse_packet(packet)
    unless e.nil?
      decorate(e)
      e.set("host", client[3]) # the IP address
      output_queue << e
    end
  end
ensure
  close_udp
end