module Fluent::ServerModule
Public Instance Methods
after_run()
click to toggle source
# File lib/fluent/supervisor.rb, line 74
#
# Shutdown hook. Stops the Windows event thread (only on Windows), the RPC
# server (only when @rpc_endpoint is set) and the counter server (only when
# @counter is set), then removes the lock directory and calls
# Fluent::Supervisor.cleanup_resources.
def after_run
  if Fluent.windows?
    stop_windows_event_thread
  end

  if @rpc_endpoint
    stop_rpc_server
  end

  if @counter
    stop_counter_server
  end

  cleanup_lock_dir
  Fluent::Supervisor.cleanup_resources
end
before_run()
click to toggle source
# File lib/fluent/supervisor.rb, line 41
#
# Startup hook. Resets the supervisor state, creates the lock directory and
# exports its path as ENV['FLUENTD_LOCK_DIR'], then starts the optional
# pieces selected by +config+: the RPC server, the platform-specific
# signal/event handlers, the counter server and the shared socket manager.
def before_run
  @fluentd_conf = config[:fluentd_conf]
  @rpc_endpoint = @rpc_server = @counter = nil

  @fluentd_lock_dir = Dir.mktmpdir("fluentd-lock-")
  ENV['FLUENTD_LOCK_DIR'] = @fluentd_lock_dir

  # The RPC server is only started when an endpoint is configured.
  rpc_endpoint = config[:rpc_endpoint]
  if rpc_endpoint
    @rpc_endpoint = rpc_endpoint
    @enable_get_dump = config[:enable_get_dump]
    run_rpc_server
  end

  # Windows uses named events; other platforms use signal traps.
  Fluent.windows? ? install_windows_event_handler : install_supervisor_signal_handlers

  counter_conf = config[:counter_server]
  run_counter_server(counter_conf) if counter_conf

  if config[:disable_shared_socket]
    return $log.info("shared socket for multiple workers is disabled")
  end

  # Export the socket manager path through the environment.
  socket_server = ServerEngine::SocketManager::Server.open
  ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_server.path.to_s
end
cleanup_lock_dir()
click to toggle source
# File lib/fluent/supervisor.rb, line 82
#
# Deletes every "fluentd-*.lock" file inside @fluentd_lock_dir and then
# removes the directory itself.
def cleanup_lock_dir
  lock_pattern = File.join(@fluentd_lock_dir, "fluentd-*.lock")
  lock_files = Dir.glob(lock_pattern)
  FileUtils.rm(lock_files)
  FileUtils.rmdir(@fluentd_lock_dir)
end
dump()
click to toggle source
Calls superclass method
# File lib/fluent/supervisor.rb, line 357
#
# Delegates to the superclass implementation, except when @stop is set, in
# which case nothing is done and nil is returned.
def dump
  return if @stop

  super
end
install_supervisor_signal_handlers()
click to toggle source
# File lib/fluent/supervisor.rb, line 175
#
# Registers the supervisor's HUP, USR1 and USR2 traps. Each trap writes a
# debug log line and calls the matching supervisor_sig*_handler. Nothing is
# installed when Fluent.windows? is true.
def install_supervisor_signal_handlers
  unless Fluent.windows?
    trap(:HUP) do
      $log.debug "fluentd supervisor process get SIGHUP"
      supervisor_sighup_handler
    end

    trap(:USR1) do
      $log.debug "fluentd supervisor process get SIGUSR1"
      supervisor_sigusr1_handler
    end

    trap(:USR2) do
      $log.debug 'fluentd supervisor process got SIGUSR2'
      supervisor_sigusr2_handler
    end
  end
