class LogStash::Outputs::Riemann

Riemann is a network event stream processing system.

While Riemann is very similar conceptually to Logstash, it has much more in terms of being a monitoring system replacement.

Riemann is used in Logstash much like statsd or other metric-related outputs

You can learn about Riemann here:

You can see the author talk about it here:

Public Instance Methods

build_riemann_formatted_event(event) click to toggle source
# File lib/logstash/outputs/riemann.rb, line 131
def build_riemann_formatted_event(event)
  # Let's build us an event, shall we?
  r_event = Hash.new

  # Always copy "message" to Riemann's "description" field.
  r_event[:description] = event.get("message")

  # Directly map all other fields, if requested. Note that the "message" field
  # will also be mapped this way, so if it's present, it will become a
  # redundant copy of "description".
  if @map_fields == true
    r_event.merge! map_fields(nil, event.to_hash)
  end

  # Fields specified in the "riemann_event" configuration option take
  # precedence over mapped fields.
  if @riemann_event
    @riemann_event.each do |key, val|
      r_event[key.to_sym] = event.sprintf(val)
    end
  end

  # Riemann event attributes are always strings, with a few critical
  # exceptions. "ttl" and "metric" should be sent as float values.
  r_event[:ttl] = r_event[:ttl].to_f if r_event[:ttl]
  r_event[:metric] = r_event[:metric].to_f if r_event[:metric]

  # Similarly, event _time_ in Riemann was historically an integer value.
  # While current Riemann versions support sub-second time resolution in the
  # form of a float, we currently ensure that we send an integer value, as
  # expected by Riemann versions earlier than 0.2.13.
  r_event[:time] = event.timestamp.to_i

  r_event[:tags] = event.get("tags") if event.get("tags").is_a?(Array)
  r_event[:host] = event.sprintf(@sender)

  return r_event
end
map_fields(parent, fields) click to toggle source
# File lib/logstash/outputs/riemann.rb, line 109
def map_fields(parent, fields)
  this_level = Hash.new
  fields.each do |key, contents|
    next if key.start_with?("@")
    field = parent.nil? ? key : "#{parent}.#{key}"
    if contents.is_a?(Hash)
      this_level.merge! map_fields(field, contents)
    else
      this_level[field.to_sym] = contents
    end
  end
  return this_level
end
receive(event) click to toggle source
# File lib/logstash/outputs/riemann.rb, line 124
def receive(event)
  r_event = build_riemann_formatted_event(event)

  @logger.debug("Riemann event: ", :riemann_event => r_event)
  send_to_riemann(r_event)
end
register() click to toggle source
# File lib/logstash/outputs/riemann.rb, line 103
def register
  require 'riemann/client'
  @client = Riemann::Client.new(:host => @host, :port => @port)
end
send_to_riemann(riemann_formatted_event) click to toggle source
# File lib/logstash/outputs/riemann.rb, line 170
def send_to_riemann(riemann_formatted_event)
  begin
    proto_client = @client.instance_variable_get("@#{@protocol}")
    @logger.debug("Riemann client proto: #{proto_client.to_s}")
    proto_client << riemann_formatted_event
  rescue Exception => e
    @logger.error("Unhandled exception", :error => e)
  end
end