class Fluent::Plugin::HttpInput

Constants

EMPTY_GIF_IMAGE
EVENT_RECORD_PARAMETER

Public Instance Methods

close() click to toggle source
Calls superclass method Fluent::Plugin::Base#close
# File lib/fluent/plugin/in_http.rb, line 147
# Closes the listening server created by #start, then continues the
# superclass close chain.
#
# @lsock is only assigned in #start, so it is closed only when present. This
# keeps #close from raising NoMethodError (and skipping the superclass
# cleanup) on an instance that was never started.
def close
  @lsock.close if @lsock
  super
end
configure(conf) click to toggle source
Calls superclass method Fluent::PluginLoggerMixin#configure
# File lib/fluent/plugin/in_http.rb, line 70
# Configures the plugin and selects the request-parsing strategy.
#
# After the superclass has processed +conf+, the '@type' of the first parser
# section decides how requests are parsed:
# * 'in_http': a msgpack parser and a JSON parser are created (both with
#   estimate_current_event disabled), the format name becomes 'default', and
#   #parse_params_default handles requests.
# * anything else: a single parser is created, the format name is that
#   '@type', and #parse_params_with_parser handles requests.
#
# The chosen handler is installed on this instance as #parse_params.
def configure(conf)
  compat_parameters_convert(conf, :parser)

  super

  parser_type = @parser_configs.first['@type']
  if parser_type == 'in_http'
    @parser_msgpack = parser_create(usage: 'parser_in_http_msgpack', type: 'msgpack')
    @parser_msgpack.estimate_current_event = false
    @parser_json = parser_create(usage: 'parser_in_http_json', type: 'json')
    @parser_json.estimate_current_event = false
    @format_name = 'default'
    handler = method(:parse_params_default)
  else
    @parser = parser_create
    @format_name = parser_type
    handler = method(:parse_params_with_parser)
  end

  # Bind the handler per instance so other instances are unaffected.
  define_singleton_method(:parse_params, handler)
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/in_http.rb, line 116
# Always returns true.
# NOTE(review): presumably consulted by Fluentd to decide whether this plugin
# may run when multiple workers are configured -- confirm against the plugin
# base class.
def multi_workers_ready?
  true
end
on_request(path_info, params) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 152
# Handles one HTTP request: derives the tag from the path, parses the payload
# through #parse_params, and emits the resulting event(s) through the router.
#
# * The tag is the path without its leading "/" and with "/" replaced by "."
#   ("/app/log" becomes "app.log").
# * A truthy 'time' request parameter overrides the parsed record time; a
#   value that converts to 0.0 means "use the current engine time".
# * An Array record is treated as a batch: each element is emitted in one
#   stream, using its own "time" key (removed from the element) when present.
#
# @param path_info [String] request path, expected to start with "/"
# @param params [Hash] request parameters and environment entries
#   (e.g. 'time', 'REMOTE_ADDR', 'HTTP_*')
# @return [Array] a [status line, headers hash, body] triple:
#   200 on success or when the parsed record is nil, 400 when
#   parsing/preparation raises, 500 when emitting raises
def on_request(path_info, params)
  begin
    path = path_info[1..-1]  # remove /
    tag = path.split('/').join('.')
    record_time, record = parse_params(params)

    # Skip nil record
    return build_success_response if record.nil?

    # Batched (Array) payloads are enriched element by element at emit time.
    add_request_info_to_record(record, params) unless record.is_a?(Array)

    time = if (param_time = params['time'])
             param_time = param_time.to_f
             param_time.zero? ? Fluent::Engine.now : @float_time_parser.parse(param_time)
           else
             record_time.nil? ? Fluent::Engine.now : record_time
           end
  rescue => e
    return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{e}\n"]
  end

  # TODO server error
  begin
    # Support batched requests
    if record.is_a?(Array)
      mes = Fluent::MultiEventStream.new
      record.each do |single_record|
        add_request_info_to_record(single_record, params)
        single_time = single_record.delete("time") || time
        mes.add(single_time, single_record)
      end
      router.emit_stream(tag, mes)
    else
      router.emit(tag, time, record)
    end
  rescue => e
    return ["500 Internal Server Error", {'Content-Type'=>'text/plain'}, "500 Internal Server Error\n#{e}\n"]
  end

  build_success_response
