class Fluent::Plugin::CwmHttpInput

Custom HTTP Input Plugin class for CWM

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_http_cwm.rb, line 69
# Creates the plugin instance.
#
# After the superclass initializer has run, the instance starts with an empty
# per-deployment metrics accumulator and no Redis client.
def initialize
  super

  @deployment_api_metrics = default_api_metrics_hash
  @redis = nil
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_http_cwm.rb, line 76
# Applies the plugin configuration and then prepares the Redis connection.
#
# @param conf the configuration object; it is forwarded unchanged to the
#   superclass implementation via the bare `super`
def configure(conf)
  super

  # set_up_redis reads @redis_config, which is presumably populated by the
  # superclass configure step above — keep this call after `super`.
  set_up_redis
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_http_cwm.rb, line 82
# Starts the plugin: schedules the metrics flush timer and opens the HTTP
# endpoint.
#
# The timer is registered with @redis_config.flush_interval and invokes
# flush_api_metrics. The HTTP server listens on @host:@port and accepts POSTs
# on "/<tag>"; each request body is parsed, folded into the deployment
# metrics and, when the metrics update succeeds, emitted through `route`.
def start
  super

  # timer that drives the periodic metrics flush
  timer_execute(:metrics_flush_timer, @redis_config.flush_interval) { flush_api_metrics }

  log.info("Starting HTTP server [#{@host}:#{@port}]...")
  http_server_create_http_server(:http_server, addr: @host, port: @port, logger: log) do |server|
    server.post("/#{tag}") do |req|
      payload = parse_data(req.body)
      route(payload) if update_deployment_metrics(payload)

      # the sender (MinIO) always gets an HTTP 200 OK, whatever the payload was
      [200, { 'Content-Type' => 'text/plain' }, nil]
    end
  end
end

Private Instance Methods

days_to_seconds(days) click to toggle source
# File lib/fluent/plugin/in_http_cwm.rb, line 144
# Converts a duration expressed in days into seconds.
#
# The factors are applied one at a time (24, then 60, then 60) so fractional
# inputs produce exactly the same rounding as a chained multiplication.
#
# @param days [Numeric] number of days, may be fractional
# @return [Numeric] the same duration in seconds
def days_to_seconds(days)
  hours = days * 24
  minutes = hours * 60
  minutes * 60
end
default_api_metrics_hash() click to toggle source
# File lib/fluent/plugin/in_http_cwm.rb, line 104
# Builds a fresh, empty metrics accumulator.
#
# The returned Hash has a default block: the first lookup of an unknown key
# stores (and returns) a new counter hash for that key with 'bytes_in',
# 'bytes_out', 'num_requests_in', 'num_requests_out' and 'num_requests_misc'
# all set to 0. Every key gets its own counter hash.
#
# @return [Hash] empty accumulator with auto-initialising entries
def default_api_metrics_hash
  counter_names = %w[bytes_in bytes_out num_requests_in num_requests_out num_requests_misc]

  Hash.new do |store, deployment|
    store[deployment] = counter_names.map { |name| [name, 0] }.to_h
  end
end
flush_api_metrics() click to toggle source
# File lib/fluent/plugin/in_http_cwm.rb, line 243
# Sends the accumulated per-deployment counters to Redis and resets them.
#
# Does nothing when no metrics have been recorded. Otherwise every counter
# with a positive value is queued as an INCRBY on the key
# "<metrics_prefix>:<deploymentid>:<metric>" inside one pipeline. The
# in-memory accumulator is replaced with a fresh one only after the pipeline
# block has finished; if anything raises, the error is logged and the
# counters are left in place so a later flush can send them.
#
# NOTE(review): if requests can be handled while this runs, increments
# recorded between the iteration and the reset below would be dropped —
# confirm the plugin's threading model.
def flush_api_metrics
  return if @deployment_api_metrics.empty?

  log.debug("Flushing metrics: #{@deployment_api_metrics}")

  begin
    prefix = @redis_config.metrics_prefix

    # Queue the commands on the object yielded by `pipelined`. Calling
    # @redis itself inside the block is deprecated since redis-rb 4.6 and is
    # not pipelined in redis-rb 5; older releases yield the client itself,
    # so using the block argument works there as well.
    @redis.pipelined do |pipeline|
      @deployment_api_metrics.each do |deploymentid, metrics|
        metrics.each do |metric, value|
          next unless value.positive?

          pipeline.incrby("#{prefix}:#{deploymentid}:#{metric}", value)
        end
      end
    end

    @deployment_api_metrics = default_api_metrics_hash

    log.debug('Flushing complete!')
  rescue StandardError => e
    log.error("Unable to flush metrics! ERROR: '#{e}'.")
  end
