class Fluent::KubernetesMetadataInput

Constants

K8_POD_CA_CERT
K8_POD_TOKEN

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kubernetes_metadata.rb, line 62
def initialize
  super
  require 'kubeclient'
  require 'active_support/core_ext/object/blank'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kubernetes_metadata.rb, line 68
def configure(conf)
  super

  # Use Kubernetes default service account if we're in a pod.
  if @kubernetes_url.nil?
    env_host = ENV['KUBERNETES_SERVICE_HOST']
    env_port = ENV['KUBERNETES_SERVICE_PORT']
    if env_host.present? && env_port.present?
      @kubernetes_url = "https://#{env_host}:#{env_port}/api"
    end
  end
  unless @kubernetes_url
    raise Fluent::ConfigError, "kubernetes_url is not defined"
  end

  # Use SSL certificate and bearer token from Kubernetes service account.
  if Dir.exist?(@secret_dir)
    ca_cert = File.join(@secret_dir, K8_POD_CA_CERT)
    pod_token = File.join(@secret_dir, K8_POD_TOKEN)

    if !@ca_file.present? and File.exist?(ca_cert)
      @ca_file = ca_cert
    end

    if !@bearer_token_file.present? and File.exist?(pod_token)
      @bearer_token_file = pod_token
    end
  end

  @get_res_string = "get_#{@resource.downcase}"
  log.trace "resource get method is #{@get_res_string}"
end
emit_event(event_obj, time, type) click to toggle source
# File lib/fluent/plugin/in_kubernetes_metadata.rb, line 169
def emit_event(event_obj, time, type)
  payload = syms_to_strs(event_obj)
  payload['event_type'] = type
  res_name = @resource.to_s.downcase
  namespace_name = event_obj['metadata']['namespace'] ? event_obj['metadata']['namespace'] : "openshift-infra"
  if event_obj['metadata']['labels'] then
    labels = []
    syms_to_strs(event_obj['metadata']['labels'].to_h).each{|k,v| labels << "#{k}=#{v}"}
    payload['metadata']['labels'] = labels
  end
  if event_obj['metadata']['annotations'] then
    annotations = []
    syms_to_strs(event_obj['metadata']['annotations'].to_h).each{|k,v| annotations << "#{k}=#{v}"}
    payload['metadata']['annotations'] = annotations
  end

  tag = "kubernetes.#{res_name}.#{namespace_name}.#{event_obj['metadata']['name']}"

  router.emit(tag, time, payload)
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_kubernetes_metadata.rb, line 142
def shutdown
  @thread.exit
end
start() click to toggle source
# File lib/fluent/plugin/in_kubernetes_metadata.rb, line 101
def start

  start_kubclient

  @thread = Thread.new(&method(:watch_resource))
  @thread.abort_on_exception = true
  log.trace "Exited the watcher thread"

end
start_kubclient() click to toggle source
# File lib/fluent/plugin/in_kubernetes_metadata.rb, line 111
def start_kubclient
  return @client if @client

  if @kubernetes_url.present?

    ssl_options = {
        client_cert: @client_cert.present? ? OpenSSL::X509::Certificate.new(File.read(@client_cert)) : nil,
        client_key:  @client_key.present? ? OpenSSL::PKey::RSA.new(File.read(@client_key)) : nil,
        ca_file:     @ca_file,
        verify_ssl:  @verify_ssl ? OpenSSL::SSL::VERIFY_PEER : OpenSSL::SSL::VERIFY_NONE
    }

    auth_options = {}

    if @bearer_token_file.present?
      bearer_token = File.read(@bearer_token_file)
      auth_options[:bearer_token] = bearer_token
    end

    @client = Kubeclient::Client.new @kubernetes_url, @apiVersion,
                                     ssl_options: ssl_options,
                                     auth_options: auth_options

    begin
      @client.api_valid?
    rescue KubeException => kube_error
      raise Fluent::ConfigError, "Invalid Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}: #{kube_error.message}"
    end
  end
end
syms_to_strs(hsh) click to toggle source
# File lib/fluent/plugin/in_kubernetes_metadata.rb, line 47
def syms_to_strs(hsh)
  newhsh = {}
  hsh.each_pair do |kk,vv|
    if vv.is_a?(Hash)
      vv = syms_to_strs(vv)
    end
    if kk.is_a?(Symbol)
      newhsh[kk.to_s] = vv
    else
      newhsh[kk] = vv
    end
  end
  newhsh
end
watch_resource() click to toggle source
# File lib/fluent/plugin/in_kubernetes_metadata.rb, line 146
def watch_resource
  loop do
    begin
      resource_version = @client.send(@get_res_string).resourceVersion
      watcher          = @client.watch_entities(@resource, options = {resource_version: resource_version})
    rescue Exception => e
      raise Fluent::ConfigError, "Exception encountered fetching metadata from Kubernetes API endpoint: #{e.message}"
    end


    begin
      watcher.each do |notice|
        time = Engine.now
        emit_event(notice.object, time, notice.type)
      end
      log.trace "Exited resource watcher"
    rescue
      log.error "Unexpected error in resource watcher", :error=>$!.to_s
      log.error_backtrace
    end
  end
end