class DTK::State::Component::Attribute::Influxdb

Attributes

client[R]
measurement[R]

Public Class Methods

new(measurement_name) click to toggle source
# File lib/state/component/providers/influxdb.rb, line 11
# Sets up the provider for a single measurement.
#
# Creates a new Influxdb::Client, stores it in @client, and stores the result
# of asking that client for a measurement helper in @measurement.
#
# @param measurement_name passed unchanged to Influxdb::Client#measurement_helper
def initialize(measurement_name)
  influx_client = Influxdb::Client.new
  @client = influx_client
  @measurement = influx_client.measurement_helper(measurement_name)
end

Public Instance Methods

get(namespace, component_name, assembly_name, attribute_name, opts = {}) click to toggle source
# File lib/state/component/providers/influxdb.rb, line 16
# Reads the last stored point for an attribute.
#
# The tag set comes from measurement.get_required_tags. When
# opts[:provider] is "correlation", the result of
# measurement.get_correlator_type(opts[:entrypoint]) is merged into it.
#
# @param opts [Hash] reads :provider and :entrypoint
# @return whatever measurement.get_last_point returns for the tag set
# @raise [RuntimeError] wrapping the message of any StandardError raised above
def get(namespace, component_name, assembly_name, attribute_name, opts = {})
  tags = measurement.get_required_tags(
    namespace, component_name, assembly_name, attribute_name
  )

  if opts[:provider] == "correlation"
    correlator_tags = measurement.get_correlator_type(opts[:entrypoint])
    tags.merge!(correlator_tags)
  end

  measurement.get_last_point(tags)
rescue => err
  raise "Error happened while getting attribute from InfluxDB.\nError: #{err}"
end
get_event(event_id, pod_name, pod_namespace, component_name, attribute_name, task_id) click to toggle source
# File lib/state/component/providers/influxdb.rb, line 41
# Reads the last stored point for an event.
#
# The tag set is built by measurement.get_required_tags from the six
# arguments, in the order given.
#
# @return whatever measurement.get_last_point returns for the tag set
# @raise [RuntimeError] wrapping the message of any StandardError raised above
def get_event(event_id, pod_name, pod_namespace, component_name, attribute_name, task_id)
  required_tags = measurement.get_required_tags(event_id, pod_name, pod_namespace, component_name, attribute_name, task_id)
  # Return the lookup result directly; the previous local assignment was never read.
  measurement.get_last_point(required_tags)
rescue => e
  raise "Error happened while getting event from InfluxDB.\nError: #{e}"
end
get_state(type, name, namespace, object_state, component_name, attribute_name, task_id) click to toggle source
# File lib/state/component/providers/influxdb.rb, line 57
# Reads the last stored point for an object state.
#
# The tag set is built by measurement.get_required_tags from the seven
# arguments, in the order given.
#
# @return whatever measurement.get_last_point returns for the tag set
# @raise [RuntimeError] wrapping the message of any StandardError raised above
def get_state(type, name, namespace, object_state, component_name, attribute_name, task_id)
  begin
    tags = measurement.get_required_tags(
      type, name, namespace, object_state, component_name, attribute_name, task_id
    )
    measurement.get_last_point(tags)
  rescue => err
    raise "Error happened while getting state from InfluxDB.\nError: #{err}"
  end
end
write(namespace, component_name, assembly_name, attribute_name, value, opts = {}, timestamp = nil) click to toggle source
# File lib/state/component/providers/influxdb.rb, line 24
# Writes an attribute value.
#
# The tag set comes from measurement.get_required_tags. When
# opts[:provider] is "correlation", the result of
# measurement.get_correlator_type(opts[:entrypoint]) is merged into it.
# The value is converted with #to_s before being handed to measurement.write.
#
# @param opts [Hash] reads :provider and :entrypoint
# @param timestamp passed through unchanged to measurement.write (nil by default)
# @return whatever measurement.write returns
# @raise [RuntimeError] wrapping the message of any StandardError raised above
def write(namespace, component_name, assembly_name, attribute_name, value, opts = {}, timestamp = nil)
  required_tags = measurement.get_required_tags(namespace, component_name, assembly_name, attribute_name)
  required_tags.merge! measurement.get_correlator_type(opts[:entrypoint]) if opts[:provider] == "correlation"
  measurement.write(value.to_s, required_tags, timestamp)
rescue => e
  # "\n" (not "\E", which Ruby collapses to a bare "E") keeps the cause on its own line.
  raise "Error happened while writing attribute into InfluxDB.\nError: #{e}"
end
write_event(event_id, pod_name, pod_namespace, event_source, event_message, component_name, attribute_name, task_id, timestamp) click to toggle source
# File lib/state/component/providers/influxdb.rb, line 32
# Writes an event.
#
# Refuses timestamps that compare greater than the current time. The written
# value is the #to_s of a hash holding event_source and event_message; the
# tag set is built by measurement.get_required_tags from the remaining
# identifying arguments.
#
# @param timestamp compared against Time.new with `>` and then passed to measurement.write
# @return whatever measurement.write returns
# @raise [RuntimeError] wrapping the message of any StandardError raised above,
#   including the bad-timestamp failure
def write_event(event_id, pod_name, pod_namespace, event_source, event_message, component_name, attribute_name, task_id, timestamp)
  raise "Bad timestamp input, write operation wont be completed" if timestamp > Time.new
  value_to_write = { event_source: event_source, event_message: event_message }
  required_tags = measurement.get_required_tags(event_id, pod_name, pod_namespace, component_name, attribute_name, task_id)
  measurement.write(value_to_write.to_s, required_tags, timestamp)
rescue => e
  # The rescued exception must be bound to the same name the message interpolates;
  # otherwise the handler itself fails with NameError and hides the real cause.
  raise "Error happened while writing event into InfluxDB.\nError: #{e}"
end
write_state(type, name, namespace, object_state, spec, status, component_name, attribute_name, task_id, timestamp) click to toggle source
# File lib/state/component/providers/influxdb.rb, line 48
# Writes an object state.
#
# Refuses timestamps that compare greater than the current time. The written
# value is the #to_s of a hash holding spec and status; the tag set is built
# by measurement.get_required_tags from the remaining identifying arguments.
#
# @param timestamp compared against the current time with `>` and then passed to measurement.write
# @return whatever measurement.write returns
# @raise [RuntimeError] wrapping the message of any StandardError raised above,
#   including the bad-timestamp failure
def write_state(type, name, namespace, object_state, spec, status, component_name, attribute_name, task_id, timestamp)
  if timestamp > Time.now
    raise "Bad timestamp input, write operation to InfluxDB wont be completed"
  end

  tags = measurement.get_required_tags(
    type, name, namespace, object_state, component_name, attribute_name, task_id
  )
  payload = { spec: spec, status: status }.to_s
  measurement.write(payload, tags, timestamp)
rescue => err
  raise "Error happened while writing state into InfluxDB.\nError: #{err}"
end