module Fluent::ServerModule
Public Instance Methods
after_run()
click to toggle source
# File lib/fluent/supervisor.rb, line 62 def after_run stop_rpc_server if @rpc_endpoint Fluent::Supervisor.cleanup_resources end
before_run()
click to toggle source
# File lib/fluent/supervisor.rb, line 42 def before_run @start_time = Time.now if config[:rpc_endpoint] @rpc_endpoint = config[:rpc_endpoint] @enable_get_dump = config[:enable_get_dump] run_rpc_server end install_supervisor_signal_handlers if config[:signame] @signame = config[:signame] install_windows_event_handler end socket_manager_path = ServerEngine::SocketManager::Server.generate_path ServerEngine::SocketManager::Server.open(socket_manager_path) ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s end
install_supervisor_signal_handlers()
click to toggle source
# File lib/fluent/supervisor.rb, line 129 def install_supervisor_signal_handlers trap :HUP do $log.debug "fluentd supervisor process get SIGHUP" supervisor_sighup_handler end unless Fluent.windows? trap :USR1 do $log.debug "fluentd supervisor process get SIGUSR1" supervisor_sigusr1_handler end unless Fluent.windows? end
install_windows_event_handler()
click to toggle source
# File lib/fluent/supervisor.rb, line 141 def install_windows_event_handler Thread.new do ev = Win32::Event.new(@signame) begin ev.reset until WaitForSingleObject(ev.handle, 0) == WAIT_OBJECT_0 sleep 1 end kill_worker stop(true) ensure ev.close end end end
kill_worker()
click to toggle source
# File lib/fluent/supervisor.rb, line 172 def kill_worker if pid = config[:worker_pid] if Fluent.windows? Process.kill :KILL, pid else Process.kill :TERM, pid end end end
run_rpc_server()
click to toggle source
# File lib/fluent/supervisor.rb, line 67 def run_rpc_server @rpc_server = RPC::Server.new(@rpc_endpoint, $log) # built-in RPC for signals @rpc_server.mount_proc('/api/processes.interruptWorkers') { |req, res| $log.debug "fluentd RPC got /api/processes.interruptWorkers request" Process.kill :INT, $$ nil } @rpc_server.mount_proc('/api/processes.killWorkers') { |req, res| $log.debug "fluentd RPC got /api/processes.killWorkers request" Process.kill :TERM, $$ nil } @rpc_server.mount_proc('/api/processes.flushBuffersAndKillWorkers') { |req, res| $log.debug "fluentd RPC got /api/processes.flushBuffersAndKillWorkers request" if Fluent.windows? $log.warn "operation 'flushBuffersAndKillWorkers' is not supported on Windows now." else Process.kill :USR1, $$ Process.kill :TERM, $$ end nil } @rpc_server.mount_proc('/api/plugins.flushBuffers') { |req, res| $log.debug "fluentd RPC got /api/plugins.flushBuffers request" unless Fluent.windows? Process.kill :USR1, $$ end nil } @rpc_server.mount_proc('/api/config.reload') { |req, res| $log.debug "fluentd RPC got /api/config.reload request" if Fluent.windows? # restart worker with auto restarting by killing kill_worker else Process.kill :HUP, $$ end nil } @rpc_server.mount_proc('/api/config.dump') { |req, res| $log.debug "fluentd RPC got /api/config.dump request" $log.info "dump in-memory config" supervisor_dump_config_handler nil } @rpc_server.mount_proc('/api/config.getDump') { |req, res| $log.debug "fluentd RPC got /api/config.dump request" $log.info "get dump in-memory config via HTTP" res.body = supervisor_get_dump_config_handler [nil, nil, res] } if @enable_get_dump @rpc_server.start end
stop_rpc_server()
click to toggle source
# File lib/fluent/supervisor.rb, line 125 def stop_rpc_server @rpc_server.shutdown end
supervisor_dump_config_handler()
click to toggle source
# File lib/fluent/supervisor.rb, line 182 def supervisor_dump_config_handler $log.info config[:fluentd_conf].to_s end
supervisor_get_dump_config_handler()
click to toggle source
# File lib/fluent/supervisor.rb, line 186 def supervisor_get_dump_config_handler {conf: config[:fluentd_conf].to_s} end
supervisor_sighup_handler()
click to toggle source
# File lib/fluent/supervisor.rb, line 157 def supervisor_sighup_handler kill_worker end
supervisor_sigusr1_handler()
click to toggle source
# File lib/fluent/supervisor.rb, line 161 def supervisor_sigusr1_handler if log = config[:logger_initializer] log.reopen! end if pid = config[:worker_pid] Process.kill(:USR1, pid) # don't rescue Erro::ESRSH here (invalid status) end end