class Fluent::KubernetesMetadataInput
Constants
- K8_POD_CA_CERT
- K8_POD_TOKEN
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kubernetes_metadata.rb, line 62 def initialize super require 'kubeclient' require 'active_support/core_ext/object/blank' end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kubernetes_metadata.rb, line 68 def configure(conf) super # Use Kubernetes default service account if we're in a pod. if @kubernetes_url.nil? env_host = ENV['KUBERNETES_SERVICE_HOST'] env_port = ENV['KUBERNETES_SERVICE_PORT'] if env_host.present? && env_port.present? @kubernetes_url = "https://#{env_host}:#{env_port}/api" end end unless @kubernetes_url raise Fluent::ConfigError, "kubernetes_url is not defined" end # Use SSL certificate and bearer token from Kubernetes service account. if Dir.exist?(@secret_dir) ca_cert = File.join(@secret_dir, K8_POD_CA_CERT) pod_token = File.join(@secret_dir, K8_POD_TOKEN) if !@ca_file.present? and File.exist?(ca_cert) @ca_file = ca_cert end if !@bearer_token_file.present? and File.exist?(pod_token) @bearer_token_file = pod_token end end @get_res_string = "get_#{@resource.downcase}" log.trace "resource get method is #{@get_res_string}" end
emit_event(event_obj, time, type)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metadata.rb, line 169 def emit_event(event_obj, time, type) payload = syms_to_strs(event_obj) payload['event_type'] = type res_name = @resource.to_s.downcase namespace_name = event_obj['metadata']['namespace'] ? event_obj['metadata']['namespace'] : "openshift-infra" if event_obj['metadata']['labels'] then labels = [] syms_to_strs(event_obj['metadata']['labels'].to_h).each{|k,v| labels << "#{k}=#{v}"} payload['metadata']['labels'] = labels end if event_obj['metadata']['annotations'] then annotations = [] syms_to_strs(event_obj['metadata']['annotations'].to_h).each{|k,v| annotations << "#{k}=#{v}"} payload['metadata']['annotations'] = annotations end tag = "kubernetes.#{res_name}.#{namespace_name}.#{event_obj['metadata']['name']}" router.emit(tag, time, payload) end
shutdown()
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metadata.rb, line 142 def shutdown @thread.exit end
start()
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metadata.rb, line 101 def start start_kubclient @thread = Thread.new(&method(:watch_resource)) @thread.abort_on_exception = true log.trace "Exited the watcher thread" end
start_kubclient()
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metadata.rb, line 111 def start_kubclient return @client if @client if @kubernetes_url.present? ssl_options = { client_cert: @client_cert.present? ? OpenSSL::X509::Certificate.new(File.read(@client_cert)) : nil, client_key: @client_key.present? ? OpenSSL::PKey::RSA.new(File.read(@client_key)) : nil, ca_file: @ca_file, verify_ssl: @verify_ssl ? OpenSSL::SSL::VERIFY_PEER : OpenSSL::SSL::VERIFY_NONE } auth_options = {} if @bearer_token_file.present? bearer_token = File.read(@bearer_token_file) auth_options[:bearer_token] = bearer_token end @client = Kubeclient::Client.new @kubernetes_url, @apiVersion, ssl_options: ssl_options, auth_options: auth_options begin @client.api_valid? rescue KubeException => kube_error raise Fluent::ConfigError, "Invalid Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}: #{kube_error.message}" end end end
syms_to_strs(hsh)
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metadata.rb, line 47 def syms_to_strs(hsh) newhsh = {} hsh.each_pair do |kk,vv| if vv.is_a?(Hash) vv = syms_to_strs(vv) end if kk.is_a?(Symbol) newhsh[kk.to_s] = vv else newhsh[kk] = vv end end newhsh end
watch_resource()
click to toggle source
# File lib/fluent/plugin/in_kubernetes_metadata.rb, line 146 def watch_resource loop do begin resource_version = @client.send(@get_res_string).resourceVersion watcher = @client.watch_entities(@resource, options = {resource_version: resource_version}) rescue Exception => e raise Fluent::ConfigError, "Exception encountered fetching metadata from Kubernetes API endpoint: #{e.message}" end begin watcher.each do |notice| time = Engine.now emit_event(notice.object, time, notice.type) end log.trace "Exited resource watcher" rescue log.error "Unexpected error in resource watcher", :error=>$!.to_s log.error_backtrace end end end