class DTK::State::Component::Attribute::Influxdb::Measurement
Constants
- InfluxdbEncoding
- LEGAL_TAG_CLASSES
Attributes
client[R]
name[R]
Public Class Methods
new(name, client)
click to toggle source
# File lib/state/component/providers/influxdb/measurement.rb, line 9
# Builds a measurement bound to a client.
#
# @param name   the measurement name; exposed through the `name` reader
# @param client the client object; exposed through the `client` reader
def initialize(name, client)
  @name, @client = name, client
end
Public Instance Methods
flux_filter(params_hash)
click to toggle source
# File lib/state/component/providers/influxdb/measurement.rb, line 14
# Builds one Flux `filter` pipeline stage per key/value pair of params_hash
# and returns them concatenated (an empty string for an empty hash).
#
# Values are embedded in a double-quoted Flux string literal, so backslashes,
# double quotes and the `${` interpolation opener are backslash-escaped; an
# unescaped value containing one of them would otherwise terminate the
# literal early or change the meaning of the query.
#
# @param params_hash [#each_pair] column name => value to match (value is
#   converted with `to_s`)
# @return [String] the concatenated filter stages
def flux_filter(params_hash)
  params_hash.each_pair.map do |key, value|
    escaped = value.to_s.gsub(/\\|"|\$\{/) { |special| "\\#{special}" }
    "|> filter(fn: (r) => r.#{key} == \"#{escaped}\")"
  end.join
end
get_last_point(params_hash = {})
click to toggle source
# File lib/state/component/providers/influxdb/measurement.rb, line 22
# Queries the last point recorded for this measurement.
#
# The Flux query reads from the bucket named in
# `client.connection_parameters[:bucket]`, keeps rows whose `_measurement`
# equals this measurement's name, applies one filter per params_hash entry,
# takes `last()` and drops a fixed list of columns.
#
# @param params_hash [Hash] passed to check_params_hash and then used, in
#   full, to build the filters
# @return [Array] the `values` of every record of every table in the result
# @raise [RuntimeError] wrapping any StandardError raised along the way,
#   including failures from check_params_hash
def get_last_point(params_hash = {})
  check_params_hash(params_hash)

  source_stage = 'from(bucket:"' + client.connection_parameters[:bucket] + '")'
  measurement_stage = " |> range(start:-5) |> filter(fn: (r) => r._measurement == \"#{name}\")"
  drop_stage = '|> drop(columns: ["_start", "_stop", "_field", "_measurement", ' \
               '"attribute_name", "assembly_name", "task_id", "component_name", "namespace"])'
  flux_query = source_stage + measurement_stage + flux_filter(params_hash) + ' |> last()' + drop_stage

  tables = client.query(query: flux_query)
  records = tables.values.map(&:records).flatten
  records.map(&:values)
rescue => e
  raise "Failed while getting last attribute point. Error: #{e}"
end
Private Instance Methods
check_params_hash(params_hash = {})
click to toggle source
# File lib/state/component/providers/influxdb/measurement.rb, line 67
# Extracts the required parameters from params_hash, validating each one.
#
# Only `nil` (including an absent key) counts as missing; a `false` value is
# passed on to the type check instead of being misreported as missing.
#
# @param params_hash [Hash] parameter name => value
# @return [Hash] only the entries named by `required_params`
# @raise [RuntimeError] when a required parameter is missing, or when
#   fail_if_illegal_tag_value rejects its value
def check_params_hash(params_hash = {})
  required_params.each_with_object({}) do |param_name, checked|
    value = params_hash[param_name]
    fail "Missing parameter '#{param_name}'" if value.nil?

    fail_if_illegal_tag_value(param_name, value)
    checked[param_name] = value
  end
end
fail_if_illegal_tag_value(name, value)
click to toggle source
# File lib/state/component/providers/influxdb/measurement.rb, line 96
# Rejects a parameter whose value's class is not listed in LEGAL_TAG_CLASSES.
# The check is an exact class match (`value.class`), not `is_a?`.
#
# @param name  parameter name, used only in the error message
# @param value the value to check
# @return [nil] when the value's class is legal
# @raise [RuntimeError] when the value's class is not in LEGAL_TAG_CLASSES
def fail_if_illegal_tag_value(name, value)
  return if LEGAL_TAG_CLASSES.include?(value.class)

  fail "Parameter '#{name}' has an illegal type, legal types are #{LEGAL_TAG_CLASSES.join(', ')}"
end
get_string_params(array)
click to toggle source
# File lib/state/component/providers/influxdb/measurement.rb, line 86
# Returns a new hash with every key converted to a String via `to_s`;
# values are left untouched.
#
# @param array [#each_pair] despite the parameter name, a hash-like object
# @return [Hash{String => Object}]
def get_string_params(array)
  array.each_pair.each_with_object({}) do |(key, value), stringified|
    stringified[key.to_s] = value
  end
end
influxdb_encoding(params_hash, timestamp)
click to toggle source
This default encoding can be overridden by its children; for example, attribute might make the measurement name be “#{self.name}_#{params” if that turns out to be more efficient for indexing
# File lib/state/component/providers/influxdb/measurement.rb, line 82
# Wraps this measurement's name, the given parameters and the timestamp in
# an InfluxdbEncoding.
#
# @param params_hash the parameters handed to the encoding
# @param timestamp   the timestamp handed to the encoding, passed as given
# @return [InfluxdbEncoding]
def influxdb_encoding(params_hash, timestamp)
  measurement_name = name
  InfluxdbEncoding.new(measurement_name, params_hash, timestamp)
end
write_point(value, params_hash = {}, timestamp)
click to toggle source
# File lib/state/component/providers/influxdb/measurement.rb, line 49
# Writes a single point for this measurement through the client.
#
# @param value       stored under the `value` field
# @param params_hash [Hash] handed to influxdb_encoding; the resulting tags
#   are written with their keys converted to strings
# @param timestamp   point time; `nil` means "now". Sent as integer
#   milliseconds
# @return whatever `client.write_point` returns
# @raise [RuntimeError] wrapping any StandardError raised along the way
def write_point(value, params_hash = {}, timestamp)
  # The encoding step allows different mappings of name and params_hash
  # to the actual influxdb measurement name and tags.
  encoding = influxdb_encoding(params_hash, timestamp)
  timestamp = Time.now if timestamp.nil?

  # Convert through Rational when possible: going through Float truncates
  # exact timestamps (1.001 s * 1000 becomes 1000.9999999999999 -> 1000 ms).
  seconds = timestamp.respond_to?(:to_r) ? timestamp.to_r : timestamp.to_f

  data = {
    name: name.to_s,
    tags: get_string_params(encoding.tags),
    fields: { value: value },
    time: (seconds * 1000).to_i
  }
  client.write_point(data)
rescue => error
  fail "write_point error: #{error}"
end