class Fluent::Plugin::LokiOutput
Constants
- DEFAULT_BUFFER_TYPE
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_loki.rb, line 13 def initialize super end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_loki.rb, line 55 def configure(conf) compat_parameters_convert(conf, :buffer) super @ssl_verify_mode = if @ssl_no_verify OpenSSL::SSL::VERIFY_NONE else OpenSSL::SSL::VERIFY_PEER end @ca_file = @cacert_file @last_request_time = nil raise Fluent::ConfigError, "'tag' in chunk_keys is required." if !@chunk_key_tag && @buffered end
create_request(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 106 def create_request(tag, time, record) url = format_url(tag, time, record) uri = URI.parse(url+"/api/prom/push") req = Net::HTTP::Post.new(uri.request_uri) set_body(req, tag, time, record) set_header(req, tag, time, record) return req, uri end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 191 def format(tag, time, record) [time, record].to_msgpack end
format_labels(labels)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 172 def format_labels(labels) st = "{" labels.each do |key, val| st+= "#{key}=\"#{val}\"," end st[-1]="}" return st end
format_url(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 78 def format_url(tag, time, record) @endpoint_url end
formatted_to_msgpack_binary?()
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 195 def formatted_to_msgpack_binary? true end
handle_record(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 180 def handle_record(tag, time, record) rec = {"streams"=>[{"labels"=>format_labels(@labels), "entries"=>[{"ts"=>Time.now.iso8601(3), "line"=>record.to_json}]}]} # I used time now instead of at 'time' because it cause 'Entry out of order' on loki's side req, uri = create_request(tag, time, rec) send_request(req, uri) end
http_opts(uri)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 115 def http_opts(uri) opts = { :use_ssl => uri.scheme == 'https' } opts[:verify_mode] = @ssl_verify_mode if opts[:use_ssl] opts[:ca_file] = File.join(@ca_file) if File.file?(@ca_file) opts end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 199 def multi_workers_ready? true end
prefer_buffered_processing()
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 187 def prefer_buffered_processing @buffered end
process(tag, es)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 203 def process(tag, es) es.each do |time, record| handle_record(tag, time, record) end end
proxies()
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 124 def proxies ENV['HTTPS_PROXY'] || ENV['HTTP_PROXY'] || ENV['http_proxy'] || ENV['https_proxy'] end
send_request(req, uri)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 128 def send_request(req, uri) is_rate_limited = (@rate_limit_msec != 0 and not @last_request_time.nil?) if is_rate_limited and ((Time.now.to_f - @last_request_time) * 1000.0 < @rate_limit_msec) log.info('Dropped request due to rate limiting') return end res = nil begin if @authentication == :basic req.basic_auth(@username, @password) elsif @authentication == :bearer req['authorization'] = "bearer #{@token}" elsif @authentication == :jwt req['authorization'] = "jwt #{@token}" end @last_request_time = Time.now.to_f if proxy = proxies proxy_uri = URI.parse(proxy) res = Net::HTTP.start(uri.host, uri.port, proxy_uri.host, proxy_uri.port, proxy_uri.user, proxy_uri.password, **http_opts(uri)) {|http| http.request(req) } else res = Net::HTTP.start(uri.host, uri.port, **http_opts(uri)) {|http| http.request(req) } end rescue => e # rescue all StandardErrors # server didn't respond log.warn "Net::HTTP.#{req.method.capitalize} raises exception: #{e.class}, '#{e.message}'" raise e if @raise_on_error else unless res and res.is_a?(Net::HTTPSuccess) res_summary = if res "#{res.code} #{res.message} #{res.body}" else "res=nil" end log.warn "failed to #{req.method} #{uri} (#{res_summary})" end #end unless end # end begin end
set_body(req, tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 82 def set_body(req, tag, time, record) set_json_body(req, record) req end
set_header(req, tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 87 def set_header(req, tag, time, record) if @tenant req["X-Scope-OrgID"] = @tenant end if @custom_headers @custom_headers.each do |k,v| req[k] = v end req else req end end
set_json_body(req, data)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 101 def set_json_body(req, data) req.body = Yajl.dump(data) req['Content-Type'] = 'application/json' end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_loki.rb, line 74 def shutdown super end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_loki.rb, line 70 def start super end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 209 def write(chunk) tag = chunk.metadata.tag @endpoint_url = extract_placeholders(@endpoint_url, chunk.metadata) chunk.msgpack_each do |time, record| handle_record(tag, time, record) end end