end
get_request_type(api_name) click to toggle source
# File lib/fluent/plugin/in_http_cwm.rb, line 221
# Classifies an API name as inbound, outbound or miscellaneous traffic.
#
# @param api_name the API name to classify
# @return [String] 'in' for WebUpload/PutObject/DeleteObject, 'out' for
#   WebDownload/GetObject, 'misc' for anything else
def get_request_type(api_name)
  case api_name
  when 'WebUpload', 'PutObject', 'DeleteObject' then 'in'
  when 'WebDownload', 'GetObject' then 'out'
  else 'misc'
  end
end
parse_data(data) click to toggle source
# File lib/fluent/plugin/in_http_cwm.rb, line 131
# Parses a request body as JSON.
#
# Any StandardError raised while parsing is logged at debug level and
# swallowed.
#
# @param data the raw text to parse
# @return the parsed JSON value, or nil when parsing failed
def parse_data(data)
  JSON.parse(data)
rescue StandardError => parse_error
  log.debug("ERROR: #{parse_error}")
  nil
end
route(data) click to toggle source
# File lib/fluent/plugin/in_http_cwm.rb, line 138
# Emits the parsed payload as an event.
#
# The record is { 'message' => data }; it is emitted through `router` under
# @tag with the time taken from Fluent::Engine.now.
#
# @param data the payload to wrap in the record
# @return whatever router.emit returns
def route(data)
  emitted_at = Fluent::Engine.now
  router.emit(@tag, emitted_at, { 'message' => data })
end
set_up_redis() click to toggle source
# File lib/fluent/plugin/in_http_cwm.rb, line 113
# Creates the Redis client and blocks until the server answers a PING.
#
# Connection parameters (host, port, db) are read from @redis_config. The
# server is pinged immediately; only when a ping fails is the error logged
# and the next attempt delayed by one second. There is no retry limit, so
# this method returns only once Redis is reachable.
#
# @return [nil]
def set_up_redis
  host = @redis_config.host
  port = @redis_config.port
  db = @redis_config.db
  log.info("Connecting with Redis [address: #{host}:#{port}, db: #{db}]")
  @redis = Redis.new(host: host, port: port, db: db)

  loop do
    begin
      @redis.ping
      break
    rescue StandardError => e
      log.error("Unable to connect to Redis server! ERROR: '#{e}'. Retrying...")
      # back off only after a failed attempt, so a healthy server causes no delay
      sleep(1)
    end
  end
end
update_deployment_api_metrics(deploymentid, api_name, request_content_length, response_content_length, response_is_cached) click to toggle source
# File lib/fluent/plugin/in_http_cwm.rb, line 230
# Adds one request to the in-memory counters of a deployment.
#
# The request type comes from get_request_type(api_name). The deployment's
# 'bytes_in' grows by request_content_length, 'bytes_out' by
# response_content_length, and "num_requests_<type>" by one.
# response_is_cached is only written to the debug log.
#
# @param deploymentid key of the deployment inside @deployment_api_metrics
# @param api_name API name used to derive the request type
# @param request_content_length amount added to 'bytes_in'
# @param response_content_length amount added to 'bytes_out'
# @param response_is_cached logged only
# @return [Hash] the updated counter hash of the deployment
def update_deployment_api_metrics(deploymentid, api_name, request_content_length, response_content_length, response_is_cached)
  log.debug('Updating deployment API metrics')

  request_type = get_request_type(api_name)
  log.debug("#{deploymentid}.#{api_name}: (type=#{request_type}, req_size=#{request_content_length}, res_size=#{response_content_length}, res_cache=#{response_is_cached})")

  counters = @deployment_api_metrics[deploymentid]
  increments = {
    'bytes_in' => request_content_length,
    'bytes_out' => response_content_length,
    "num_requests_#{request_type}" => 1
  }
  increments.each { |name, delta| counters[name] += delta }

  @deployment_api_metrics[deploymentid] = counters
