class Fluent::Plugin::CwmHttpInput
Custom HTTP input plugin class for CWM.
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_http_cwm.rb, line 69
# Builds the plugin instance: runs the superclass initializer, then starts
# with no Redis client and an empty per-deployment metrics accumulator.
def initialize
  super()

  @redis = nil
  @deployment_api_metrics = default_api_metrics_hash
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_http_cwm.rb, line 76
# Applies the plugin configuration via the superclass, then connects to Redis.
#
# @param conf the configuration object handed to the superclass
def configure(conf)
  super(conf)

  set_up_redis
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_http_cwm.rb, line 82
# Starts the plugin: schedules the periodic metrics flush and opens the HTTP
# endpoint that accepts POSTed log entries at "/<tag>".
def start
  super

  # set up timer for flush interval
  timer_execute(:metrics_flush_timer, @redis_config.flush_interval) { flush_api_metrics }

  log.info("Starting HTTP server [#{@host}:#{@port}]...")
  http_server_create_http_server(:http_server, addr: @host, port: @port, logger: log) do |server|
    server.post("/#{tag}") do |req|
      payload = parse_data(req.body)
      # Only entries whose metrics could be recorded are forwarded downstream.
      route(payload) if update_deployment_metrics(payload)

      # return HTTP 200 OK response to MinIO
      [200, { 'Content-Type' => 'text/plain' }, nil]
    end
  end
end
Private Instance Methods
days_to_seconds(days)
click to toggle source
# File lib/fluent/plugin/in_http_cwm.rb, line 144
# Converts a duration expressed in days into seconds.
#
# @param days [Numeric] number of days (may be fractional)
# @return [Numeric] the same duration in seconds
def days_to_seconds(days)
  # Multiply step by step (days -> hours -> minutes -> seconds) so that
  # fractional inputs round exactly as they always have.
  hours = days * 24
  minutes = hours * 60
  minutes * 60
end
default_api_metrics_hash()
click to toggle source
# File lib/fluent/plugin/in_http_cwm.rb, line 104
# Creates an empty metrics accumulator. Looking up an unknown key stores and
# returns a fresh set of zeroed counters for that key.
#
# @return [Hash] auto-populating hash of counter hashes
def default_api_metrics_hash
  counter_names = %w[bytes_in bytes_out num_requests_in num_requests_out num_requests_misc]

  Hash.new do |store, deploymentid|
    store[deploymentid] = counter_names.each_with_object({}) { |name, counters| counters[name] = 0 }
  end
end
flush_api_metrics()
click to toggle source
# File lib/fluent/plugin/in_http_cwm.rb, line 243
# Pushes the accumulated per-deployment counters to Redis and resets the
# accumulator. Does nothing when no metrics have been collected.
#
# Each positive counter is added to the Redis key
# "<metrics_prefix>:<deploymentid>:<metric>" with INCRBY. On any error the
# accumulator is left untouched, so the values are retried on the next flush.
def flush_api_metrics
  return if @deployment_api_metrics.empty?

  log.debug("Flushing metrics: #{@deployment_api_metrics}")
  begin
    # Issue the commands on the object yielded by `pipelined`, not on @redis.
    # Calling the client inside the block is deprecated in redis-rb 4.6 and is
    # not batched by newer releases; older 4.x releases yield the client
    # itself, so this form should work there too (verify against the pinned
    # redis-rb version).
    @redis.pipelined do |pipeline|
      @deployment_api_metrics.each do |deploymentid, metrics|
        metrics.each do |metric, value|
          next unless value.positive?

          pipeline.incrby("#{@redis_config.metrics_prefix}:#{deploymentid}:#{metric}", value)
        end
      end
    end

    @deployment_api_metrics = default_api_metrics_hash
    log.debug('Flushing complete!')
  rescue StandardError => e
    log.error("Unable to flush metrics! ERROR: '#{e}'.")
  end
