class Fluent::Plugin::CalyptiaMonitoringInput
Constants
- DEFAULT_PENDING_METRICS_SIZE
- DEFAULT_STORAGE_TYPE
- RPC_CONFIG_DUMP_ENDPOINT
- UNPROCESSABLE_HTTP_ERRORS
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_calyptia_monitoring.rb, line 64 def initialize super @current_config = nil @monitor = Monitor.new @pending = [] end
Public Instance Methods
add_metrics(buffer)
click to toggle source
# File lib/fluent/plugin/in_calyptia_monitoring.rb, line 229 def add_metrics(buffer) return false unless agent = @storage_agent.get(:agent) begin code, response = if @pending.empty? @api_requester.add_metrics(buffer, agent["token"], agent["id"]) else @monitor.synchronize do @pending = @pending.concat([buffer]) @api_requester.add_metrics(@pending.join, agent["token"], agent["id"]) @pending = [] end end if response && response["error"] case code.to_i when *UNPROCESSABLE_HTTP_ERRORS log.warn "Sending metrics is failed and dropped metrics due to unprocessable on server. Error: `#{response["error"]}', Code: #{code}" return false end log.warn "Failed to send metrics. Error: `#{response["error"]}', Code: #{code}" append_pendings(buffer) return false end rescue => ex log.warn "Failed to send metrics: error = #{ex}, backtrace = #{ex.backtrace}" append_pendings(buffer) return false end return true end
append_pendings(buffer)
click to toggle source
# File lib/fluent/plugin/in_calyptia_monitoring.rb, line 214 def append_pendings(buffer) @monitor.synchronize do if @pending.empty? @pending = [buffer] elsif @pending.size >= DEFAULT_PENDING_METRICS_SIZE drop_count = 1 @pending = @pending.drop(drop_count) log.warn "pending buffer is full. Dropped the first element from the pending buffer" @pending.concat([buffer]) else @pending.concat([buffer]) end end end
check_config_sending_usability()
click to toggle source
# File lib/fluent/plugin/in_calyptia_monitoring.rb, line 187 def check_config_sending_usability unless system_config.rpc_endpoint log.warn "This feature needs to enable RPC with `rpc_endpoint` on <system>." return false end res = retrieve_config_from_rpc if status = (res.code.to_i == 200) return status else log.warn "This feature needs to enable getDump RPC endpoint with `enable_get_dump` on <system>." return false end end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_calyptia_monitoring.rb, line 71 def configure(conf) super config = conf.elements.select{|e| e.name == 'storage' }.first @storage_agent = storage_create(usage: 'calyptia_monitoring_agent', conf: config, default_type: DEFAULT_STORAGE_TYPE) end
create_agent(current_config)
click to toggle source
# File lib/fluent/plugin/in_calyptia_monitoring.rb, line 159 def create_agent(current_config) code, agent, machine_id = @api_requester.create_agent(current_config) if code.to_s.start_with?("2") @storage_agent.put(:agent, agent) @storage_agent.put(:machine_id, machine_id) return true else raise CreateAgentError, "Create agent is failed. Error: `#{agent["error"]}', code: #{code}" end end
get_current_config_from_rpc()
click to toggle source
# File lib/fluent/plugin/in_calyptia_monitoring.rb, line 78 def get_current_config_from_rpc res = retrieve_config_from_rpc config = Yajl.load(res.body)["conf"] conf = Fluent::Config.parse(config, '(supervisor)', '(RPC)', true) confs = [] conf.elements.select{|e| e.name == 'ROOT' }.first.elements.each{|e| confs << e.to_s } # Remove outer <ROOT> element confs.join end
get_masked_conf_from_conf_file()
click to toggle source
# File lib/fluent/plugin/in_calyptia_monitoring.rb, line 90 def get_masked_conf_from_conf_file return "" unless File.exist?(@cloud_monitoring.fluentd_conf_path) # check file existence. conf = "" callback = ->(status) { if status && status.success? #nop elsif status log.warn "config dumper exits with error code", prog: prog, status: status.exitstatus, signal: status.termsig else log.warn "config dumper unexpectedly exits without exit status", prog: prog end } spawn_command, arguments = if Fluent.windows? [::ServerEngine.ruby_bin_path, File.join(File.dirname(__FILE__), "calyptia_config_dumper.rb")] else [File.join(File.dirname(__FILE__), "calyptia_config_dumper.rb")] end retval = child_process_execute(:exec_calyptia_config_dumper, spawn_command, arguments: arguments, immediate: true, env: {"FLUENT_CONFIG_PATH" => @cloud_monitoring.fluentd_conf_path}, parallel: true, mode: [:read_with_stderr], on_exit_callback: callback) do |io| io.set_encoding(Encoding::ASCII_8BIT) conf = io.read end unless retval.nil? begin Timeout.timeout(10) do sleep 0.1 until !conf.empty? end rescue Timeout::Error log.warn "cannot retrive configuration contents on #{@cloud_monitoring.fluentd_conf_path} within 10 seconds." end end conf end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/in_calyptia_monitoring.rb, line 60 def multi_workers_ready? true end
on_timer_send_metrics()
click to toggle source
# File lib/fluent/plugin/in_calyptia_monitoring.rb, line 260 def on_timer_send_metrics opts = {with_config: false, with_retry: false} buffer = "" @monitor_agent.plugins_info_all(opts).each { |record| metrics = record["metrics"] metrics.each_pair do |k, v| buffer += v end } @monitor_agent_buffer.plugins_info_all(opts).each {|record| metrics = record["metrics"] metrics.each_pair do |k, v| buffer += v end } if buffer.empty? log.debug "No initialized metrics is found. Trying to send cmetrics on the next tick." else unless add_metrics(buffer) log.warn "Sending metrics is failed. Trying to send pending buffers in the next interval: #{@cloud_monitoring.rate}, next sending time: #{Time.now + @cloud_monitoring.rate}" end end end
retrieve_config_from_rpc()
click to toggle source
# File lib/fluent/plugin/in_calyptia_monitoring.rb, line 202 def retrieve_config_from_rpc uri = URI.parse("http://#{system_config.rpc_endpoint}") res = Net::HTTP.start(uri.host, uri.port) {|http| http.get(RPC_CONFIG_DUMP_ENDPOINT) } res end
setup_agent(current_config)
click to toggle source
# File lib/fluent/plugin/in_calyptia_monitoring.rb, line 170 def setup_agent(current_config) if agent = @storage_agent.get(:agent) unless machine_id = @storage_agent.get(:machine_id) return create_agent(current_config) end code, body = @api_requester.update_agent(current_config, agent, machine_id) if code.to_s.start_with?("2") return true else log.warn "Updating agent is failed. Error: #{Yajl.load(body)["error"]}, Code: #{code}" return false end else create_agent(current_config) end end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_calyptia_monitoring.rb, line 210 def shutdown super end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_calyptia_monitoring.rb, line 127 def start super enabled_cmetrics = if system_config.metrics system_config.metrics[:@type] == "cmetrics" else false end raise Fluent::ConfigError, "cmetrics plugin should be used to collect metrics on Calyptia Cloud" unless enabled_cmetrics @monitor_agent = Fluent::Plugin::CalyptiaMonitoringExtInput.new @monitor_agent_buffer = Fluent::Plugin::CalyptiaMonitoringBufferExtInput.new @api_requester = Fluent::Plugin::CalyptiaAPI::Requester.new(@cloud_monitoring.endpoint, @cloud_monitoring.api_key, log, fluentd_worker_id) @current_config = if !@cloud_monitoring.fluentd_conf_path.nil? get_masked_conf_from_conf_file elsif check_config_sending_usability get_current_config_from_rpc end if @cloud_monitoring.rate < 30 log.warn "High frequency events ingestion is not supported. Set up 30s as ingestion interval" @cloud_monitoring[:rate] = 30 end if setup_agent(@current_config) timer_execute(:in_calyptia_monitoring_send_metrics, @cloud_monitoring.rate, &method(:on_timer_send_metrics)) else raise UpdateAgentError, "Setup agent is failed. Something went wrong" end end