end
install_windows_event_handler()
click to toggle source
# File lib/fluent/supervisor.rb, line 212
#
# Starts a background thread that waits on named Win32 events and maps each
# one to a supervisor action. Returns immediately unless Fluent.windows? is
# true.
#
# Events are always registered under "fluentd_<pid>" (@pid_signame) and, when
# config[:signame] is set, under that name as well. Signalling the
# "<@pid_signame>_STOP_EVENT_THREAD" event ends the loop; every event handle
# is closed when the thread leaves the loop.
def install_windows_event_handler
  return unless Fluent.windows?

  @pid_signame = "fluentd_#{Process.pid}"
  @signame = config[:signame]

  Thread.new do
    ipc = Win32::Ipc.new(nil)
    events = [
      {win32_event: Win32::Event.new("#{@pid_signame}_STOP_EVENT_THREAD"), action: :stop_event_thread},
      {win32_event: Win32::Event.new("#{@pid_signame}"), action: :stop},
      {win32_event: Win32::Event.new("#{@pid_signame}_HUP"), action: :hup},
      {win32_event: Win32::Event.new("#{@pid_signame}_USR1"), action: :usr1},
      {win32_event: Win32::Event.new("#{@pid_signame}_USR2"), action: :usr2},
      {win32_event: Win32::Event.new("#{@pid_signame}_CONT"), action: :cont},
    ]
    if @signame
      signame_events = [
        {win32_event: Win32::Event.new("#{@signame}"), action: :stop},
        {win32_event: Win32::Event.new("#{@signame}_HUP"), action: :hup},
        {win32_event: Win32::Event.new("#{@signame}_USR1"), action: :usr1},
        {win32_event: Win32::Event.new("#{@signame}_USR2"), action: :usr2},
        {win32_event: Win32::Event.new("#{@signame}_CONT"), action: :cont},
      ]
      events.concat(signame_events)
    end

    begin
      loop do
        # NOTE(review): 0xFFFFFFFF is presumably the Win32 "wait forever" timeout — confirm.
        infinite = 0xFFFFFFFF
        ipc_idx = ipc.wait_any(events.map {|e| e[:win32_event]}, infinite)
        # The result is treated as a 1-based position in the handle list.
        event_idx = ipc_idx - 1

        if event_idx >= 0 && event_idx < events.length
          $log.debug("Got Win32 event \"#{events[event_idx][:win32_event].name}\"")
        else
          $log.warn("Unexpected return value of Win32::Ipc#wait_any: #{ipc_idx}")
          # Do not dispatch on an out-of-range index: a negative index would
          # wrap around to an entry at the end of the array and run its action,
          # and an index past the end would raise on nil and kill this thread.
          next
        end

        case events[event_idx][:action]
        when :stop
          stop(true)
        when :hup
          supervisor_sighup_handler
        when :usr1
          supervisor_sigusr1_handler
        when :usr2
          supervisor_sigusr2_handler
        when :cont
          supervisor_dump_handler_for_windows
        when :stop_event_thread
          break
        end
      end
    ensure
      events.each { |event| event[:win32_event].close }
    end
  end
end
kill_worker()
click to toggle source
# File lib/fluent/supervisor.rb, line 335
#
# Signals every worker recorded in config[:worker_pid]: KILL on Windows,
# TERM elsewhere. The pid table is copied and then cleared before any signal
# is sent. Does nothing when config[:worker_pid] is not set.
def kill_worker
  return unless config[:worker_pid]

  worker_pids = config[:worker_pid].clone
  config[:worker_pid].clear

  worker_pids.each_value do |worker_pid|
    signal = Fluent.windows? ? :KILL : :TERM
    Process.kill(signal, worker_pid)
  end
end
reload()
click to toggle source
# File lib/fluent/supervisor.rb, line 205
#
# Sends the "RELOAD\n" command to every entry in @monitors.
def reload
  @monitors.each { |monitor| monitor.send_command("RELOAD\n") }
end
restart(graceful)
click to toggle source
Overrides some methods of ServerEngine::MultiSpawnWorker. Since Fluentd’s Supervisor
doesn’t use ServerEngine’s HUP, USR1 and USR2 handlers (see install_supervisor_signal_handlers),
they should also be disabled on Windows; commands are sent to the workers instead.
# File lib/fluent/supervisor.rb, line 199
#
# Sends a restart command to every entry in @monitors.
#
# graceful - truthy sends "GRACEFUL_RESTART\n", falsy sends
#            "IMMEDIATE_RESTART\n".
def restart(graceful)
  @monitors.each do |monitor|
    if graceful
      monitor.send_command("GRACEFUL_RESTART\n")
    else
      monitor.send_command("IMMEDIATE_RESTART\n")
    end
  end
end
run_counter_server(counter_conf)
click to toggle source
# File lib/fluent/supervisor.rb, line 163
#
# Creates a Fluent::Counter::Server from +counter_conf+, stores it in
# @counter and starts it.
#
# counter_conf - object that responds to scope, bind, port and backup_path.
def run_counter_server(counter_conf)
  scope = counter_conf.scope
  server_options = {
    host: counter_conf.bind,
    port: counter_conf.port,
    log: $log,
    path: counter_conf.backup_path,
  }

  @counter = Fluent::Counter::Server.new(scope, server_options)
  @counter.start