end
get_request_type(api_name)
click to toggle source
# File lib/fluent/plugin/in_http_cwm.rb, line 221
# Classifies an API name as inbound, outbound or miscellaneous traffic.
#
# @param api_name [String] API name taken from the log entry
# @return [String] 'in', 'out' or 'misc'
def get_request_type(api_name)
  case api_name
  when 'WebUpload', 'PutObject', 'DeleteObject' then 'in'
  when 'WebDownload', 'GetObject' then 'out'
  else 'misc'
  end
end
parse_data(data)
click to toggle source
# File lib/fluent/plugin/in_http_cwm.rb, line 131
# Parses a JSON document.
#
# @param data [String] raw JSON text
# @return the parsed value, or nil when parsing fails (the error is logged
#   at debug level)
def parse_data(data)
  parsed = JSON.parse(data)
  parsed
rescue StandardError => error
  log.debug("ERROR: #{error}")
  nil
end
route(data)
click to toggle source
# File lib/fluent/plugin/in_http_cwm.rb, line 138
# Emits the parsed log entry under the plugin's tag, stamped with the
# current engine time and wrapped under the 'message' key.
#
# @param data the parsed log entry
def route(data)
  emitted_at = Fluent::Engine.now
  router.emit(@tag, emitted_at, { 'message' => data })
end
set_up_redis()
click to toggle source
# File lib/fluent/plugin/in_http_cwm.rb, line 113
# Creates the Redis client from @redis_config and blocks until the server
# answers a PING, retrying once per second and logging each failure.
def set_up_redis
  settings = @redis_config
  host = settings.host
  port = settings.port
  db = settings.db

  log.info("Connecting with Redis [address: #{host}:#{port}, db: #{db}]")
  @redis = Redis.new(host: host, port: port, db: db)

  # Keep probing until the server responds; every attempt (including the
  # first) is preceded by a one-second pause.
  loop do
    sleep(1)
    begin
      @redis.ping
      break
    rescue StandardError => e
      log.error("Unable to connect to Redis server! ERROR: '#{e}'. Retrying...")
    end
  end
end
update_deployment_api_metrics(deploymentid, api_name, request_content_length, response_content_length, response_is_cached)
click to toggle source
# File lib/fluent/plugin/in_http_cwm.rb, line 230
# Adds one request to the in-memory counters of a deployment.
#
# @param deploymentid identifier of the deployment the request belongs to
# @param api_name [String] API name, used to pick the request-type counter
# @param request_content_length [Integer] bytes added to 'bytes_in'
# @param response_content_length [Integer] bytes added to 'bytes_out'
# @param response_is_cached [Boolean] only written to the debug log
# @return [Hash] the updated counters of the deployment
def update_deployment_api_metrics(deploymentid, api_name, request_content_length, response_content_length, response_is_cached)
  log.debug('Updating deployment API metrics')

  kind = get_request_type(api_name)
  log.debug("#{deploymentid}.#{api_name}: (type=#{kind}, req_size=#{request_content_length}, res_size=#{response_content_length}, res_cache=#{response_is_cached})")

  counters = @deployment_api_metrics[deploymentid]
  counters["num_requests_#{kind}"] += 1
  counters['bytes_out'] += response_content_length
  counters['bytes_in'] += request_content_length

  @deployment_api_metrics[deploymentid] = counters
