class Fluent::Plugin::LokiOutput
Constants
- DEFAULT_BUFFER_TYPE
Attributes
record_accessors[RW]
Public Instance Methods
client_cert_configured?()
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 128 def client_cert_configured? !@key.nil? && !@cert.nil? end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_loki.rb, line 81 def configure(conf) # rubocop:disable Metrics/CyclomaticComplexity compat_parameters_convert(conf, :buffer) super @uri = URI.parse(@url + '/loki/api/v1/push') unless @uri.is_a?(URI::HTTP) || @uri.is_a?(URI::HTTPS) raise Fluent::ConfigError, 'URL parameter must have HTTP/HTTPS scheme' end @record_accessors = {} conf.elements.select { |element| element.name == 'label' }.each do |element| element.each_pair do |k, v| element.has_key?(k) # rubocop:disable Style/PreferredHashMethods #to suppress unread configuration warning v = k if v.empty? @record_accessors[k] = record_accessor_create(v) end end @remove_keys_accessors = [] @remove_keys.each do |key| @remove_keys_accessors.push(record_accessor_create(key)) end # If configured, load and validate client certificate (and corresponding key) if client_cert_configured? load_client_cert validate_client_cert_key end raise "bearer_token_file #{@bearer_token_file} not found" if !@bearer_token_file.nil? && !File.exist?(@bearer_token_file) @auth_token_bearer = nil if !@bearer_token_file.nil? if !File.exist?(@bearer_token_file) raise "bearer_token_file #{@bearer_token_file} not found" end # Read the file once, assume long-lived authentication token. @auth_token_bearer = File.read(@bearer_token_file) if @auth_token_bearer.empty? raise "bearer_token_file #{@bearer_token_file} is empty" end log.info "will use Bearer token from bearer_token_file #{@bearer_token_file} in Authorization header" end raise "CA certificate file #{@ca_cert} not found" if !@ca_cert.nil? && !File.exist?(@ca_cert) end
generic_to_loki(chunk)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 201 def generic_to_loki(chunk) # log.debug("GenericToLoki: converting #{chunk}") streams = chunk_to_loki(chunk) payload = payload_builder(streams) payload end
http_request_opts(uri)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 171 def http_request_opts(uri) opts = { use_ssl: uri.scheme == 'https' } # Optionally disable server server certificate verification. if @insecure_tls opts = opts.merge( verify_mode: OpenSSL::SSL::VERIFY_NONE ) end # Optionally present client certificate if !@cert.nil? && !@key.nil? opts = opts.merge( cert: @cert, key: @key ) end # For server certificate verification: set custom CA bundle. # Only takes effect when `insecure_tls` is not set. unless @ca_cert.nil? opts = opts.merge( ca_file: @ca_cert ) end opts end
load_client_cert()
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 132 def load_client_cert @cert = OpenSSL::X509::Certificate.new(File.read(@cert)) if @cert @key = OpenSSL::PKey.read(File.read(@key)) if @key end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 143 def multi_workers_ready? true end
validate_client_cert_key()
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 137 def validate_client_cert_key if !@key.is_a?(OpenSSL::PKey::RSA) && !@key.is_a?(OpenSSL::PKey::DSA) raise "Unsupported private key type #{key.class}" end end
write(chunk)
click to toggle source
flush a chunk to loki
# File lib/fluent/plugin/out_loki.rb, line 148 def write(chunk) # streams by label payload = generic_to_loki(chunk) body = { 'streams' => payload } tenant = extract_placeholders(@tenant, chunk) if @tenant # add ingest path to loki url res = loki_http_request(body, tenant) if res.is_a?(Net::HTTPSuccess) log.debug "POST request was responded to with status code #{res.code}" return end res_summary = "#{res.code} #{res.message} #{res.body}" log.warn "failed to write post to #{@uri} (#{res_summary})" log.debug Yajl.dump(body) # Only retry 429 and 500s raise(LogPostError, res_summary) if res.is_a?(Net::HTTPTooManyRequests) || res.is_a?(Net::HTTPServerError) end
Private Instance Methods
chunk_to_loki(chunk)
click to toggle source
iterate through each chunk and create a loki stream entry
# File lib/fluent/plugin/out_loki.rb, line 342 def chunk_to_loki(chunk) streams = {} last_time = nil chunk.each do |time, record| # each chunk has a unique set of labels last_time = time if last_time.nil? result = line_to_loki(record) chunk_labels = result[:labels] # initialize a new stream with the chunk_labels if it does not exist streams[chunk_labels] = [] if streams[chunk_labels].nil? # NOTE: timestamp must include nanoseconds # append to matching chunk_labels key streams[chunk_labels].push( 'ts' => to_nano(time), 'line' => result[:line] ) end streams end
format_labels(data_labels)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 235 def format_labels(data_labels) formatted_labels = {} # merge extra_labels with data_labels. If there are collisions extra_labels win. data_labels = {} if data_labels.nil? data_labels = data_labels.merge(@extra_labels) # sanitize label values data_labels.each { |k, v| formatted_labels[k] = v.gsub('"', '\\"') if v && v&.is_a?(String) } formatted_labels end
line_to_loki(record)
click to toggle source
convert a line to loki line with labels
# File lib/fluent/plugin/out_loki.rb, line 297 def line_to_loki(record) chunk_labels = {} line = '' if record.is_a?(Hash) @record_accessors&.each do |name, accessor| new_key = name.gsub(%r{[.\-\/]}, '_') chunk_labels[new_key] = accessor.call(record) accessor.delete(record) end if @extract_kubernetes_labels && record.key?('kubernetes') kubernetes_labels = record['kubernetes']['labels'] kubernetes_labels.each_key do |l| new_key = l.gsub(%r{[.\-\/]}, '_') chunk_labels[new_key] = kubernetes_labels[l] end end # remove needless keys. @remove_keys_accessors&.each do |deleter| deleter.delete(record) end line = record_to_line(record) else line = record.to_s end # add buffer flush thread title as a label if there are multiple flush threads # this prevents "entry out of order" errors in loki by making the label constellation # unique per flush thread # note that flush thread != fluentd worker. if you use multiple workers you still need to # add the worker id as a label if @buffer_config.flush_thread_count > 1 chunk_labels['fluentd_thread'] = Thread.current[:_fluentd_plugin_helper_thread_title].to_s end # return both the line content plus the labels found in the record { line: line, labels: chunk_labels } end
loki_http_request(body, tenant)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 210 def loki_http_request(body, tenant) req = Net::HTTP::Post.new( @uri.request_uri ) req.add_field('Content-Type', 'application/json') req.add_field('Authorization', "Bearer #{@auth_token_bearer}") if !@auth_token_bearer.nil? req.add_field('X-Scope-OrgID', tenant) if tenant req.body = Yajl.dump(body) req.basic_auth(@username, @password) if @username opts = http_request_opts(@uri) msg = "sending #{req.body.length} bytes to loki" msg += " (tenant: \"#{tenant}\")" if tenant log.debug msg Net::HTTP.start(@uri.host, @uri.port, **opts) { |http| http.request(req) } end
numeric?(val)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 229 def numeric?(val) !Float(val).nil? rescue StandardError false end
payload_builder(streams)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 245 def payload_builder(streams) payload = [] streams.each do |k, v| # create a stream for each label set. # Additionally sort the entries by timestamp just in case we # got them out of order. entries = v.sort_by.with_index { |hsh, i| [hsh['ts'], i] } payload.push( 'stream' => format_labels(k), 'values' => entries.map { |e| [e['ts'].to_s, e['line']] } ) end payload end
record_to_line(record)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 270 def record_to_line(record) line = '' if @drop_single_key && record.keys.length == 1 line = record[record.keys[0]] else case @line_format when :json line = Yajl.dump(record) when :key_value formatted_labels = [] record.each do |k, v| # Escape double quotes and backslashes by prefixing them with a backslash v = v.to_s.gsub(%r{(["\\])}, '\\\\\1') if v.include?(' ') || v.include?('=') formatted_labels.push(%(#{k}="#{v}")) else formatted_labels.push(%(#{k}=#{v})) end end line = formatted_labels.join(' ') end end line end
to_nano(time)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 260 def to_nano(time) # time is a Fluent::EventTime object, or an Integer which represents unix timestamp (seconds from Epoch) # https://docs.fluentd.org/plugin-development/api-plugin-output#chunk-each-and-block if time.is_a?(Fluent::EventTime) time.to_i * (10**9) + time.nsec else time.to_i * (10**9) end end