end
run_rpc_server()
click to toggle source
# File lib/fluent/supervisor.rb, line 87
#
# Creates the RPC server on @rpc_endpoint, mounts the built-in endpoints and
# starts the server. Most endpoints send a signal to this process; on Windows
# they call the matching supervisor handler directly instead. The
# /api/config.getDump endpoint is mounted only when @enable_get_dump is set.
def run_rpc_server
  @rpc_server = RPC::Server.new(@rpc_endpoint, $log)

  # Built-in RPC endpoints that stand in for signals.
  @rpc_server.mount_proc('/api/processes.interruptWorkers') do |_req, _res|
    $log.debug "fluentd RPC got /api/processes.interruptWorkers request"
    Process.kill(:INT, Process.pid)
    nil
  end

  @rpc_server.mount_proc('/api/processes.killWorkers') do |_req, _res|
    $log.debug "fluentd RPC got /api/processes.killWorkers request"
    Process.kill(:TERM, Process.pid)
    nil
  end

  @rpc_server.mount_proc('/api/processes.flushBuffersAndKillWorkers') do |_req, _res|
    $log.debug "fluentd RPC got /api/processes.flushBuffersAndKillWorkers request"
    if Fluent.windows?
      supervisor_sigusr1_handler
      stop(true)
    else
      Process.kill(:USR1, Process.pid)
      Process.kill(:TERM, Process.pid)
    end
    nil
  end

  @rpc_server.mount_proc('/api/plugins.flushBuffers') do |_req, _res|
    $log.debug "fluentd RPC got /api/plugins.flushBuffers request"
    if Fluent.windows?
      supervisor_sigusr1_handler
    else
      Process.kill(:USR1, Process.pid)
    end
    nil
  end

  @rpc_server.mount_proc('/api/config.reload') do |_req, _res|
    $log.debug "fluentd RPC got /api/config.reload request"
    if Fluent.windows?
      # Killing the workers makes them restart through auto restarting.
      kill_worker
    else
      Process.kill(:HUP, Process.pid)
    end
    nil
  end

  @rpc_server.mount_proc('/api/config.dump') do |_req, _res|
    $log.debug "fluentd RPC got /api/config.dump request"
    $log.info "dump in-memory config"
    supervisor_dump_config_handler
    nil
  end

  @rpc_server.mount_proc('/api/config.gracefulReload') do |_req, _res|
    $log.debug "fluentd RPC got /api/config.gracefulReload request"
    if Fluent.windows?
      supervisor_sigusr2_handler
    else
      Process.kill(:USR2, Process.pid)
    end
    nil
  end

  if @enable_get_dump
    @rpc_server.mount_proc('/api/config.getDump') do |_req, res|
      $log.debug "fluentd RPC got /api/config.getDump request"
      $log.info "get dump in-memory config via HTTP"
      res.body = supervisor_get_dump_config_handler
      [nil, nil, res]
    end
  end

  @rpc_server.start
end
stop_counter_server()
click to toggle source
# File lib/fluent/supervisor.rb, line 171
#
# Stops the counter server held in @counter and returns the result of its
# +stop+ call.
def stop_counter_server
  @counter.stop
end
stop_rpc_server()
click to toggle source
# File lib/fluent/supervisor.rb, line 159
#
# Shuts down the RPC server held in @rpc_server and returns the result of
# its +shutdown+ call.
def stop_rpc_server
  @rpc_server.shutdown
end
stop_windows_event_thread()
click to toggle source
# File lib/fluent/supervisor.rb, line 270
#
# Opens the "<@pid_signame>_STOP_EVENT_THREAD" Win32 event, sets it and
# closes the handle. Does nothing unless Fluent.windows? is true.
def stop_windows_event_thread
  return unless Fluent.windows?

  stop_event = Win32::Event.open("#{@pid_signame}_STOP_EVENT_THREAD")
  stop_event.set
  stop_event.close
end
supervisor_dump_config_handler()
click to toggle source
# File lib/fluent/supervisor.rb, line 349
#
# Writes the in-memory configuration (@fluentd_conf) to $log at info level.
def supervisor_dump_config_handler
  $log.info(@fluentd_conf)
