class GraphiteAPI::Buffer
Constants
- AGGREGATORS
- END_OF_STREAM
- IGNORE
- VALID_MESSAGE
Matches the metric name (string with dots and dashes), at least one space, the metric value (int or float), at least one space and finally the metric timestamp (int)
Attributes
cache[R]
options[R]
queue[R]
streamer[R]
Public Class Methods
new(options, timers=false)
click to toggle source
# File lib/graphite-api/buffer.rb, line 38 def initialize options, timers=false @options = options @queue = Queue.new @streamer = Hash.new {|h,k| h[k] = ""} @cache = Cache::Memory.new(options, timers) if options[:cache] end
Public Instance Methods
inspect()
click to toggle source
# File lib/graphite-api/buffer.rb, line 98 def inspect "#<GraphiteAPI::Buffer:%s @quque#size=%s @streamer=%s>" % [ object_id, queue.size, streamer] end
new_records?()
click to toggle source
# File lib/graphite-api/buffer.rb, line 103 def new_records? !queue.empty? end
pull(format = nil)
click to toggle source
# File lib/graphite-api/buffer.rb, line 73 def pull format = nil data = Hash.new { |h,time| h[time] = Hash.new { |h2,metric| h2[metric] = cache_get(time, metric) } } aggregation_methods = Hash.new counter = 0 while new_records? and (counter += 1) < 1_000_000 metrics, time, method_name = queue.pop.values_at(:metric, :time, :aggregation_method) normalized_time = normalize_time(time, options[:slice]) metrics.each do |metric, value| aggregation_methods[metric] = method_name || options[:default_aggregation_method] data[normalized_time][metric].push value.to_f cache_set(normalized_time, metric, data[normalized_time][metric]) end end data.map do |time, metrics| metrics.map do |metric, raw_values| value = AGGREGATORS[aggregation_methods[metric]].call(*raw_values) results = ["#{prefix}#{metric}",("%f"%value).to_f, time] format == :string ? results.join(" ") : results end end.flatten(1) end
push(obj)
click to toggle source
Add records to buffer push({:metric => {'a' => 10},:time => Time.now,:aggregation_method => :sum})
# File lib/graphite-api/buffer.rb, line 65 def push obj Logger.debug [:buffer,:add, obj] queue.push obj nil end
Also aliased as: <<
stream(message, client_id = nil)
click to toggle source
this method isn't thread safe use push
for multiple threads support
# File lib/graphite-api/buffer.rb, line 49 def stream message, client_id = nil message.gsub(/\t/,' ').each_char do |char| next if invalid_char? char streamer[client_id] += char if closed_stream? streamer[client_id] if streamer[client_id] =~ VALID_MESSAGE push stream_message_to_obj streamer[client_id] end streamer.delete client_id end end end
Private Instance Methods
cache_get(time, metric)
click to toggle source
# File lib/graphite-api/buffer.rb, line 109 def cache_get time, metric if cache cache.get(time, metric) || [] else [] end end
cache_set(time, metric, value)
click to toggle source
# File lib/graphite-api/buffer.rb, line 117 def cache_set time, metric, value cache.set(time, metric, value) if cache end
closed_stream?(string)
click to toggle source
# File lib/graphite-api/buffer.rb, line 135 def closed_stream? string string[-1,1] == END_OF_STREAM end
invalid_char?(char)
click to toggle source
# File lib/graphite-api/buffer.rb, line 131 def invalid_char? char IGNORE.include? char end
normalize_time(time, slice)
click to toggle source
# File lib/graphite-api/buffer.rb, line 121 def normalize_time time, slice slice = 60 if slice.nil? ((time || Time.now).to_i / slice * slice).to_i end
prefix()
click to toggle source
# File lib/graphite-api/buffer.rb, line 139 def prefix @prefix ||= if options[:prefix] and !options[:prefix].empty? Array(options[:prefix]).join('.') << '.' else "" end end
stream_message_to_obj(message)
click to toggle source
# File lib/graphite-api/buffer.rb, line 126 def stream_message_to_obj message parts = message.split {:metric => { parts[0] => parts[1] },:time => Time.at(parts[2].to_i) } end