class Prometheus::Client::DataStores::DirectFileStore::MetricStore
Attributes
metric_name[R]
store_settings[R]
Public Class Methods
new(metric_name:, store_settings:, metric_settings:)
click to toggle source
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 78 def initialize(metric_name:, store_settings:, metric_settings:) @metric_name = metric_name @store_settings = store_settings @values_aggregation_mode = metric_settings[:aggregation] @store_opened_by_pid = nil @lock = Monitor.new end
Public Instance Methods
all_values()
click to toggle source
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 127 def all_values stores_data = Hash.new{ |hash, key| hash[key] = [] } # There's no need to call `synchronize` here. We're opening a second handle to # the file, and `flock`ing it, which prevents inconsistent reads stores_for_metric.each do |file_path| begin store = FileMappedDict.new(file_path, true) store.all_values.each do |(labelset_qs, v, ts)| # Labels come as a query string, and CGI::parse returns arrays for each key # "foo=bar&x=y" => { "foo" => ["bar"], "x" => ["y"] } # Turn the keys back into symbols, and remove the arrays label_set = CGI::parse(labelset_qs).map do |k, vs| [k.to_sym, vs.first] end.to_h stores_data[label_set] << [v, ts] end ensure store.close if store end end # Aggregate all the different values for each label_set aggregate_hash = Hash.new { |hash, key| hash[key] = 0.0 } stores_data.each_with_object(aggregate_hash) do |(label_set, values), acc| acc[label_set] = aggregate_values(values) end end
get(labels:)
click to toggle source
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 121 def get(labels:) in_process_sync do internal_store.read_value(store_key(labels)) end end
increment(labels:, by: 1)
click to toggle source
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 108 def increment(labels:, by: 1) if @values_aggregation_mode == DirectFileStore::MOST_RECENT raise InvalidStoreSettingsError, "The :most_recent aggregation does not support the use of increment"\ "/decrement" end key = store_key(labels) in_process_sync do internal_store.increment_value(key, by.to_f) end end
set(labels:, val:)
click to toggle source
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 102 def set(labels:, val:) in_process_sync do internal_store.write_value(store_key(labels), val.to_f) end end
synchronize() { || ... }
click to toggle source
Synchronize is used to do a multi-process Mutex, when incrementing multiple values at once, so that the other process, reading the file for export, doesn’t get incomplete increments.
‘in_process_sync`, instead, is just used so that two threads don’t increment the same value and get a context switch between read and write leading to an inconsistency
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 94 def synchronize in_process_sync do internal_store.with_file_lock do yield end end end
Private Instance Methods
aggregate_values(values)
click to toggle source
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 194 def aggregate_values(values) # Each entry in the `values` array is a tuple of `value` and `timestamp`, # so for all aggregations except `MOST_RECENT`, we need to only take the # first value in each entry and ignore the second. if @values_aggregation_mode == MOST_RECENT latest_tuple = values.max { |a,b| a[1] <=> b[1] } latest_tuple.first # return the value without the timestamp else values = values.map(&:first) # Discard timestamps if @values_aggregation_mode == SUM values.inject { |sum, element| sum + element } elsif @values_aggregation_mode == MAX values.max elsif @values_aggregation_mode == MIN values.min elsif @values_aggregation_mode == ALL values.first else raise InvalidStoreSettingsError, "Invalid Aggregation Mode: #{ @values_aggregation_mode }" end end end
filemap_filename()
click to toggle source
Filename for this metric’s PStore (one per process)
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 181 def filemap_filename filename = "metric_#{ metric_name }___#{ process_id }.bin" File.join(@store_settings[:dir], filename) end
in_process_sync() { || ... }
click to toggle source
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 159 def in_process_sync @lock.synchronize { yield } end
internal_store()
click to toggle source
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 171 def internal_store if @store_opened_by_pid != process_id @store_opened_by_pid = process_id @internal_store = FileMappedDict.new(filemap_filename) else @internal_store end end
process_id()
click to toggle source
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 190 def process_id Process.pid end
store_key(labels)
click to toggle source
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 163 def store_key(labels) if @values_aggregation_mode == ALL labels[:pid] = process_id end labels.to_a.sort.map{|k,v| "#{CGI::escape(k.to_s)}=#{CGI::escape(v.to_s)}"}.join('&') end
stores_for_metric()
click to toggle source
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 186 def stores_for_metric Dir.glob(File.join(@store_settings[:dir], "metric_#{ metric_name }___*")) end