end

# Copies request metadata into +record+ in place: every "HTTP_"-prefixed
# entry of +params+ when @add_http_headers is set, and 'REMOTE_ADDR' when
# @add_remote_addr is set.
def add_request_info_to_record(record, params)
  if @add_http_headers
    params.each_pair do |key, value|
      record[key] = value if key.start_with?("HTTP_")
    end
  end
  record['REMOTE_ADDR'] = params['REMOTE_ADDR'] if @add_remote_addr
end

# Builds the 200 response triple: the empty GIF when @respond_with_empty_img
# is set, otherwise an empty plain-text body.
def build_success_response
  if @respond_with_empty_img
    ["200 OK", {'Content-Type'=>'image/gif; charset=utf-8'}, EMPTY_GIF_IMAGE]
  else
    ["200 OK", {'Content-Type'=>'text/plain'}, ""]
  end
end

private :add_request_info_to_record, :build_success_response
start() click to toggle source
Calls superclass method Fluent::Compat::Input#start
# File lib/fluent/plugin/in_http.rb, line 120
# Starts the plugin: obtains a listening TCP socket for @bind/@port from the
# ServerEngine socket manager, wraps it in a Coolio::TCPServer that routes
# requests to #on_request, and attaches the server and the keepalive manager
# to the event loop.
def start
  @_event_loop_run_timeout = @blocking_timeout

  super

  log.debug "listening http", bind: @bind, port: @port

  # The socket manager path is read from the environment and converted to an
  # Integer on Windows.
  manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
  manager_path = manager_path.to_i if Fluent.windows?
  listen_socket = ServerEngine::SocketManager::Client.new(manager_path).listen_tcp(@bind, @port)

  @km = KeepaliveManager.new(@keepalive_timeout)
  @lsock = Coolio::TCPServer.new(
    listen_socket, nil, Handler, @km, method(:on_request),
    @body_size_limit, @format_name, log,
    @cors_allow_origins
  )
  # Only override the listen backlog when one was configured.
  @lsock.listen(@backlog) unless @backlog.nil?
  [@km, @lsock].each { |watcher| event_loop_attach(watcher) }

  @float_time_parser = Fluent::NumericTimeParser.new(:float)
end

Private Instance Methods

parse_params_default(params) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 225
# Parses the request payload with the built-in msgpack or JSON parser.
#
# The 'msgpack' parameter takes precedence over 'json'. The time yielded by
# the parser is discarded, so the result for the first yielded record is
# [nil, record]. If the parser never yields, the parser's own return value is
# returned.
#
# Raises RuntimeError when neither 'msgpack' nor 'json' is present.
def parse_params_default(params)
  payload = params['msgpack']
  parser = @parser_msgpack
  unless payload
    payload = params['json']
    parser = @parser_json
  end
  raise "'json' or 'msgpack' parameter is required" unless payload

  parser.parse(payload) { |_time, record| return nil, record }
end
parse_params_with_parser(params) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 239
# Parses the request payload with the configured parser (@parser).
#
# The payload is read from the EVENT_RECORD_PARAMETER entry of +params+.
# Returns [time, record] for the first event the parser yields.
#
# Raises RuntimeError when the parameter is missing, or when the parser yields
# a nil record (the payload is not in the configured format).
def parse_params_with_parser(params)
  content = params[EVENT_RECORD_PARAMETER]
  raise "'#{EVENT_RECORD_PARAMETER}' parameter is required" unless content

  @parser.parse(content) do |time, record|
    raise "Received event is not #{@format_name}: #{content}" if record.nil?
    return time, record
  end
end