class Fluent::HttpPumaInput

Constants

OK_RESPONSE

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_http_puma.rb, line 5
# Loads the libraries this plugin relies on and then runs the superclass
# initializer.
def initialize
  # Required at construction time rather than at file load time —
  # presumably so puma is only needed when the plugin is actually
  # instantiated (TODO confirm).
  require 'puma'
  require 'uri'
  super
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_http_puma.rb, line 20
# Validates the plugin settings once the superclass has processed +conf+ and
# prepares a text parser when a non-default format is configured.
#
# conf - configuration object; forwarded to the superclass and, for
#        non-default formats, to TextParser#configure.
#
# Raises ConfigError when use_ssl is enabled without ssl_keys.
def configure(conf)
  super

  ssl_keys_missing = @use_ssl && !@ssl_keys
  raise ConfigError, 'ssl_keys parameter is required when use_ssl is true' if ssl_keys_missing

  # Only formats other than 'default' are delegated to a TextParser.
  return if @format == 'default'

  @parser = TextParser.new
  @parser.configure(conf)
end
on_request(env) click to toggle source
# File lib/fluent/plugin/in_http_puma.rb, line 64
# Handles one HTTP request and emits the decoded event(s) to the engine.
#
# The tag is derived from the request path ("/a/b" becomes "a.b"). The event
# time is the non-zero integer value of the "time" query parameter when one is
# given, otherwise the time returned by parse_record, otherwise Engine.now.
# When the decoded record is an Array, every element is emitted as its own
# event; an element's "time" key (removed from the element) overrides the
# request-level time.
#
# env - Rack environment hash. 'REQUEST_URI' is read here; the whole hash is
#       handed on to parse_record.
#
# Returns a Rack response triple: OK_RESPONSE on success or when there is no
# record to emit, a 400 response when the request cannot be parsed, and a 500
# response when emitting fails.
def on_request(env)
  begin
    # URI and query parsing live inside the rescued section so that a
    # malformed request URI is answered with 400 instead of raising out of
    # the handler.
    uri = URI.parse(env['REQUEST_URI'.freeze])
    params = Rack::Utils.parse_query(uri.query)

    path = uri.path[1..-1]  # remove /
    tag = path.split('/').join('.')
    record_time, record = parse_record(env, params)

    # Skip nil record
    return OK_RESPONSE if record.nil?

    time = if param_time = params['time'.freeze]
             param_time = param_time.to_i
             # A zero (or non-numeric) "time" parameter means "use now".
             param_time.zero? ? Engine.now : param_time
           else
             record_time.nil? ? Engine.now : record_time
           end
  rescue => e
    return [400, {'Content-type'=>'text/plain'}, ["Bad Request\n#{e}\n"]]
  end

  begin
    # Support batched requests
    if record.is_a?(Array)
      mes = MultiEventStream.new
      record.each do |single_record|
        single_time = single_record.delete("time".freeze) || time
        mes.add(single_time, single_record)
      end
      Engine.emit_stream(tag, mes)
    else
      Engine.emit(tag, time, record)
    end
  rescue => e
    return [500, {'Content-type'=>'text/plain'}, ["Internal Server Error\n#{e}\n"]]
  end

  return OK_RESPONSE
end
run() click to toggle source
# File lib/fluent/plugin/in_http_puma.rb, line 55
# Runs the server and logs any StandardError it raises instead of letting it
# propagate to the caller.
def run
  begin
    # The meaning of the +false+ argument is defined by the server's #run;
    # presumably "do not spawn a background thread" — TODO confirm.
    @server.run(false)
  rescue => failure
    log.error "unexpected error", :error => failure.to_s
    log.error_backtrace failure.backtrace
  end
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_http_puma.rb, line 50
# Stops the server and waits for the thread stored in @thread to finish.
def shutdown
  # NOTE(review): the +true+ argument is interpreted by the server's #stop;
  # presumably it requests a synchronous stop — confirm against Puma::Server.
  @server.stop(true)
  # Block until the server thread has exited.
  @thread.join
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_http_puma.rb, line 33
# Builds the Puma server with on_request as its application, attaches the
# HTTP or HTTPS listener, and launches #run on a new thread kept in @thread.
def start
  super

  # Refer puma's Runner and Rack handler for puma server setup
  request_handler = method(:on_request)
  @server = ::Puma::Server.new(request_handler)
  @server.min_threads = @min_threads
  @server.max_threads = @max_threads
  @server.leak_stack_on_error = false

  # Exactly one listener is registered, chosen by the use_ssl setting.
  @use_ssl ? setup_https : setup_http

  @thread = Thread.new { run }
end

Private Instance Methods

