class Fluent::Plugin::LabelRouterOutput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_label_router.rb, line 201
def configure(conf)
  super
  @registry = (::Prometheus::Client.registry if @metrics)
  @route_map = Hash.new { |h, k| h[k] = Set.new }
  @mutex = Mutex.new
  @routers = []
  @default_router = nil
  @routes.each do |rule|
    route_router = event_emitter_router(rule['@label'])
    @routers << Route.new(rule, route_router, @registry)
  end

  if @default_route != '' or @default_tag != ''
    default_rule = { 'matches' => nil, 'tag' => @default_tag, '@label' => @default_route}
    @default_router = Route.new(default_rule, event_emitter_router(@default_route), @registry)
  end

  @access_to_labels = record_accessor_create("$.kubernetes.labels")
  @access_to_namespace = record_accessor_create("$.kubernetes.namespace_name")
  @access_to_host = record_accessor_create("$.kubernetes.host")
  @access_to_container_name = record_accessor_create("$.kubernetes.container_name")

  @batch = @emit_mode == :batch
end
process(tag, es) click to toggle source
# File lib/fluent/plugin/out_label_router.rb, line 147
def process(tag, es)
  if @sticky_tags
    @mutex.synchronize {
      if @route_map.has_key?(tag)
        # We already matched with this tag send events to the routers
        @route_map[tag].each do |r|
          r.emit_es(tag, es.dup)
        end
        return
      end
    }
  end
  event_stream = Hash.new {|h, k| h[k] = Fluent::MultiEventStream.new }
  es.each do |time, record|
    input_metadata = { labels: @access_to_labels.call(record).to_h,
                       namespace: @access_to_namespace.call(record).to_s,
                       container: @access_to_container_name.call(record).to_s,
                       host: @access_to_host.call(record).to_s}
    orphan_record = true
    @routers.each do |r|
      if r.match?(input_metadata)
        orphan_record = false
        if @sticky_tags
          @mutex.synchronize {
            @route_map[tag].add(r)
          }
        end
        if @batch
          event_stream[r].add(time, record)
        else
          r.emit(tag, time, record.dup)
        end
      end
    end
    if !@default_router.nil? && orphan_record
      if @sticky_tags
        @mutex.synchronize {
          @route_map[tag].add(@default_router)
        }
      end
      if @batch
        event_stream[@default_router].add(time, record)
      else
        @default_router.emit(tag, time, record.dup)
      end
    end
  end
  if @batch
    event_stream.each do |r, es|
      r.emit_es(tag, es.dup)
    end
  end
end