class Embulk::Input::Prometheus
Public Class Methods
resume(task, columns, count) { |task, columns, count| ... }
click to toggle source
# File lib/embulk/input/prometheus.rb, line 36
# Hands the task definition to the caller-supplied block and returns the
# configuration diff for the next run.
#
# The block is invoked exactly once with +task+, +columns+ and +count+. Its
# return value is not used by this method.
#
# @param task [Hash] task settings built by +transaction+
# @param columns [Array] output column definitions
# @param count [Integer] number of tasks to run
# @yield [task, columns, count]
# @return [Hash] always an empty hash
# @raise [LocalJumpError] when called without a block
def self.resume(task, columns, count, &control)
  # The block's result was previously stored in a local that was never read.
  yield(task, columns, count)

  {}
end
transaction(config, &control)
click to toggle source
# File lib/embulk/input/prometheus.rb, line 12
# Reads the plugin configuration into a task hash, declares the output
# columns, and delegates to +resume+ with a task count of 1.
#
# Config keys read: url, query (required), since, start_at, end_at,
# step (required), element_key, tls, token, timeout, open_timeout.
#
# Output columns, in order: name (string), time (long), value (double).
#
# @param config [#param] configuration source
# @return whatever +resume+ returns
def self.transaction(config, &control)
  task = {
    "url" => config.param("url", :string, default: 'http://localhost:9090/api/v1/'),
    "query" => config.param("query", :string),
    "since" => config.param("since", :integer, default: 0),
    "start_at" => config.param("start_at", :string, default: nil),
    "end_at" => config.param("end_at", :string, default: nil),
    "step" => config.param("step", :integer),
    "element_key" => config.param("element_key", :string, default: 'instance'),
    "tls" => config.param("tls", :hash, default: nil),
    "token" => config.param("token", :string, default: nil),
    # Both timeouts were declared with the misspelled type :interger, which
    # is not a valid param type; they are integers.
    "timeout" => config.param("timeout", :integer, default: 60),
    "open_timeout" => config.param("open_timeout", :integer, default: 10),
  }

  columns = [
    Column.new(0, "name", :string),
    Column.new(1, "time", :long),
    Column.new(2, "value", :double),
  ]

  resume(task, columns, 1, &control)
end
Public Instance Methods
run()
click to toggle source
# File lib/embulk/input/prometheus.rb, line 43
# Issues one Prometheus +query_range+ request built from the task settings
# and adds one record per returned sample to the page builder.
#
# Client parameters:
# * url comes from task['url'];
# * task['tls'] may supply cert_path, key_path and ca_path;
# * task['token'] is passed as a credential token when set;
# * task['open_timeout'] and task['timeout'] are passed as options when set.
#
# Query window:
# * start is task['start_at'] when non-empty, otherwise now minus
#   task['since'] seconds;
# * end is task['end_at'] when non-empty, otherwise now.
#
# Each record is [metric[element_key], sample[0], sample[1]]. Records are
# only added when the response status is 'success'; any other status yields
# no records and no error.
#
# @return [Hash] always an empty task report
def run
  params = {
    url: task['url'],
    options: {},
    ssl: {},
    credentials: {},
  }

  tls = task["tls"]
  if tls
    params[:ssl][:client_cert] = OpenSSL::X509::Certificate.new(File.read(tls["cert_path"])) if tls["cert_path"]
    params[:ssl][:client_key] = OpenSSL::PKey::RSA.new(File.read(tls["key_path"])) if tls["key_path"]
    params[:ssl][:ca_file] = tls["ca_path"] if tls["ca_path"]
  end

  params[:credentials][:token] = task["token"] if task["token"]

  %w[open_timeout timeout].each do |name|
    params[:options][name.to_sym] = task[name] if task[name]
  end

  # Use a single reference instant so a relative window spans exactly
  # task['since'] seconds.
  now = Time.now

  start_at =
    if task['start_at'] && !task['start_at'].empty?
      Time.parse(task['start_at'])
    else
      now - task['since']
    end

  end_at =
    if task['end_at'] && !task['end_at'].empty?
      Time.parse(task['end_at'])
    else
      now
    end

  # The format ends in a literal "Z" (UTC designator), so the time must be
  # converted to UTC first; formatting a local or offset time as-is would
  # shift the requested window by the zone offset.
  timestamp_format = "%Y-%m-%dT%H:%M:%S.%LZ"
  as_utc_timestamp = ->(time) { time.getutc.strftime(timestamp_format) }

  response = ::Prometheus::ApiClient.client(params).get(
    'query_range',
    query: task['query'],
    start: as_utc_timestamp.call(start_at),
    end: as_utc_timestamp.call(end_at),
    step: "#{task['step']}s",
  )
  result = JSON.parse(response.body)

  if result['status'] == 'success'
    element_key = task["element_key"]
    result['data']['result'].each do |series|
      series["values"].each do |sample|
        # NOTE(review): sample values are passed through unconverted; confirm
        # the page builder coerces them to the declared column types.
        page_builder.add([series["metric"][element_key], sample[0], sample[1]])
      end
    end
  end

  page_builder.finish

  {}
end