end
update_deployment_last_action(deploymentid) click to toggle source
# File lib/fluent/plugin/in_http_cwm.rb, line 148
# Records "now" as the last action time of a deployment in Redis, unless a
# recent enough entry already exists.
#
# The entry lives under "<last_update_prefix>:<deploymentid>". It is written
# when no entry exists yet, or when the stored time is older than
# @redis_config.grace_period seconds. Any StandardError raised along the way
# (Redis failure, unparsable stored value, ...) is logged and swallowed.
#
# NOTE(review): DateTime.parse does not take a format string — its second
# positional argument is the `comp` flag — so FMT_DATETIME only has an effect
# in the strftime call below. Confirm whether strptime was intended.
#
# @param deploymentid identifier used to build the Redis key
def update_deployment_last_action(deploymentid)
  log.debug('Updating deployment last action')

  key = "#{@redis_config.last_update_prefix}:#{deploymentid}"
  curdt = DateTime.now

  begin
    log.debug("checking existing last action entry [key: #{key}]")
    lastval = @redis.get(key)

    needs_refresh =
      if lastval
        lastdt = DateTime.parse(lastval, FMT_DATETIME)
        grace = @redis_config.grace_period
        # DateTime subtraction yields days; compare in seconds
        expired = days_to_seconds((curdt - lastdt).to_f) > grace
        log.debug("grace period [#{grace}] expired [#{lastdt} => #{curdt}]") if expired
        expired
      else
        log.debug('last action entry not found. going to be set for the first time.')
        true
      end

    if needs_refresh
      last_action = curdt.strftime(FMT_DATETIME)
      @redis.set(key, last_action)
      log.debug("Updated last action entry [#{key} => #{last_action}]")
    end
  rescue StandardError => e
    log.error("Unable to update last action! ERROR: '#{e}'.")
  end
end
update_deployment_metrics(data) click to toggle source
# File lib/fluent/plugin/in_http_cwm.rb, line 186
# Validates a parsed log record and folds it into the deployment metrics.
#
# The record must be a Hash carrying 'deploymentid', an 'api' Hash with a
# 'name', and 'responseHeader' / 'requestHeader' Hashes. As soon as the
# deployment id is known, its last-action entry is refreshed via
# update_deployment_last_action — before the remaining fields are checked.
# Records that are not Hashes, or whose 'api' / header fields are not Hashes,
# are rejected instead of raising from the request handler.
#
# The byte counts passed on are each header's 'Content-Length' (via to_i)
# plus the length of that header hash's string form — presumably an
# approximation of the header size; confirm with the metric consumers.
#
# @param data the parsed request payload (nil when parsing failed)
# @return [Boolean] true when the metrics were updated, false when the
#   record was rejected
def update_deployment_metrics(data)
  return false unless data

  # JSON scalars and arrays cannot carry the expected fields; indexing them
  # with a string key would raise or (for strings) do a substring match.
  unless data.is_a?(Hash)
    log.debug("ignoring non-object payload: #{data.class}")
    return false
  end

  log.debug('Updating deployment metrics')

  deploymentid = validate_and_get_value(data, 'deploymentid')
  return false unless deploymentid

  update_deployment_last_action(deploymentid)

  api_data = validate_and_get_value(data, 'api')
  return false unless api_data.is_a?(Hash)

  api_name = validate_and_get_value(api_data, 'name')
  return false unless api_name

  response_header_data = validate_and_get_value(data, 'responseHeader')
  return false unless response_header_data.is_a?(Hash)

  request_header_data = validate_and_get_value(data, 'requestHeader')
  return false unless request_header_data.is_a?(Hash)

  response_content_length = response_header_data['Content-Length'].to_i
  response_content_length += response_header_data.to_s.length

  response_is_cached = (response_header_data['X-Cache'] == 'HIT')

  request_content_length = request_header_data['Content-Length'].to_i
  request_content_length += request_header_data.to_s.length

  update_deployment_api_metrics(deploymentid, api_name, request_content_length, response_content_length, response_is_cached)

  true
end
validate_and_get_value(data_hash, key) click to toggle source
# File lib/fluent/plugin/in_http_cwm.rb, line 180
# Looks up a key in a parsed JSON object and logs when it is absent.
#
# Only Hashes are indexed. Any other input (nil, String, Array, number) is
# treated as "key missing": String#[] would do a substring match and
# Array#[] / Integer#[] / nil would raise for a string key.
#
# @param data_hash the container to read from; expected to be a Hash
# @param key the key to look up
# @return the stored value, or nil/false when it is missing or falsy (a debug
#   message with the container's JSON form is logged in that case)
def validate_and_get_value(data_hash, key)
  value = data_hash.is_a?(Hash) ? data_hash[key] : nil
  log.debug("missing '#{key}': #{data_hash.to_json}") unless value
  value
end