class Fluent::Supervisor
Public Class Methods
cleanup_resources()
click to toggle source
# File lib/fluent/supervisor.rb, line 391 def self.cleanup_resources unless Fluent.windows? if ENV.has_key?('SERVERENGINE_SOCKETMANAGER_PATH') FileUtils.rm_f(ENV['SERVERENGINE_SOCKETMANAGER_PATH']) end end end
default_options()
click to toggle source
# File lib/fluent/supervisor.rb, line 368 def self.default_options { config_path: Fluent::DEFAULT_CONFIG_PATH, plugin_dirs: [Fluent::DEFAULT_PLUGIN_DIR], log_level: Fluent::Log::LEVEL_INFO, log_path: nil, daemonize: nil, libs: [], setup_path: nil, chuser: nil, chgroup: nil, root_dir: nil, suppress_interval: 0, suppress_repeated_stacktrace: true, without_source: nil, use_v1_config: true, supervise: true, standalone_worker: false, signame: nil, winsvcreg: nil, } end
load_config(path, params = {})
click to toggle source
# File lib/fluent/supervisor.rb, line 206 def self.load_config(path, params = {}) pre_loadtime = 0 pre_loadtime = params['pre_loadtime'].to_i if params['pre_loadtime'] pre_config_mtime = nil pre_config_mtime = params['pre_config_mtime'] if params['pre_config_mtime'] config_mtime = File.mtime(path) # reuse previous config if last load time is within 5 seconds and mtime of the config file is not changed if Time.now - Time.at(pre_loadtime) < 5 and config_mtime == pre_config_mtime return params['pre_conf'] end config_fname = File.basename(path) config_basedir = File.dirname(path) config_data = File.read(path) inline_config = params['inline_config'] if inline_config == '-' config_data << "\n" << STDIN.read elsif inline_config config_data << "\n" << inline_config.gsub("\\n","\n") end fluentd_conf = Fluent::Config.parse(config_data, config_fname, config_basedir, params['use_v1_config']) system_config = SystemConfig.create(fluentd_conf) # these params must NOT be configured via system config here. # these may be overridden by command line params. workers = params['workers'] root_dir = params['root_dir'] log_level = params['log_level'] suppress_repeated_stacktrace = params['suppress_repeated_stacktrace'] log_path = params['log_path'] chuser = params['chuser'] chgroup = params['chgroup'] log_rotate_age = params['log_rotate_age'] log_rotate_size = params['log_rotate_size'] rpc_endpoint = system_config.rpc_endpoint enable_get_dump = system_config.enable_get_dump log_opts = {suppress_repeated_stacktrace: suppress_repeated_stacktrace} logger_initializer = Supervisor::LoggerInitializer.new( log_path, log_level, chuser, chgroup, log_opts, log_rotate_age: log_rotate_age, log_rotate_size: log_rotate_size ) # this #init sets initialized logger to $log logger_initializer.init(:supervisor, 0) logger = $log command_sender = Fluent.windows? ? "pipe" : "signal" # ServerEngine's "daemonize" option is boolean, and path of pid file is brought by "pid_path" pid_path = params['daemonize'] daemonize = !!params['daemonize'] main_cmd = params['main_cmd'] signame = params['signame'] se_config = { worker_type: 'spawn', workers: workers, log_stdin: false, log_stdout: false, log_stderr: false, enable_heartbeat: true, auto_heartbeat: false, unrecoverable_exit_codes: [2], stop_immediately_at_unrecoverable_exit: true, root_dir: root_dir, logger: logger, log: logger.out, log_path: log_path, log_level: log_level, logger_initializer: logger_initializer, chuser: chuser, chgroup: chgroup, chumask: 0, suppress_repeated_stacktrace: suppress_repeated_stacktrace, daemonize: daemonize, rpc_endpoint: rpc_endpoint, enable_get_dump: enable_get_dump, windows_daemon_cmdline: [ServerEngine.ruby_bin_path, File.join(File.dirname(__FILE__), 'daemon.rb'), ServerModule.name, WorkerModule.name, path, JSON.dump(params)], command_sender: command_sender, fluentd_conf: fluentd_conf, main_cmd: main_cmd, signame: signame, } if daemonize se_config[:pid_path] = pid_path end pre_params = params.dup params['pre_loadtime'] = Time.now.to_i params['pre_config_mtime'] = config_mtime params['pre_conf'] = se_config # prevent pre_conf from being too big by reloading many times. pre_params['pre_conf'] = nil params['pre_conf'][:windows_daemon_cmdline][5] = JSON.dump(pre_params) return se_config end
new(opt)
click to toggle source
# File lib/fluent/supervisor.rb, line 399 def initialize(opt) @daemonize = opt[:daemonize] @supervise = opt[:supervise] @standalone_worker= opt[:standalone_worker] @config_path = opt[:config_path] @inline_config = opt[:inline_config] @use_v1_config = opt[:use_v1_config] @log_path = opt[:log_path] @dry_run = opt[:dry_run] @show_plugin_config = opt[:show_plugin_config] @libs = opt[:libs] @plugin_dirs = opt[:plugin_dirs] @chgroup = opt[:chgroup] @chuser = opt[:chuser] @rpc_server = nil @process_name = nil @workers = opt[:workers] @root_dir = opt[:root_dir] @log_level = opt[:log_level] @log_rotate_age = opt[:log_rotate_age] @log_rotate_size = opt[:log_rotate_size] @suppress_interval = opt[:suppress_interval] @suppress_config_dump = opt[:suppress_config_dump] @log_event_verbose = opt[:log_event_verbose] @without_source = opt[:without_source] @signame = opt[:signame] @suppress_repeated_stacktrace = opt[:suppress_repeated_stacktrace] log_opts = {suppress_repeated_stacktrace: @suppress_repeated_stacktrace} @log = LoggerInitializer.new( @log_path, @log_level, @chuser, @chgroup, log_opts, log_rotate_age: @log_rotate_age, log_rotate_size: @log_rotate_size ) @finished = false end
Public Instance Methods
options()
click to toggle source
# File lib/fluent/supervisor.rb, line 465 def options { 'config_path' => @config_path, 'pid_file' => @daemonize, 'plugin_dirs' => @plugin_dirs, 'log_path' => @log_path, 'root_dir' => @root_dir, } end
run_supervisor()
click to toggle source
# File lib/fluent/supervisor.rb, line 437 def run_supervisor @log.init(:supervisor, 0) show_plugin_config if @show_plugin_config read_config set_system_config if @workers < 1 raise Fluent::ConfigError, "invalid number of workers (must be > 0):#{@workers}" end if @root_dir if File.exist?(@root_dir) unless Dir.exist?(@root_dir) raise Fluent::InvalidRootDirectory, "non directory entry exists:#{@root_dir}" end else begin FileUtils.mkdir_p(@root_dir) rescue => e raise Fluent::InvalidRootDirectory, "failed to create root directory:#{@root_dir}, #{e.inspect}" end end end dry_run if @dry_run supervise end
run_worker()
click to toggle source
# File lib/fluent/supervisor.rb, line 475 def run_worker begin require 'sigdump/setup' rescue Exception # ignore LoadError and others (related with signals): it may raise these errors in Windows end worker_id = ENV['SERVERENGINE_WORKER_ID'].to_i process_type = case when @standalone_worker then :standalone when worker_id == 0 then :worker0 else :workers end @log.init(process_type, worker_id) Process.setproctitle("worker:#{@process_name}") if @process_name show_plugin_config if @show_plugin_config read_config set_system_config install_main_process_signal_handlers # This is the only log messsage for @standalone_worker $log.info "starting fluentd-#{Fluent::VERSION} without supervision", pid: Process.pid if @standalone_worker main_process do create_socket_manager if @standalone_worker change_privilege init_engine run_configure run_engine self.class.cleanup_resources if @standalone_worker exit 0 end end
Private Instance Methods
change_privilege()
click to toggle source
# File lib/fluent/supervisor.rb, line 714 def change_privilege ServerEngine::Privilege.change(@chuser, @chgroup) end
create_socket_manager()
click to toggle source
# File lib/fluent/supervisor.rb, line 512 def create_socket_manager socket_manager_path = ServerEngine::SocketManager::Server.generate_path ServerEngine::SocketManager::Server.open(socket_manager_path) ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s end
dry_run()
click to toggle source
# File lib/fluent/supervisor.rb, line 518 def dry_run $log.info "starting fluentd-#{Fluent::VERSION} as dry run mode" change_privilege init_engine run_configure exit 0 rescue => e $log.error "dry run failed: #{e}" exit 1 end
flush_buffer()
click to toggle source
# File lib/fluent/supervisor.rb, line 628 def flush_buffer $log.debug "fluentd main process get SIGUSR1" $log.info "force flushing buffered events" @log.reopen! # Creating new thread due to mutex can't lock # in main thread during trap context Thread.new { begin Fluent::Engine.flush! $log.debug "flushing thread: flushed" rescue Exception => e $log.warn "flushing thread error: #{e}" end }.run end
init_engine()
click to toggle source
# File lib/fluent/supervisor.rb, line 718 def init_engine Fluent::Engine.init(@system_config) @libs.each {|lib| require lib } @plugin_dirs.each {|dir| if Dir.exist?(dir) dir = File.expand_path(dir) Fluent::Engine.add_plugin_dir(dir) end } end
install_main_process_signal_handlers()
click to toggle source
# File lib/fluent/supervisor.rb, line 572 def install_main_process_signal_handlers # Fluentd worker process (worker of ServerEngine) don't use code in serverengine to set signal handlers, # because it does almost nothing. # This method is the only method to set signal handlers in Fluentd worker process. # When user use Ctrl + C not SIGINT, SIGINT is sent to all process in same process group. # ServerEngine server process will send SIGTERM to child(spawned) processes by that SIGINT, so # worker process SHOULD NOT do anything with SIGINT, SHOULD just ignore. trap :INT do $log.debug "fluentd main process get SIGINT" # When Fluentd is launched without supervisor, worker should handle ctrl-c by itself if @standalone_worker @finished = true $log.debug "getting start to shutdown main process" Fluent::Engine.stop end end trap :TERM do $log.debug "fluentd main process get SIGTERM" unless @finished @finished = true $log.debug "getting start to shutdown main process" Fluent::Engine.stop end end trap :USR1 do flush_buffer end unless Fluent.windows? if Fluent.windows? command_pipe = STDIN.dup STDIN.reopen(File::NULL, "rb") command_pipe.binmode command_pipe.sync = true Thread.new do loop do cmd = command_pipe.gets.chomp case cmd when "GRACEFUL_STOP", "IMMEDIATE_STOP" $log.debug "fluentd main process get #{cmd} command" @finished = true $log.debug "getting start to shutdown main process" Fluent::Engine.stop break else $log.warn "fluentd main process get unknown command [#{cmd}]" end end end end end
logging_with_console_output() { |$log| ... }
click to toggle source
# File lib/fluent/supervisor.rb, line 645 def logging_with_console_output yield $log unless @log.stdout? logger = ServerEngine::DaemonLogger.new(STDOUT) log = Fluent::Log.new(logger) log.level = @log_level console = log.enable_debug yield console end end
main_process(&block)
click to toggle source
# File lib/fluent/supervisor.rb, line 656 def main_process(&block) Process.setproctitle("worker:#{@process_name}") if @process_name unrecoverable_error = false begin block.call rescue Fluent::ConfigError => e logging_with_console_output do |log| log.error "config error", file: @config_path, error: e log.debug_backtrace end unrecoverable_error = true rescue Fluent::UnrecoverableError => e logging_with_console_output do |log| log.error e.message, error: e log.error_backtrace end unrecoverable_error = true rescue ScriptError => e # LoadError, NotImplementedError, SyntaxError logging_with_console_output do |log| if e.respond_to?(:path) log.error e.message, path: e.path, error: e else log.error e.message, error: e end log.error_backtrace end unrecoverable_error = true rescue => e logging_with_console_output do |log| log.error "unexpected error", error: e log.error_backtrace end end exit!(unrecoverable_error ? 2 : 1) end
read_config()
click to toggle source
# File lib/fluent/supervisor.rb, line 695 def read_config $log.info :supervisor, "reading config file", path: @config_path @config_fname = File.basename(@config_path) @config_basedir = File.dirname(@config_path) @config_data = File.read(@config_path) if @inline_config == '-' @config_data << "\n" << STDIN.read elsif @inline_config @config_data << "\n" << @inline_config.gsub("\\n","\n") end @conf = Fluent::Config.parse(@config_data, @config_fname, @config_basedir, @use_v1_config) end
run_configure()
click to toggle source
# File lib/fluent/supervisor.rb, line 733 def run_configure Fluent::Engine.run_configure(@conf) end
run_engine()
click to toggle source
# File lib/fluent/supervisor.rb, line 737 def run_engine Fluent::Engine.run end
set_system_config()
click to toggle source
# File lib/fluent/supervisor.rb, line 708 def set_system_config @system_config = SystemConfig.create(@conf) # @conf is set in read_config @system_config.attach(self) @system_config.apply(self) end
show_plugin_config()
click to toggle source
# File lib/fluent/supervisor.rb, line 529 def show_plugin_config name, type = @show_plugin_config.split(":") # input:tail $log.info "Use fluent-plugin-config-format --format=txt #{name} #{type}" exit 0 end
supervise()
click to toggle source
# File lib/fluent/supervisor.rb, line 535 def supervise Process.setproctitle("supervisor:#{@process_name}") if @process_name $log.info "starting fluentd-#{Fluent::VERSION}", pid: Process.pid rubyopt = ENV["RUBYOPT"] fluentd_spawn_cmd = [ServerEngine.ruby_bin_path, "-Eascii-8bit:ascii-8bit"] fluentd_spawn_cmd << rubyopt if rubyopt fluentd_spawn_cmd << $0 fluentd_spawn_cmd += $fluentdargv fluentd_spawn_cmd << "--under-supervisor" $log.info "spawn command to main: ", cmdline: fluentd_spawn_cmd params = {} params['main_cmd'] = fluentd_spawn_cmd params['daemonize'] = @daemonize params['inline_config'] = @inline_config params['log_path'] = @log_path params['log_rotate_age'] = @log_rotate_age params['log_rotate_size'] = @log_rotate_size params['chuser'] = @chuser params['chgroup'] = @chgroup params['use_v1_config'] = @use_v1_config # system config parameters params['workers'] = @workers params['root_dir'] = @root_dir params['log_level'] = @log_level params['suppress_repeated_stacktrace'] = @suppress_repeated_stacktrace params['signame'] = @signame se = ServerEngine.create(ServerModule, WorkerModule){ Fluent::Supervisor.load_config(@config_path, params) } se.run end