class RIMS::Service

Constants

DEFAULT_CONFIG

Public Class Methods

new(config) click to toggle source
# File lib/rims/service.rb, line 820
def initialize(config)
  @config = config
end

Public Instance Methods

setup(server, daemon: false) click to toggle source
# File lib/rims/service.rb, line 826
def setup(server, daemon: false)
  *file_logger_params, file_logger_opts = @config.make_file_logger_params
  logger = Logger.new(*file_logger_params, **file_logger_opts)

  *stdout_logger_params, stdout_logger_opts = @config.make_stdout_logger_params
  unless (daemon && @config.daemonize?) then
    logger += Logger.new(*stdout_logger_params, **stdout_logger_opts)
  end

  *protocol_logger_params, protocol_logger_opts = @config.make_protocol_logger_params
  protocol_logger = Logger.new(*protocol_logger_params, **protocol_logger_opts)

  logger.info('preload libraries.')
  Riser.preload
  Riser.preload(RIMS)
  Riser.preload(RIMS::Protocol)
  @config.require_features

  logger.info('setup server.')
  server.accept_polling_timeout_seconds          = @config.accept_polling_timeout_seconds
  server.process_num                             = @config.process_num
  server.process_queue_size                      = @config.process_queue_size
  server.process_queue_polling_timeout_seconds   = @config.process_queue_polling_timeout_seconds
  server.process_send_io_polling_timeout_seconds = @config.process_send_io_polling_timeout_seconds
  server.thread_num                              = @config.thread_num
  server.thread_queue_size                       = @config.thread_queue_size
  server.thread_queue_polling_timeout_seconds    = @config.thread_queue_polling_timeout_seconds

  ssl_context = @config.ssl_context
  conn_limits = @config.connection_limits

  make_kvs_factory = lambda{|kvs_params, kvs_type|
    kvs_factory = kvs_params.build_factory
    return lambda{|mailbox_data_structure_version, unique_user_id, db_name|
      kvs_path = @config.make_key_value_store_path(mailbox_data_structure_version, unique_user_id)
      unless (kvs_path.directory?) then
        logger.debug("make a directory: #{kvs_path}") if logger.debug?
        kvs_path.mkpath
      end
      db_path = kvs_path + db_name
      logger.debug("#{kvs_type} data key-value sotre path: #{db_path}") if logger.debug?
      kvs_factory.call(db_path.to_s)
    }, lambda{
      logger.info("#{kvs_type} key-value store parameter: type=#{kvs_params.origin_type}")
      logger.info("#{kvs_type} key-value store parameter: config=#{kvs_params.origin_config.to_json}")
      kvs_params.middleware_list.each_with_index do |middleware, i|
        logger.info("#{kvs_type} key-value store parameter: middleware[#{i}]=#{middleware}")
      end
    }
  }

  kvs_meta_open, kvs_meta_log = make_kvs_factory.call(@config.make_meta_key_value_store_params, 'meta')
  kvs_text_open, kvs_text_log = make_kvs_factory.call(@config.make_text_key_value_store_params, 'text')
  auth = @config.make_authentication

  drb_process_num = @config.drb_process_num
  if (@config.process_num > 0) then
    unless (drb_process_num > 0) then
      drb_process_num = 1
      logger.warn('the number of dRuby processes needs to be 1 or more when the number of server processes is 1 or more.')
      logger.warn("drb_services parameter: changed process_num: #{@config.drb_process_num} -> #{drb_process_num}")
    end
  end

  drb_services = Riser::DRbServices.new(drb_process_num,
                                        server_config: @config.drb_server_config,
                                        client_config: @config.drb_client_config)
  drb_services.add_sticky_process_service(:engine,
                                          Riser::ResourceSet.build{|builder|
                                            builder.at_create{|unique_user_id|
                                              mail_store = MailStore.build(unique_user_id, kvs_meta_open, kvs_text_open)
                                              Protocol::Decoder::Engine.new(unique_user_id, mail_store, logger,
                                                                            bulk_response_count: @config.bulk_response_count,
                                                                            bulk_response_size: @config.bulk_response_size,
                                                                            read_lock_timeout_seconds: @config.read_lock_timeout_seconds,
                                                                            write_lock_timeout_seconds: @config.write_lock_timeout_seconds,
                                                                            cleanup_write_lock_timeout_seconds: @config.cleanup_write_lock_timeout_seconds,
                                                                            charset_aliases: @config.charset_aliases,
                                                                            charset_convert_options: @config.charset_convert_options)
                                            }
                                            builder.at_destroy{|engine|
                                              engine.destroy
                                            }
                                            builder.alias_unref(:destroy)
                                          })

  server.before_start{|server_socket|
    logger.info('start server.')
    for feature in @config.get_required_features
      logger.info("required feature: #{feature}")
    end
    logger.info("file logging parameter: path=#{file_logger_params[0]}")
    file_logger_params[1..-2].each_with_index do |value, i|
      logger.info("file logging parameter: shift_args[#{i}]=#{value}")
    end
    for name, value in file_logger_opts
      logger.info("file logging parameter: #{name}=#{value}")
    end
    for name, value in stdout_logger_opts
      logger.info("stdout logging parameter: #{name}=#{value}")
    end
    logger.info("protocol logging parameter: path=#{protocol_logger_params[0]}")
    protocol_logger_params[1..-2].each_with_index do |value, i|
      logger.info("protocol logging parameter: shift_args[#{i}]=#{value}")
    end
    for name, value in protocol_logger_opts
      logger.info("protocol logging parameter: #{name}=#{value}")
    end
    logger.info("listen address: #{server_socket.local_address.inspect_sockaddr}")
    privileged_user = Etc.getpwuid(Process.euid).name rescue ''
    logger.info("server privileged user: #{privileged_user}(#{Process.euid})")
    privileged_group = Etc.getgrgid(Process.egid).name rescue ''
    logger.info("server privileged group: #{privileged_group}(#{Process.egid})")
    logger.info("server parameter: accept_polling_timeout_seconds=#{server.accept_polling_timeout_seconds}")
    logger.info("server parameter: process_num=#{server.process_num}")
    logger.info("server parameter: process_queue_size=#{server.process_queue_size}")
    logger.info("server parameter: process_queue_polling_timeout_seconds=#{server.process_queue_polling_timeout_seconds}")
    logger.info("server parameter: process_send_io_polling_timeout_seconds=#{server.process_send_io_polling_timeout_seconds}")
    logger.info("server parameter: thread_num=#{server.thread_num}")
    logger.info("server parameter: thread_queue_size=#{server.thread_queue_size}")
    logger.info("server parameter: thread_queue_polling_timeout_seconds=#{server.thread_queue_polling_timeout_seconds}")
    if (ssl_context) then
      Array(ssl_context.alpn_protocols).each_with_index do |protocol, i|
        logger.info("openssl parameter: alpn_protocols[#{i}]=#{protocol}")
      end
      logger.info("openssl parameter: alpn_select_cb=#{ssl_context.alpn_select_cb.inspect}") if ssl_context.alpn_select_cb
      logger.info("openssl parameter: ca_file=#{ssl_context.ca_file}") if ssl_context.ca_file
      logger.info("openssl parameter: ca_path=#{ssl_context.ca_path}") if ssl_context.ca_path
      if (ssl_context.cert) then
        ssl_context.cert.to_text.each_line do |line|
          logger.info("openssl parameter: [cert] #{line.chomp}")
        end
      else
        logger.warn('openssl parameter: not defined cert attribute.')
      end
      logger.info("openssl parameter: cert_store=#{ssl_context.cert_store.inspect}") if ssl_context.cert_store
      Array(ssl_context.ciphers).each_with_index do |cipher, i|
        logger.info("openssl parameter: ciphers[#{i}]=#{cipher.join(',')}")
      end
      Array(ssl_context.client_ca).each_with_index do |cert, i|
        cert.to_text.each_line do |line|
          logger.info("openssl parameter: client_ca[#{i}]: #{line.chomp}")
        end
      end
      logger.info("openssl parameter: client_cert_cb=#{ssl_context.client_cert_cb.inspect}") if ssl_context.client_cert_cb
      Array(ssl_context.extra_chain_cert).each_with_index do |cert, i|
        cert.to_text.each_line do |line|
          logger.info("openssl parameter: extra_chain_cert[#{i}]: #{line.chomp}")
        end
      end
      if (ssl_context.key) then
        logger.info("openssl parameter: key=#{ssl_context.key.inspect}")
        if (logger.debug?) then
          ssl_context.key.to_text.each_line do |line|
            logger.debug("openssl parameter: [key] #{line.chomp}")
          end
        end
      else
        logger.warn('openssl parameter: not defined key attribute.')
      end
      Array(ssl_context.npn_protocols).each_with_index do |protocol, i|
        logger.info("openssl parameter: npn_protocols[#{i}]=#{protocol}")
      end
      logger.info("openssl parameter: npn_select_cb=#{ssl_context.npn_select_cb.inspect}") if ssl_context.npn_select_cb
      logger.info("openssl parameter: options=0x#{'%08x' % ssl_context.options}") if ssl_context.options
      logger.info("openssl parameter: renegotiation_cb=#{ssl_context.renegotiation_cb.inspect}") if ssl_context.renegotiation_cb
      logger.info("openssl parameter: security_level=#{ssl_context.security_level}")
      logger.info("openssl parameter: servername_cb=#{ssl_context.servername_cb.inspect}") if ssl_context.servername_cb
      logger.info("openssl parameter: session_cache_mode=0x#{'%08x' % ssl_context.session_cache_mode}")
      logger.info("openssl parameter: session_cache_size=#{ssl_context.session_cache_size }")
      logger.info("openssl parameter: session_get_cb=#{ssl_context.session_get_cb.inspect}") if ssl_context.session_get_cb
      logger.info("openssl parameter: session_id_context=#{ssl_context.session_id_context}") if ssl_context.session_id_context
      logger.info("openssl parameter: session_new_cb=#{ssl_context.session_new_cb.inspect}") if ssl_context.session_new_cb
      logger.info("openssl parameter: session_remove_cb=#{ssl_context.session_remove_cb}") if ssl_context.session_remove_cb
      logger.info("openssl parameter: ssl_timeout=#{ssl_context.ssl_timeout}") if ssl_context.ssl_timeout
      logger.info("openssl parameter: tmp_dh_callback=#{ssl_context.tmp_dh_callback}") if ssl_context.tmp_dh_callback
      logger.info("openssl parameter: verify_callback=#{ssl_context.verify_callback}") if ssl_context.verify_callback
      logger.info("openssl parameter: verify_depth=#{ssl_context.verify_depth}") if ssl_context.verify_depth
      logger.info("openssl parameter: verify_hostname=#{ssl_context.verify_hostname}") if ssl_context.verify_hostname
      logger.info("openssl parameter: verify_mode=0x#{'%08x' % ssl_context.verify_mode}") if ssl_context.verify_mode
    end
    logger.info("connection parameter: send_buffer_limit_size=#{@config.send_buffer_limit_size}")
    logger.info("connection parameter: read_polling_interval_seconds=#{conn_limits.read_polling_interval_seconds}")
    logger.info("connection parameter: command_wait_timeout_seconds=#{conn_limits.command_wait_timeout_seconds}")
    logger.info("protocol parameter: line_length_limit=#{@config.protocol_line_length_limit}")
    logger.info("protocol parameter: literal_size_limit=#{@config.protocol_literal_size_limit}")
    logger.info("protocol parameter: command_size_limit=#{@config.protocol_command_size_limit}")
    @config.charset_aliases.each_with_index do |(name, enc), i|
      logger.info("charset aliases parameter: alias[#{i}]: #{name} -> #{enc.name}")
    end
    for name, value in @config.charset_convert_options
      logger.info("charset convert_options parameter: #{name}=#{value}")
    end
    logger.info("drb_services parameter: process_num=#{drb_process_num}")
    for name, value in @config.drb_server_config
      logger.info("drb_services server config parameter: #{name}=#{value}")
    end
    for name, value in @config.drb_client_config
      logger.info("drb_services client config parameter: #{name}=#{value}")
    end
    logger.info("drb_services engine parameter: bulk_response_count=#{@config.bulk_response_count}")
    logger.info("drb_services engine parameter: bulk_response_size=#{@config.bulk_response_size}")
    logger.info("drb_services engine parameter: read_lock_timeout_seconds=#{@config.read_lock_timeout_seconds}")
    logger.info("drb_services engine parameter: write_lock_timeout_seconds=#{@config.write_lock_timeout_seconds}")
    logger.info("drb_services engine parameter: cleanup_write_lock_timeout_seconds=#{@config.cleanup_write_lock_timeout_seconds}")
    kvs_meta_log.call
    kvs_text_log.call
    logger.info("authentication parameter: hostname=#{auth.hostname}")
    logger.info("authorization parameter: mail_delivery_user=#{@config.mail_delivery_user}")

    logger.info('dRuby services: start server.')
    drb_services.start_server
  }
  server.at_fork{
    logger.info('dRuby services: detach server.')
    drb_services.detach_server
  }
  server.at_stop{|stop_state|
    case (stop_state)
    when :graceful
      logger.info('autologout immediately.')
      conn_limits.command_wait_timeout_seconds = 0
    when :forced
      logger.info('forced shutdown.')
    end
  }
  server.at_stat{|info|
    logger.info("stat: #{info.to_json}")
  }
  server.preprocess{
    logger.info('dRuby services: start client.')
    drb_services.start_client
    auth.start_plug_in(logger)
  }
  server.dispatch{|socket|
    begin
      begin
        begin
          begin
            remote_address = socket.remote_address # the place where the remote socket is most likely not closed is here
            logger.info("accept connection: #{remote_address.inspect_sockaddr}")
            if (ssl_context) then
              ssl_socket = OpenSSL::SSL::SSLSocket.new(socket, ssl_context)
              logger.info("start tls: #{ssl_socket.state}")
              ssl_socket.accept
              logger.info("accept tls: #{ssl_socket.state}")
              ssl_socket.sync = true
              stream = Riser::WriteBufferStream.new(ssl_socket, @config.send_buffer_limit_size)
            else
              stream = Riser::WriteBufferStream.new(socket, @config.send_buffer_limit_size)
            end
            stream = Riser::LoggingStream.new(stream, protocol_logger)
            decoder = Protocol::Decoder.new_decoder(drb_services, auth, logger,
                                                    mail_delivery_user: @config.mail_delivery_user,
                                                    line_length_limit: @config.protocol_line_length_limit,
                                                    literal_size_limit: @config.protocol_literal_size_limit,
                                                    command_size_limit: @config.protocol_command_size_limit)
            Protocol::Decoder.repl(decoder, conn_limits, stream, stream, logger)
          ensure
            if (stream) then
              stream.flush
            end
          end
        ensure
          if (ssl_socket) then
            ssl_socket.close
            logger.info("close tls: #{ssl_socket.state}")
          end
        end
      ensure
        socket.close
        if (remote_address) then
          logger.info("close connection: #{remote_address.inspect_sockaddr}")
        else
          logger.info('close connection.')
        end
      end
    rescue
      logger.error('interrupt connection with unexpected error.')
      Error.trace_error_chain($!) do |exception|
        logger.error(exception)
      end
    end
  }
  server.postprocess{
    auth.stop_plug_in(logger)
  }
  server.after_stop{
    logger.info('dRuby services: stop server.')
    drb_services.stop_server
    logger.info('stop server.')
  }

  nil
end