end
supervisor_dump_handler_for_windows()
click to toggle source
# File lib/fluent/supervisor.rb, line 315
#
# Windows-only dump handler. Dumps this process on a separate thread via
# FluentSigdump.dump_windows and forwards :CONT to the workers.
#
# On UNIX-like systems SIGCONT sent to each process already makes it write a
# dump file, and that behavior predates this Windows implementation.
# SIGCONT could be trapped and handled here on UNIX-like systems too, but for
# backward compatibility this handler stays Windows-only.
#
# Any StandardError raised here, including the platform guard below, is
# logged through $log.error rather than propagated.
def supervisor_dump_handler_for_windows
  unless Fluent.windows?
    raise "[BUG] This function is for Windows ONLY."
  end

  Thread.new do
    begin
      FluentSigdump.dump_windows
    rescue => dump_error
      $log.error "failed to dump: #{dump_error}"
    end
  end

  send_signal_to_workers(:CONT)
rescue => error
  $log.error "failed to dump: #{error}"
end
supervisor_get_dump_config_handler()
click to toggle source
# File lib/fluent/supervisor.rb, line 353
#
# Returns the in-memory configuration wrapped in a Hash under the :conf key.
def supervisor_get_dump_config_handler
  {
    conf: @fluentd_conf,
  }
end
supervisor_sighup_handler()
click to toggle source
# File lib/fluent/supervisor.rb, line 278
#
# HUP handler for the supervisor: delegates to kill_worker.
def supervisor_sighup_handler
  kill_worker
end
supervisor_sigusr1_handler()
click to toggle source
# File lib/fluent/supervisor.rb, line 282
#
# USR1 handler for the supervisor: reopens the supervisor log, then forwards
# :USR1 to the workers.
def supervisor_sigusr1_handler
  reopen_log
  send_signal_to_workers(:USR1)
end
supervisor_sigusr2_handler()
click to toggle source
# File lib/fluent/supervisor.rb, line 287
#
# USR2 handler for the supervisor: rebuilds the configuration on a separate
# thread, reloads it into Fluent::Engine, then reopens the log, forwards
# :USR2 to the workers and stores the new configuration text in
# @fluentd_conf. Any StandardError along the way is logged and swallowed.
def supervisor_sigusr2_handler
  conf = nil
  reloader = Thread.new do
    $log.info 'Reloading new config'

    # Build the configuration first, so an invalid file fails before reloading.
    conf = Fluent::Config.build(
      config_path: config[:config_path],
      encoding: config[:conf_encoding],
      additional_config: config[:inline_config],
      use_v1_config: config[:use_v1_config],
    )

    Fluent::VariableStore.try_to_reset { Fluent::Engine.reload_config(conf, supervisor: true) }
  end

  # Failures are re-raised by join and handled by the rescue below, so the
  # thread's own exception report is turned off.
  reloader.report_on_exception = false
  reloader.join

  reopen_log
  send_signal_to_workers(:USR2)
  @fluentd_conf = conf.to_s
rescue => error
  $log.error "Failed to reload config file: #{error}"
end
Private Instance Methods
dump_all_windows_workers()
click to toggle source
# File lib/fluent/supervisor.rb, line 400
#
# Sends the "DUMP\n" command to every entry in @monitors.
def dump_all_windows_workers
  @monitors.each { |monitor| monitor.send_command("DUMP\n") }
end
reopen_log()
click to toggle source
# File lib/fluent/supervisor.rb, line 363
#
# Reopens $log on a new thread and returns that thread; returns nil when
# $log is not set.
def reopen_log
  return unless $log

  # A separate thread is used because a mutex cannot be locked in the main
  # thread while in trap context.
  Thread.new { $log.reopen! }
end
send_command_to_workers(signal)
click to toggle source
# File lib/fluent/supervisor.rb, line 386
#
# Translates a signal name into the equivalent worker command, for use on
# Windows where ServerEngine's CommandSender is used.
#
# signal - :HUP (immediate restart), :USR1 (graceful restart), :USR2 (reload)
#          or :CONT (dump). Any other value is ignored and nil is returned.
def send_command_to_workers(signal)
  case signal
  when :HUP  then restart(false)
  when :USR1 then restart(true)
  when :USR2 then reload
  when :CONT then dump_all_windows_workers
  end
end
send_signal_to_workers(signal)
click to toggle source
# File lib/fluent/supervisor.rb, line 373
#
# Delivers +signal+ to every worker listed in config[:worker_pid]. On
# Windows the signal is translated into a worker command; elsewhere it is
# sent with Process.kill. Does nothing when config[:worker_pid] is not set.
def send_signal_to_workers(signal)
  worker_pids = config[:worker_pid]
  return unless worker_pids

  return send_command_to_workers(signal) if Fluent.windows?

  # Errno::ESRCH is deliberately not rescued here (invalid status).
  worker_pids.each_value { |worker_pid| Process.kill(signal, worker_pid) }
end