class Fluent::Plugin::MonitorAgentInput
Constants
- IGNORE_ATTRIBUTES
- MONITOR_INFO
They are deprecated but remain for compatibility
- RETRY_INFO
Public Class Methods
new()
click to toggle source
Calls superclass method
Fluent::Compat::Input::new
# File lib/fluent/plugin/in_monitor_agent.rb, line 186 def initialize super @first_warn = false end
Public Instance Methods
all_plugins()
click to toggle source
# File lib/fluent/plugin/in_monitor_agent.rb, line 244 def all_plugins array = [] # get all input plugins array.concat Fluent::Engine.root_agent.inputs # get all output plugins array.concat Fluent::Engine.root_agent.outputs # get all filter plugins array.concat Fluent::Engine.root_agent.filters Fluent::Engine.root_agent.labels.each { |name, l| # TODO: Add label name to outputs / filters for identifing plugins array.concat l.outputs array.concat l.filters } array end
configure(conf)
click to toggle source
Calls superclass method
Fluent::Plugin::Input#configure
# File lib/fluent/plugin/in_monitor_agent.rb, line 192 def configure(conf) super @port += fluentd_worker_id end
fluentd_opts()
click to toggle source
# File lib/fluent/plugin/in_monitor_agent.rb, line 399 def fluentd_opts @fluentd_opts ||= get_fluentd_opts end
get_fluentd_opts()
click to toggle source
# File lib/fluent/plugin/in_monitor_agent.rb, line 403 def get_fluentd_opts opts = {} ObjectSpace.each_object(Fluent::Supervisor) { |obj| opts.merge!(obj.options) break } opts end
get_monitor_info(pe, opts={})
click to toggle source
get monitor info from the plugin ‘pe` and return a hash object
# File lib/fluent/plugin/in_monitor_agent.rb, line 311 def get_monitor_info(pe, opts={}) obj = {} # Common plugin information obj['plugin_id'] = pe.plugin_id obj['plugin_category'] = plugin_category(pe) obj['type'] = pe.config['@type'] obj['config'] = pe.config if opts[:with_config] # run MONITOR_INFO in plugins' instance context and store the info to obj MONITOR_INFO.each_pair {|key,code| begin catch(:skip) do obj[key] = pe.instance_exec(&code) end rescue NoMethodError => e unless @first_warn log.error "NoMethodError in monitoring plugins", key: key, plugin: pe.class, error: e log.error_backtrace @first_warn = true end rescue => e log.warn "unexpected error in monitoring plugins", key: key, plugin: pe.class, error: e end } if pe.respond_to?(:statistics) obj.merge!(pe.statistics.dig('output') || {}) obj.merge!(pe.statistics.dig('filter') || {}) obj.merge!(pe.statistics.dig('input') || {}) end obj['retry'] = get_retry_info(pe.retry) if opts[:with_retry] && pe.instance_variable_defined?(:@retry) # include all instance variables if :with_debug_info is set if opts[:with_debug_info] iv = {} pe.instance_eval do instance_variables.each {|sym| next if IGNORE_ATTRIBUTES.include?(sym) key = sym.to_s[1..-1] # removes first '@' iv[key] = instance_variable_get(sym) } end obj['instance_variables'] = iv elsif ivars = opts[:ivars] iv = {} ivars.each {|name| iname = "@#{name}" iv[name] = pe.instance_variable_get(iname) if pe.instance_variable_defined?(iname) } obj['instance_variables'] = iv end obj end
get_retry_info(pe_retry)
click to toggle source
# File lib/fluent/plugin/in_monitor_agent.rb, line 374 def get_retry_info(pe_retry) retry_variables = {} if pe_retry RETRY_INFO.each_pair { |key, param| retry_variables[key] = pe_retry.instance_variable_get(param) } end retry_variables end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/in_monitor_agent.rb, line 197 def multi_workers_ready? true end
plugin_category(pe)
click to toggle source
# File lib/fluent/plugin/in_monitor_agent.rb, line 386 def plugin_category(pe) case pe when Fluent::Plugin::Input 'input'.freeze when Fluent::Plugin::Output, Fluent::Plugin::MultiOutput, Fluent::Plugin::BareOutput 'output'.freeze when Fluent::Plugin::Filter 'filter'.freeze else 'unknown'.freeze end end
plugin_info_by_id(plugin_id, opts={})
click to toggle source
search a plugin by plugin_id
# File lib/fluent/plugin/in_monitor_agent.rb, line 280 def plugin_info_by_id(plugin_id, opts={}) found = all_plugins.find {|pe| pe.respond_to?(:plugin_id) && pe.plugin_id.to_s == plugin_id } if found get_monitor_info(found, opts) else nil end end
plugin_info_by_tag(tag, opts={})
click to toggle source
try to match the tag and get the info from the matched output plugin TODO: Support output in label
# File lib/fluent/plugin/in_monitor_agent.rb, line 267 def plugin_info_by_tag(tag, opts={}) matches = Fluent::Engine.root_agent.event_router.instance_variable_get(:@match_rules) matches.each { |rule| if rule.match?(tag) if rule.collector.is_a?(Fluent::Plugin::Output) || rule.collector.is_a?(Fluent::Output) return get_monitor_info(rule.collector, opts) end end } nil end
plugins_info_all(opts={})
click to toggle source
# File lib/fluent/plugin/in_monitor_agent.rb, line 302 def plugins_info_all(opts={}) all_plugins.map {|pe| get_monitor_info(pe, opts) } end
plugins_info_by_type(type, opts={})
click to toggle source
This method returns an array because multiple plugins could have the same type
# File lib/fluent/plugin/in_monitor_agent.rb, line 293 def plugins_info_by_type(type, opts={}) array = all_plugins.select {|pe| (pe.config['@type'] == type) rescue nil } array.map {|pe| get_monitor_info(pe, opts) } end
start()
click to toggle source
Calls superclass method
Fluent::Compat::Input#start
# File lib/fluent/plugin/in_monitor_agent.rb, line 208 def start super log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins for worker#{fluentd_worker_id}" api_handler = APIHandler.new(self) http_server_create_http_server(:in_monitor_http_server_helper, addr: @bind, port: @port, logger: log, default_app: NotFoundJson) do |serv| serv.get('/api/plugins') { |req| api_handler.plugins_ltsv(req) } serv.get('/api/plugins.json') { |req| api_handler.plugins_json(req) } serv.get('/api/config') { |req| api_handler.config_ltsv(req) } serv.get('/api/config.json') { |req| api_handler.config_json(req) } end if @tag log.debug "tag parameter is specified. Emit plugins info to '#{@tag}'" opts = {with_config: false, with_retry: false} timer_execute(:in_monitor_agent_emit, @emit_interval, repeat: true) { es = Fluent::MultiEventStream.new now = Fluent::EventTime.now plugins_info_all(opts).each { |record| es.add(now, record) } router.emit_stream(@tag, es) } end end