end
update_deployment_last_action(deploymentid)
click to toggle source
# File lib/fluent/plugin/in_http_cwm.rb, line 148
# Records the time of the latest activity of a deployment in Redis under
# "<last_update_prefix>:<deploymentid>".
#
# The stored timestamp is only rewritten when there is no entry yet or when
# the existing entry is older than the configured grace period (compared in
# seconds). Any error is logged and swallowed.
#
# @param deploymentid identifier of the deployment
def update_deployment_last_action(deploymentid)
  log.debug('Updating deployment last action')

  key = "#{@redis_config.last_update_prefix}:#{deploymentid}"
  curdt = DateTime.now

  begin
    log.debug("checking existing last action entry [key: #{key}]")
    stored = @redis.get(key)

    lastdt = nil
    expired = false

    if stored
      # NOTE(review): the second positional argument of DateTime.parse is the
      # `comp` flag, not a format string, so FMT_DATETIME only acts as a
      # truthy value here - confirm whether DateTime.strptime was intended.
      lastdt = DateTime.parse(stored, FMT_DATETIME)
      # Subtracting DateTimes yields a number of days.
      elapsed_secs = days_to_seconds((curdt - lastdt).to_f)
      if elapsed_secs > @redis_config.grace_period
        expired = true
        log.debug("grace period [#{@redis_config.grace_period}] expired [#{lastdt} => #{curdt}]")
      end
    else
      log.debug('last action entry not found. going to be set for the first time.')
    end

    if lastdt.nil? || expired
      last_action = curdt.strftime(FMT_DATETIME)
      @redis.set(key, last_action)
      log.debug("Updated last action entry [#{key} => #{last_action}]")
    end
  rescue StandardError => e
    log.error("Unable to update last action! ERROR: '#{e}'.")
  end
end
update_deployment_metrics(data)
click to toggle source
# File lib/fluent/plugin/in_http_cwm.rb, line 186
# Validates a parsed log entry and folds it into the deployment metrics.
#
# The entry must be a JSON object carrying 'deploymentid', 'api' (an object
# with 'name'), 'responseHeader' and 'requestHeader' (both objects). The
# last-action timestamp of the deployment is refreshed as soon as the
# deployment id is known, before the remaining fields are checked.
#
# @param data the parsed log entry (nil when parsing failed)
# @return [Boolean] true when the metrics were updated, false when the entry
#   is missing, malformed or incomplete
def update_deployment_metrics(data)
  # A JSON document can also parse to an Array, String or number; indexing
  # those with a String key would raise inside the request handler.
  return false unless data.is_a?(Hash)

  log.debug('Updating deployment metrics')

  deploymentid = validate_and_get_value(data, 'deploymentid')
  return false unless deploymentid

  update_deployment_last_action(deploymentid)

  api_data = validate_and_get_value(data, 'api')
  return false unless api_data.is_a?(Hash)

  api_name = validate_and_get_value(api_data, 'name')
  return false unless api_name

  response_header_data = validate_and_get_value(data, 'responseHeader')
  return false unless response_header_data.is_a?(Hash)

  request_header_data = validate_and_get_value(data, 'requestHeader')
  return false unless request_header_data.is_a?(Hash)

  # Byte counts are the declared Content-Length (0 when absent) plus the
  # length of the header hash's string form.
  response_content_length = response_header_data['Content-Length'].to_i
  response_content_length += response_header_data.to_s.length
  response_is_cached = (response_header_data['X-Cache'] == 'HIT')

  request_content_length = request_header_data['Content-Length'].to_i
  request_content_length += request_header_data.to_s.length

  update_deployment_api_metrics(deploymentid, api_name, request_content_length, response_content_length, response_is_cached)

  true
end
validate_and_get_value(data_hash, key)
click to toggle source
# File lib/fluent/plugin/in_http_cwm.rb, line 180
# Looks up a key in a parsed JSON object and logs (at debug level) when the
# value is absent.
#
# @param data_hash the container to read from; anything that is not a Hash
#   is treated as not containing the key
# @param key [String] key to look up
# @return the stored value, or nil when it is missing
def validate_and_get_value(data_hash, key)
  # Parsed JSON may be an Array or a scalar; indexing those with a String key
  # raises TypeError (and String#[] would do a substring search), so only
  # real hashes are consulted.
  value = data_hash.is_a?(Hash) ? data_hash[key] : nil
  log.debug("missing '#{key}': #{data_hash.to_json}") unless value
  value
end