parse_params_form(params) click to toggle source
# File lib/fluent/plugin/in_http_puma.rb, line 160
# Decodes a record from form-style parameters.
#
# params - Hash of request parameters. The 'msgpack' entry is preferred; the
#          'json' entry is used when 'msgpack' is absent.
#
# Returns a two-element [time, record] pair whose time is always nil.
# Raises RuntimeError when neither parameter is present.
def parse_params_form(params)
  packed = params['msgpack'.freeze]
  return nil, MessagePack.unpack(packed) if packed

  json_text = params['json'.freeze]
  raise "'json' or 'msgpack' parameter is required" unless json_text

  return nil, JSON.parse(json_text)
end
parse_params_with_parser(content) click to toggle source
# File lib/fluent/plugin/in_http_puma.rb, line 171
# Runs the configured parser over the raw request body.
#
# content - String body to parse.
#
# Returns the [time, record] pair from the first result the parser yields
# (the +return+ inside the block exits this method).
# Raises RuntimeError when the parser yields a nil record.
def parse_params_with_parser(content)
  @parser.parse(content) do |parsed_time, parsed_record|
    if parsed_record.nil?
      raise "received event is not #{@format}: #{content}"
    end

    return parsed_time, parsed_record
  end
end
parse_record(env, params) click to toggle source
# File lib/fluent/plugin/in_http_puma.rb, line 140
# Decodes the request body into a [time, record] pair.
#
# When a non-default format is configured the body is handed to the
# configured parser regardless of Content-Type. Otherwise the Content-Type
# decides the decoding: JSON, MessagePack, URL-encoded form, or multipart
# form (the form variants expect a 'json' or 'msgpack' parameter).
#
# env    - Rack environment hash; reads 'rack.input' and 'CONTENT_TYPE'.
# params - Hash of query parameters; updated in place with form fields for
#          the two form content types.
#
# Returns [time, record], or nil for a Content-Type that matches none of the
# supported kinds (a caller destructuring the result then sees a nil record).
# Raises RuntimeError when the default format is in use and the request has
# no Content-Type header.
def parse_record(env, params)
  body = env['rack.input'.freeze]

  # A configured parser takes precedence and does not need a Content-Type.
  return parse_params_with_parser(body.read) if @format != 'default'.freeze

  content_type = env['CONTENT_TYPE'.freeze]
  # Fail with a descriptive message instead of a NoMethodError on nil.
  raise "Content-Type header is required" if content_type.nil?

  case
  when content_type.start_with?('application/json'.freeze)
    # NOTE(review): JSON.load is documented as intended for trusted input
    # only, yet this body comes from an HTTP client. Consider JSON.parse
    # (as parse_params_form already uses); kept as-is here because the two
    # differ on blank bodies and NaN handling.
    return nil, JSON.load(body)
  when content_type.start_with?('application/x-msgpack'.freeze)
    return nil, MessagePack.unpack(body)
  when content_type =~ /^application\/x-www-form-urlencoded/
    params.update(::Rack::Utils.parse_query(body.read))
    parse_params_form(params)
  when content_type =~ /^multipart\/form-data; boundary=(.+)/
    params.update(::Rack::Multipart.parse_multipart(env))
    parse_params_form(params)
  end
end
setup_http() click to toggle source
# File lib/fluent/plugin/in_http_puma.rb, line 110
# Registers a plain TCP listener on the configured bind address and port.
# The backlog is passed as an extra argument only when one is configured.
#
# Returns whatever the server's add_tcp_listener returns.
def setup_http
  log.info "listening http on #{@bind}:#{@port}"

  optional_backlog = @backlog ? [@backlog] : []
  @server.add_tcp_listener(@bind, @port, true, *optional_backlog)
end
setup_https() click to toggle source
# File lib/fluent/plugin/in_http_puma.rb, line 118
# Registers an SSL listener on the configured bind address and port.
#
# The two entries of @ssl_keys are applied to a Puma MiniSSL context: as
# keystore and keystore password on JRuby, as key and cert otherwise. The
# backlog is passed as an extra argument only when one is configured.
#
# Returns whatever the server's add_ssl_listener returns.
def setup_https
  require 'puma/minissl'

  ssl_context = ::Puma::MiniSSL::Context.new
  first_entry, second_entry = @ssl_keys
  if defined?(JRUBY_VERSION)
    ssl_context.keystore = first_entry
    ssl_context.keystore_pass = second_entry
  else
    ssl_context.key = first_entry
    ssl_context.cert = second_entry
  end
  ssl_context.verify_mode = ::Puma::MiniSSL::VERIFY_PEER
  #ssl_context.verify_mode = ::Puma::MiniSSL::VERIFY_NONE

  log.info "listening https on #{@bind}:#{@port}"

  optional_backlog = @backlog ? [@backlog] : []
  @server.add_ssl_listener(@bind, @port, ssl_context, true, *optional_backlog)
end