# File lib/fluent/plugin/in_http.rb, line 25
# Loads the standard libraries this plugin relies on, then hands control to
# the parent class initializer.
def initialize
  # Required here (at construction) rather than at file load time.
  require 'uri'
  require 'webrick/httputils'
  super
end
# File lib/fluent/plugin/in_http.rb, line 45
# Applies the configuration and installs a per-instance +parse_params+ method.
#
# When @format is 'default' the request body is decoded by
# +parse_params_default+; otherwise a parser is created for @format,
# configured with +conf+, and +parse_params_with_parser+ is used.
#
# conf - the configuration object, forwarded to the parent class and to the
#        parser.
def configure(conf)
  super

  handler =
    if @format == 'default'
      method(:parse_params_default)
    else
      @parser = Plugin.new_parser(@format)
      @parser.configure(conf)
      method(:parse_params_with_parser)
    end

  # Bind the chosen implementation on this instance only, so the choice is
  # made once here instead of on every request.
  define_singleton_method(:parse_params, handler)
end
# File lib/fluent/plugin/in_http.rb, line 119
# Handles a single HTTP request and emits the event(s) it carries.
#
# path_info - request path; the leading "/" is dropped and the remaining
#             "/"-separated segments are joined with "." to form the tag.
# params    - request parameters. 'time' overrides the event time; keys
#             starting with "HTTP_" and 'REMOTE_ADDR' are copied into the
#             record(s) when @add_http_headers / @add_remote_addr are set.
#
# Returns a three-element Array of [status line, headers Hash, body]:
#   * "200 OK" when the event was emitted or the parsed record was nil,
#   * "400 Bad Request" when parsing or preparing the record raised,
#   * "500 Internal Server Error" when emitting raised.
def on_request(path_info, params)
  begin
    path = path_info[1..-1]  # remove /
    tag = path.split('/').join('.')
    record_time, record = parse_params(params)

    # Skip nil record
    if record.nil?
      if @respond_with_empty_img
        return ["200 OK", {'Content-type'=>'image/gif; charset=utf-8'}, EMPTY_GIF_IMAGE]
      else
        return ["200 OK", {'Content-type'=>'text/plain'}, ""]
      end
    end

    # A batched request parses to an Array of records. Request metadata has
    # to be written into every element: indexing the Array itself with a
    # String key raises TypeError, which would reject the whole batch as a
    # 400 whenever one of the add_* options is enabled.
    records = record.is_a?(Array) ? record : [record]

    if @add_http_headers
      params.each_pair do |k, v|
        next unless k.start_with?("HTTP_")
        records.each { |single_record| single_record[k] = v }
      end
    end

    if @add_remote_addr
      remote_addr = params['REMOTE_ADDR']
      records.each { |single_record| single_record['REMOTE_ADDR'] = remote_addr }
    end

    # An explicit 'time' parameter wins over the parsed time; a value that
    # converts to 0 falls back to the current engine time.
    time = if param_time = params['time']
             param_time = param_time.to_i
             param_time.zero? ? Engine.now : param_time
           else
             record_time.nil? ? Engine.now : record_time
           end
  rescue
    return ["400 Bad Request", {'Content-type'=>'text/plain'}, "400 Bad Request\n#{$!}\n"]
  end

  # TODO server error
  begin
    # Support batched requests
    if record.is_a?(Array)
      mes = MultiEventStream.new
      record.each do |single_record|
        # A per-record "time" field overrides the request-level time and is
        # removed from the emitted record.
        single_time = single_record.delete("time") || time
        mes.add(single_time, single_record)
      end
      router.emit_stream(tag, mes)
    else
      router.emit(tag, time, record)
    end
  rescue
    return ["500 Internal Server Error", {'Content-type'=>'text/plain'}, "500 Internal Server Error\n#{$!}\n"]
  end

  if @respond_with_empty_img
    return ["200 OK", {'Content-type'=>'image/gif; charset=utf-8'}, EMPTY_GIF_IMAGE]
  else
    return ["200 OK", {'Content-type'=>'text/plain'}, ""]
  end
end
# File lib/fluent/plugin/in_http.rb, line 112
# Runs the event loop with @blocking_timeout. Any StandardError escaping the
# loop is logged together with its backtrace instead of propagating.
def run
  begin
    @loop.run(@blocking_timeout)
  rescue => e
    log.error("unexpected error", :error => e.to_s)
    log.error_backtrace
  end
end
# File lib/fluent/plugin/in_http.rb, line 105
# Stops the plugin: detaches every watcher from the event loop, stops the
# loop, closes the listening socket and waits for the worker thread to end.
def shutdown
  @loop.watchers.each(&:detach)
  @loop.stop
  @lsock.close
  @thread.join
end
# File lib/fluent/plugin/in_http.rb, line 84
# Opens the listening socket on @bind:@port and, inside
# +detach_multi_process+, wires up the keep-alive manager, the Coolio server
# and the event loop, then starts the thread that executes +run+.
def start
  log.debug "listening http on #{@bind}:#{@port}"
  # The raw TCP socket is created before detach_multi_process is entered.
  listener = TCPServer.new(@bind, @port)

  detach_multi_process do
    super

    @km = KeepaliveManager.new(@keepalive_timeout)
    #@lsock = Coolio::TCPServer.new(@bind, @port, Handler, @km, method(:on_request), @body_size_limit)
    @lsock = Coolio::TCPServer.new(
      listener, nil, Handler, @km, method(:on_request),
      @body_size_limit, @format, log, @cors_allow_origins
    )
    @lsock.listen(@backlog) unless @backlog.nil?

    @loop = Coolio::Loop.new
    [@km, @lsock].each { |watcher| @loop.attach(watcher) }

    @thread = Thread.new { run }
  end
end
# File lib/fluent/plugin/in_http.rb, line 182
# Decodes the record from the 'msgpack' or 'json' request parameter.
# 'msgpack' takes precedence when both are present.
#
# params - request parameters Hash.
#
# Returns [nil, record]; the time slot is always nil for this format.
# Raises RuntimeError when neither parameter is supplied.
def parse_params_default(params)
  packed = params['msgpack']
  return nil, MessagePack.unpack(packed) if packed

  js = params['json']
  raise "'json' or 'msgpack' parameter is required" unless js

  return nil, JSON.parse(js)
end
# File lib/fluent/plugin/in_http.rb, line 195
# Decodes the record from the EVENT_RECORD_PARAMETER request parameter using
# the parser stored in @parser.
#
# params - request parameters Hash.
#
# Returns [time, record] as yielded by the parser.
# Raises RuntimeError when the parameter is missing or the parser yields a
# nil record.
def parse_params_with_parser(params)
  content = params[EVENT_RECORD_PARAMETER]
  raise "'#{EVENT_RECORD_PARAMETER}' parameter is required" unless content

  @parser.parse(content) do |time, record|
    raise "Received event is not #{@format}: #{content}" if record.nil?
    # Returning from inside the block exits this method with the parsed pair.
    return time, record
  end
end