class Fluent::Plugin::HttpInput

Constants

EMPTY_GIF_IMAGE
EVENT_RECORD_PARAMETER
RESPONSE_200
RESPONSE_204
RESPONSE_IMG
RES_400_STATUS
RES_500_STATUS
RES_TEXT_HEADER

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Compat::Input::new
# File lib/fluent/plugin/in_http.rb, line 94
def initialize
  super

  @km = nil
  @format_name = nil
  @parser_time_key = nil

  # default parsers
  @parser_msgpack = nil
  @parser_json = nil
  @default_time_parser = nil
  @default_keep_time_key = nil
  @float_time_parser = nil

  # <parse> configured parser
  @custom_parser = nil
end

Public Instance Methods

close() click to toggle source
Calls superclass method Fluent::Plugin::Base#close
# File lib/fluent/plugin/in_http.rb, line 190
def close
  server_wait_until_stop
  super
end
configure(conf) click to toggle source
Calls superclass method Fluent::Plugin::Input#configure
# File lib/fluent/plugin/in_http.rb, line 112
def configure(conf)
  compat_parameters_convert(conf, :parser)

  super

  if @cors_allow_credentials
    if @cors_allow_origins.nil? || @cors_allow_origins.include?('*')
      raise Fluent::ConfigError, "Cannot enable cors_allow_credentials without specific origins"
    end
  end

  m = if @parser_configs.first['@type'] == 'in_http'
        @parser_msgpack = parser_create(usage: 'parser_in_http_msgpack', type: 'msgpack')
        @parser_msgpack.time_key = nil
        @parser_msgpack.estimate_current_event = false
        @parser_json = parser_create(usage: 'parser_in_http_json', type: 'json')
        @parser_json.time_key = nil
        @parser_json.estimate_current_event = false

        default_parser = parser_create(usage: '')
        @format_name = 'default'
        @parser_time_key = default_parser.time_key
        @default_time_parser = default_parser.get_time_parser
        @default_keep_time_key = default_parser.keep_time_key
        method(:parse_params_default)
      else
        @custom_parser = parser_create
        @format_name = @parser_configs.first['@type']
        @parser_time_key = @custom_parser.time_key
        method(:parse_params_with_parser)
      end
  self.singleton_class.module_eval do
    define_method(:parse_params, m)
  end
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/in_http.rb, line 172
def multi_workers_ready?
  true
end
on_request(path_info, params) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 202
def on_request(path_info, params)
  begin
    path = path_info[1..-1]  # remove /
    tag = path.split('/').join('.')

    mes = Fluent::MultiEventStream.new
    parse_params(params) do |record_time, record|
      if record.nil?
        log.debug { "incoming event is invalid: path=#{path_info} params=#{params.to_json}" }
        next
      end

      add_params_to_record(record, params)

      time = if param_time = params['time']
               param_time = param_time.to_f
               param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time)
             else
               record_time.nil? ? convert_time_field(record) : record_time
             end

      mes.add(time, record)
    end
  rescue => e
    if @dump_error_log
      log.error "failed to process request", error: e
    end
    return [RES_400_STATUS, RES_TEXT_HEADER, "400 Bad Request\n#{e}\n"]
  end

  # TODO server error
  begin
    router.emit_stream(tag, mes) unless mes.empty?
  rescue => e
    if @dump_error_log
      log.error "failed to emit data", error: e
    end
    return [RES_500_STATUS, RES_TEXT_HEADER, "500 Internal Server Error\n#{e}\n"]
  end

  if @respond_with_empty_img
    return RESPONSE_IMG
  else
    if @use_204_response
      return RESPONSE_204
    else
      return RESPONSE_200
    end
  end
end
start() click to toggle source
Calls superclass method Fluent::Compat::Input#start
# File lib/fluent/plugin/in_http.rb, line 176
def start
  @_event_loop_run_timeout = @blocking_timeout

  super

  log.debug "listening http", bind: @bind, port: @port

  @km = KeepaliveManager.new(@keepalive_timeout)
  event_loop_attach(@km)

  server_create_connection(:in_http, @port, bind: @bind, backlog: @backlog, &method(:on_server_connect))
  @float_time_parser = Fluent::NumericTimeParser.new(:float)
end

Private Instance Methods

add_params_to_record(record, params) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 304
def add_params_to_record(record, params)
  if @add_http_headers
    params.each_pair { |k, v|
      if k.start_with?("HTTP_".freeze)
        record[k] = v
      end
    }
  end

  if @add_query_params
    params.each_pair { |k, v|
      if k.start_with?("QUERY_".freeze)
        record[k] = v
      end
    }
  end

  if @add_remote_addr
    record['REMOTE_ADDR'] = params['REMOTE_ADDR']
  end
end
convert_time_field(record) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 326
def convert_time_field(record)
  if t = @default_keep_time_key ? record[@parser_time_key] : record.delete(@parser_time_key)
    if @default_time_parser
      @default_time_parser.parse(t)
    else
      Fluent::EventTime.from_time(Time.at(t))
    end
  else
    Fluent::EventTime.now
  end
end
on_server_connect(conn) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 255
def on_server_connect(conn)
  handler = Handler.new(conn, @km, method(:on_request),
                        @body_size_limit, @format_name, log,
                        @cors_allow_origins, @cors_allow_credentials,
                        @add_query_params)

  conn.on(:data) do |data|
    handler.on_read(data)
  end

  conn.on(:write_complete) do |_|
    handler.on_write_complete
  end

  conn.on(:close) do |_|
    handler.on_close
  end
end
parse_params_default(params) { |nil, record| ... } click to toggle source
# File lib/fluent/plugin/in_http.rb, line 274
def parse_params_default(params)
  if msgpack = params['msgpack']
    @parser_msgpack.parse(msgpack) do |_time, record|
      yield nil, record
    end
  elsif js = params['json']
    @parser_json.parse(js) do |_time, record|
      yield nil, record
    end
  elsif ndjson = params['ndjson']
    ndjson.split(/\r?\n/).each do |js|
      @parser_json.parse(js) do |_time, record|
        yield nil, record
      end
    end
  else
    raise "'json', 'ndjson' or 'msgpack' parameter is required"
  end
end
parse_params_with_parser(params) { |time, record| ... } click to toggle source
# File lib/fluent/plugin/in_http.rb, line 294
def parse_params_with_parser(params)
  if content = params[EVENT_RECORD_PARAMETER]
    @custom_parser.parse(content) do |time, record|
      yield time, record
    end
  else
    raise "'#{EVENT_RECORD_PARAMETER}' parameter is required"
  end
end