class GraphiteAPI::Buffer

Constants

AGGREGATORS
END_OF_STREAM
IGNORE
VALID_MESSAGE

Matches the metric name (string with dots and dashes), at least one space, the metric value (int or float), at least one space and finally the metric timestamp (int)

Attributes

cache[R]
options[R]
queue[R]
streamer[R]

Public Class Methods

new(options, timers=false) click to toggle source
# File lib/graphite-api/buffer.rb, line 38
def initialize options, timers=false
  @options = options
  @queue = Queue.new
  @streamer = Hash.new {|h,k| h[k] = ""}
  @cache = Cache::Memory.new(options, timers) if options[:cache]
end

Public Instance Methods

<<(obj)
Alias for: push
inspect() click to toggle source
# File lib/graphite-api/buffer.rb, line 98
def inspect
  "#<GraphiteAPI::Buffer:%s @quque#size=%s @streamer=%s>" %
    [ object_id, queue.size, streamer]
end
new_records?() click to toggle source
# File lib/graphite-api/buffer.rb, line 103
def new_records?
  !queue.empty?
end
pull(format = nil) click to toggle source
# File lib/graphite-api/buffer.rb, line 73
def pull format = nil
  data = Hash.new { |h,time| h[time] = Hash.new { |h2,metric| h2[metric] = cache_get(time, metric) } }
  aggregation_methods = Hash.new

  counter = 0
  while new_records? and (counter += 1) < 1_000_000
    metrics, time, method_name = queue.pop.values_at(:metric, :time, :aggregation_method)

    normalized_time = normalize_time(time, options[:slice])
    metrics.each do |metric, value|
      aggregation_methods[metric] = method_name || options[:default_aggregation_method]
      data[normalized_time][metric].push value.to_f
      cache_set(normalized_time, metric, data[normalized_time][metric])
    end
  end

  data.map do |time, metrics|
    metrics.map do |metric, raw_values|
      value = AGGREGATORS[aggregation_methods[metric]].call(*raw_values)
      results = ["#{prefix}#{metric}",("%f"%value).to_f, time]
      format == :string ? results.join(" ") : results
    end
  end.flatten(1)
end
push(obj) click to toggle source

Add records to buffer push({:metric => {'a' => 10},:time => Time.now,:aggregation_method => :sum})

# File lib/graphite-api/buffer.rb, line 65
def push obj
  Logger.debug [:buffer,:add, obj]
  queue.push obj
  nil
end
Also aliased as: <<
stream(message, client_id = nil) click to toggle source

this method isn't thread safe use push for multiple threads support

# File lib/graphite-api/buffer.rb, line 49
def stream message, client_id = nil
  message.gsub(/\t/,' ').each_char do |char|
    next if invalid_char? char
    streamer[client_id] += char

    if closed_stream? streamer[client_id]
      if streamer[client_id] =~ VALID_MESSAGE
        push stream_message_to_obj streamer[client_id]
      end
      streamer.delete client_id
    end
  end
end

Private Instance Methods

cache_get(time, metric) click to toggle source
# File lib/graphite-api/buffer.rb, line 109
def cache_get time, metric
  if cache
    cache.get(time, metric) || []
  else
    []
  end
end
cache_set(time, metric, value) click to toggle source
# File lib/graphite-api/buffer.rb, line 117
def cache_set time, metric, value
  cache.set(time, metric, value) if cache
end
closed_stream?(string) click to toggle source
# File lib/graphite-api/buffer.rb, line 135
def closed_stream? string
  string[-1,1] == END_OF_STREAM
end
invalid_char?(char) click to toggle source
# File lib/graphite-api/buffer.rb, line 131
def invalid_char? char
  IGNORE.include? char
end
normalize_time(time, slice) click to toggle source
# File lib/graphite-api/buffer.rb, line 121
def normalize_time time, slice
  slice = 60 if slice.nil?
  ((time || Time.now).to_i / slice * slice).to_i
end
prefix() click to toggle source
# File lib/graphite-api/buffer.rb, line 139
def prefix
  @prefix ||= if options[:prefix] and !options[:prefix].empty?
    Array(options[:prefix]).join('.') << '.'
  else
    ""
  end
end
stream_message_to_obj(message) click to toggle source
# File lib/graphite-api/buffer.rb, line 126
def stream_message_to_obj message
  parts = message.split
  {:metric => { parts[0] => parts[1] },:time => Time.at(parts[2].to_i) }
end