class RIMS::Service
Constants
- DEFAULT_CONFIG
Public Class Methods
new(config)
click to toggle source
# File lib/rims/service.rb, line 820

# Creates a service bound to +config+.
#
# The configuration object is only stored here, in @config; nothing is
# read from it or validated until it is used later.
def initialize(config)
  @config = config
end
Public Instance Methods
setup(server, daemon: false)
click to toggle source
# File lib/rims/service.rb, line 826

# Prepares +server+ to run this service and registers every lifecycle hook.
#
# In order, this method:
# 1. builds the file logger, optionally joins a stdout logger to it, and
#    builds a separate protocol logger;
# 2. preloads libraries through Riser.preload and @config.require_features;
# 3. copies the polling / process / thread tuning values from @config onto
#    +server+;
# 4. creates the key-value store openers, the authentication object and the
#    dRuby services (registering the sticky :engine service);
# 5. registers the before_start, at_fork, at_stop, at_stat, preprocess,
#    dispatch, postprocess and after_stop blocks on +server+.
#
# server:: object that receives the tuning setters and the hook blocks
#          (presumably a Riser server -- confirm against the caller).
# daemon:: when true AND @config.daemonize? is true, the stdout logger is
#          not joined to the main logger.
#
# Always returns nil.
def setup(server, daemon: false)
  # Every make_*_logger_params result is the positional Logger.new arguments
  # followed by one trailing keyword-options hash.
  *file_logger_params, file_logger_opts = @config.make_file_logger_params
  logger = Logger.new(*file_logger_params, **file_logger_opts)

  *stdout_logger_params, stdout_logger_opts = @config.make_stdout_logger_params
  unless (daemon && @config.daemonize?) then
    # NOTE(review): Logger#+ is not part of the standard library Logger; it is
    # presumably supplied by a logger-joining extension loaded elsewhere.
    logger += Logger.new(*stdout_logger_params, **stdout_logger_opts)
  end

  *protocol_logger_params, protocol_logger_opts = @config.make_protocol_logger_params
  protocol_logger = Logger.new(*protocol_logger_params, **protocol_logger_opts)

  logger.info('preload libraries.')
  Riser.preload
  Riser.preload(RIMS)
  Riser.preload(RIMS::Protocol)
  @config.require_features

  logger.info('setup server.')
  server.accept_polling_timeout_seconds = @config.accept_polling_timeout_seconds
  server.process_num = @config.process_num
  server.process_queue_size = @config.process_queue_size
  server.process_queue_polling_timeout_seconds = @config.process_queue_polling_timeout_seconds
  server.process_send_io_polling_timeout_seconds = @config.process_send_io_polling_timeout_seconds
  server.thread_num = @config.thread_num
  server.thread_queue_size = @config.thread_queue_size
  server.thread_queue_polling_timeout_seconds = @config.thread_queue_polling_timeout_seconds

  ssl_context = @config.ssl_context
  conn_limits = @config.connection_limits

  # Returns a pair of lambdas for one key-value store kind:
  # [0] opens the store for (data structure version, user id, db name),
  #     creating the parent directory on demand;
  # [1] logs the store parameters (called from the before_start hook).
  make_kvs_factory = lambda{|kvs_params, kvs_type|
    kvs_factory = kvs_params.build_factory
    return lambda{|mailbox_data_structure_version, unique_user_id, db_name|
      kvs_path = @config.make_key_value_store_path(mailbox_data_structure_version, unique_user_id)
      unless (kvs_path.directory?) then
        logger.debug("make a directory: #{kvs_path}") if logger.debug?
        kvs_path.mkpath
      end
      db_path = kvs_path + db_name
      logger.debug("#{kvs_type} data key-value store path: #{db_path}") if logger.debug?
      kvs_factory.call(db_path.to_s)
    }, lambda{
      logger.info("#{kvs_type} key-value store parameter: type=#{kvs_params.origin_type}")
      logger.info("#{kvs_type} key-value store parameter: config=#{kvs_params.origin_config.to_json}")
      kvs_params.middleware_list.each_with_index do |middleware, i|
        logger.info("#{kvs_type} key-value store parameter: middleware[#{i}]=#{middleware}")
      end
    }
  }

  kvs_meta_open, kvs_meta_log = make_kvs_factory.call(@config.make_meta_key_value_store_params, 'meta')
  kvs_text_open, kvs_text_log = make_kvs_factory.call(@config.make_text_key_value_store_params, 'text')
  auth = @config.make_authentication

  # Server processes require at least one dRuby process; force it to 1 and
  # warn when the configuration asks for fewer.
  drb_process_num = @config.drb_process_num
  if (@config.process_num > 0) then
    unless (drb_process_num > 0) then
      drb_process_num = 1
      logger.warn('the number of dRuby processes needs to be 1 or more when the number of server processes is 1 or more.')
      logger.warn("drb_services parameter: changed process_num: #{@config.drb_process_num} -> #{drb_process_num}")
    end
  end

  drb_services = Riser::DRbServices.new(drb_process_num,
                                        server_config: @config.drb_server_config,
                                        client_config: @config.drb_client_config)
  drb_services.add_sticky_process_service(:engine,
                                          Riser::ResourceSet.build{|builder|
                                            builder.at_create{|unique_user_id|
                                              mail_store = MailStore.build(unique_user_id, kvs_meta_open, kvs_text_open)
                                              Protocol::Decoder::Engine.new(unique_user_id, mail_store, logger,
                                                                            bulk_response_count: @config.bulk_response_count,
                                                                            bulk_response_size: @config.bulk_response_size,
                                                                            read_lock_timeout_seconds: @config.read_lock_timeout_seconds,
                                                                            write_lock_timeout_seconds: @config.write_lock_timeout_seconds,
                                                                            cleanup_write_lock_timeout_seconds: @config.cleanup_write_lock_timeout_seconds,
                                                                            charset_aliases: @config.charset_aliases,
                                                                            charset_convert_options: @config.charset_convert_options)
                                            }
                                            builder.at_destroy{|engine|
                                              engine.destroy
                                            }
                                            builder.alias_unref(:destroy)
                                          })

  # Dumps the effective configuration to the log, then starts the dRuby server.
  server.before_start{|server_socket|
    logger.info('start server.')
    @config.get_required_features.each do |feature|
      logger.info("required feature: #{feature}")
    end

    logger.info("file logging parameter: path=#{file_logger_params[0]}")
    # The options hash was already split off above, so everything after the
    # path is a shift argument; drop(1) keeps the last one in the log.
    file_logger_params.drop(1).each_with_index do |value, i|
      logger.info("file logging parameter: shift_args[#{i}]=#{value}")
    end
    file_logger_opts.each do |name, value|
      logger.info("file logging parameter: #{name}=#{value}")
    end
    stdout_logger_opts.each do |name, value|
      logger.info("stdout logging parameter: #{name}=#{value}")
    end
    logger.info("protocol logging parameter: path=#{protocol_logger_params[0]}")
    protocol_logger_params.drop(1).each_with_index do |value, i|
      logger.info("protocol logging parameter: shift_args[#{i}]=#{value}")
    end
    protocol_logger_opts.each do |name, value|
      logger.info("protocol logging parameter: #{name}=#{value}")
    end

    logger.info("listen address: #{server_socket.local_address.inspect_sockaddr}")
    # Best effort only: an unresolvable uid/gid is logged with an empty name.
    privileged_user = Etc.getpwuid(Process.euid).name rescue ''
    logger.info("server privileged user: #{privileged_user}(#{Process.euid})")
    privileged_group = Etc.getgrgid(Process.egid).name rescue ''
    logger.info("server privileged group: #{privileged_group}(#{Process.egid})")

    logger.info("server parameter: accept_polling_timeout_seconds=#{server.accept_polling_timeout_seconds}")
    logger.info("server parameter: process_num=#{server.process_num}")
    logger.info("server parameter: process_queue_size=#{server.process_queue_size}")
    logger.info("server parameter: process_queue_polling_timeout_seconds=#{server.process_queue_polling_timeout_seconds}")
    logger.info("server parameter: process_send_io_polling_timeout_seconds=#{server.process_send_io_polling_timeout_seconds}")
    logger.info("server parameter: thread_num=#{server.thread_num}")
    logger.info("server parameter: thread_queue_size=#{server.thread_queue_size}")
    logger.info("server parameter: thread_queue_polling_timeout_seconds=#{server.thread_queue_polling_timeout_seconds}")

    if (ssl_context) then
      Array(ssl_context.alpn_protocols).each_with_index do |protocol, i|
        logger.info("openssl parameter: alpn_protocols[#{i}]=#{protocol}")
      end
      logger.info("openssl parameter: alpn_select_cb=#{ssl_context.alpn_select_cb.inspect}") if ssl_context.alpn_select_cb
      logger.info("openssl parameter: ca_file=#{ssl_context.ca_file}") if ssl_context.ca_file
      logger.info("openssl parameter: ca_path=#{ssl_context.ca_path}") if ssl_context.ca_path
      if (ssl_context.cert) then
        ssl_context.cert.to_text.each_line do |line|
          logger.info("openssl parameter: [cert] #{line.chomp}")
        end
      else
        logger.warn('openssl parameter: not defined cert attribute.')
      end
      logger.info("openssl parameter: cert_store=#{ssl_context.cert_store.inspect}") if ssl_context.cert_store
      Array(ssl_context.ciphers).each_with_index do |cipher, i|
        logger.info("openssl parameter: ciphers[#{i}]=#{cipher.join(',')}")
      end
      Array(ssl_context.client_ca).each_with_index do |cert, i|
        cert.to_text.each_line do |line|
          logger.info("openssl parameter: client_ca[#{i}]: #{line.chomp}")
        end
      end
      logger.info("openssl parameter: client_cert_cb=#{ssl_context.client_cert_cb.inspect}") if ssl_context.client_cert_cb
      Array(ssl_context.extra_chain_cert).each_with_index do |cert, i|
        cert.to_text.each_line do |line|
          logger.info("openssl parameter: extra_chain_cert[#{i}]: #{line.chomp}")
        end
      end
      if (ssl_context.key) then
        logger.info("openssl parameter: key=#{ssl_context.key.inspect}")
        # NOTE(review): at debug level the key's to_text output is written to
        # the log; confirm that this exposure is intended.
        if (logger.debug?) then
          ssl_context.key.to_text.each_line do |line|
            logger.debug("openssl parameter: [key] #{line.chomp}")
          end
        end
      else
        logger.warn('openssl parameter: not defined key attribute.')
      end
      Array(ssl_context.npn_protocols).each_with_index do |protocol, i|
        logger.info("openssl parameter: npn_protocols[#{i}]=#{protocol}")
      end
      logger.info("openssl parameter: npn_select_cb=#{ssl_context.npn_select_cb.inspect}") if ssl_context.npn_select_cb
      logger.info("openssl parameter: options=0x#{'%08x' % ssl_context.options}") if ssl_context.options
      logger.info("openssl parameter: renegotiation_cb=#{ssl_context.renegotiation_cb.inspect}") if ssl_context.renegotiation_cb
      logger.info("openssl parameter: security_level=#{ssl_context.security_level}")
      logger.info("openssl parameter: servername_cb=#{ssl_context.servername_cb.inspect}") if ssl_context.servername_cb
      logger.info("openssl parameter: session_cache_mode=0x#{'%08x' % ssl_context.session_cache_mode}")
      logger.info("openssl parameter: session_cache_size=#{ssl_context.session_cache_size}")
      logger.info("openssl parameter: session_get_cb=#{ssl_context.session_get_cb.inspect}") if ssl_context.session_get_cb
      logger.info("openssl parameter: session_id_context=#{ssl_context.session_id_context}") if ssl_context.session_id_context
      logger.info("openssl parameter: session_new_cb=#{ssl_context.session_new_cb.inspect}") if ssl_context.session_new_cb
      logger.info("openssl parameter: session_remove_cb=#{ssl_context.session_remove_cb}") if ssl_context.session_remove_cb
      logger.info("openssl parameter: ssl_timeout=#{ssl_context.ssl_timeout}") if ssl_context.ssl_timeout
      logger.info("openssl parameter: tmp_dh_callback=#{ssl_context.tmp_dh_callback}") if ssl_context.tmp_dh_callback
      logger.info("openssl parameter: verify_callback=#{ssl_context.verify_callback}") if ssl_context.verify_callback
      logger.info("openssl parameter: verify_depth=#{ssl_context.verify_depth}") if ssl_context.verify_depth
      logger.info("openssl parameter: verify_hostname=#{ssl_context.verify_hostname}") if ssl_context.verify_hostname
      logger.info("openssl parameter: verify_mode=0x#{'%08x' % ssl_context.verify_mode}") if ssl_context.verify_mode
    end

    logger.info("connection parameter: send_buffer_limit_size=#{@config.send_buffer_limit_size}")
    logger.info("connection parameter: read_polling_interval_seconds=#{conn_limits.read_polling_interval_seconds}")
    logger.info("connection parameter: command_wait_timeout_seconds=#{conn_limits.command_wait_timeout_seconds}")
    logger.info("protocol parameter: line_length_limit=#{@config.protocol_line_length_limit}")
    logger.info("protocol parameter: literal_size_limit=#{@config.protocol_literal_size_limit}")
    logger.info("protocol parameter: command_size_limit=#{@config.protocol_command_size_limit}")
    @config.charset_aliases.each_with_index do |(name, enc), i|
      logger.info("charset aliases parameter: alias[#{i}]: #{name} -> #{enc.name}")
    end
    @config.charset_convert_options.each do |name, value|
      logger.info("charset convert_options parameter: #{name}=#{value}")
    end
    logger.info("drb_services parameter: process_num=#{drb_process_num}")
    @config.drb_server_config.each do |name, value|
      logger.info("drb_services server config parameter: #{name}=#{value}")
    end
    @config.drb_client_config.each do |name, value|
      logger.info("drb_services client config parameter: #{name}=#{value}")
    end
    logger.info("drb_services engine parameter: bulk_response_count=#{@config.bulk_response_count}")
    logger.info("drb_services engine parameter: bulk_response_size=#{@config.bulk_response_size}")
    logger.info("drb_services engine parameter: read_lock_timeout_seconds=#{@config.read_lock_timeout_seconds}")
    logger.info("drb_services engine parameter: write_lock_timeout_seconds=#{@config.write_lock_timeout_seconds}")
    logger.info("drb_services engine parameter: cleanup_write_lock_timeout_seconds=#{@config.cleanup_write_lock_timeout_seconds}")
    kvs_meta_log.call
    kvs_text_log.call
    logger.info("authentication parameter: hostname=#{auth.hostname}")
    logger.info("authorization parameter: mail_delivery_user=#{@config.mail_delivery_user}")

    logger.info('dRuby services: start server.')
    drb_services.start_server
  }

  server.at_fork{
    logger.info('dRuby services: detach server.')
    drb_services.detach_server
  }

  server.at_stop{|stop_state|
    case (stop_state)
    when :graceful
      logger.info('autologout immediately.')
      # A zero command wait timeout is what triggers the immediate autologout.
      conn_limits.command_wait_timeout_seconds = 0
    when :forced
      logger.info('forced shutdown.')
    end
  }

  server.at_stat{|info|
    logger.info("stat: #{info.to_json}")
  }

  server.preprocess{
    logger.info('dRuby services: start client.')
    drb_services.start_client
    auth.start_plug_in(logger)
  }

  # Serves one accepted connection. The nested ensure clauses run innermost
  # first: flush the stream, close the TLS socket, then close the raw socket,
  # so a failure in one cleanup step still lets the outer ones run.
  server.dispatch{|socket|
    begin
      begin
        begin
          begin
            remote_address = socket.remote_address # the place where the remote socket is most likely not closed is here
            logger.info("accept connection: #{remote_address.inspect_sockaddr}")
            if (ssl_context) then
              ssl_socket = OpenSSL::SSL::SSLSocket.new(socket, ssl_context)
              logger.info("start tls: #{ssl_socket.state}")
              ssl_socket.accept
              logger.info("accept tls: #{ssl_socket.state}")
              ssl_socket.sync = true
              stream = Riser::WriteBufferStream.new(ssl_socket, @config.send_buffer_limit_size)
            else
              stream = Riser::WriteBufferStream.new(socket, @config.send_buffer_limit_size)
            end
            stream = Riser::LoggingStream.new(stream, protocol_logger)
            decoder = Protocol::Decoder.new_decoder(drb_services, auth, logger,
                                                    mail_delivery_user: @config.mail_delivery_user,
                                                    line_length_limit: @config.protocol_line_length_limit,
                                                    literal_size_limit: @config.protocol_literal_size_limit,
                                                    command_size_limit: @config.protocol_command_size_limit)
            Protocol::Decoder.repl(decoder, conn_limits, stream, stream, logger)
          ensure
            stream.flush if stream
          end
        ensure
          if (ssl_socket) then
            ssl_socket.close
            logger.info("close tls: #{ssl_socket.state}")
          end
        end
      ensure
        socket.close
        if (remote_address) then
          logger.info("close connection: #{remote_address.inspect_sockaddr}")
        else
          logger.info('close connection.')
        end
      end
    rescue => error
      # Errors are logged rather than re-raised from the dispatch block.
      logger.error('interrupt connection with unexpected error.')
      Error.trace_error_chain(error) do |exception|
        logger.error(exception)
      end
    end
  }

  server.postprocess{
    auth.stop_plug_in(logger)
  }

  server.after_stop{
    logger.info('dRuby services: stop server.')
    drb_services.stop_server
    logger.info('stop server.')
  }

  nil
end