class Fluent::Supervisor

Constants

RUBY_ENCODING_OPTIONS_REGEX

Public Class Methods

cleanup_resources() click to toggle source
# File lib/fluent/supervisor.rb, line 502
# Deletes the file named by the SERVERENGINE_SOCKETMANAGER_PATH environment
# variable. Does nothing on Windows or when the variable is not set.
def self.cleanup_resources
  return if Fluent.windows?

  socket_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
  FileUtils.rm_f(socket_path) if socket_path
end
default_options() click to toggle source
# File lib/fluent/supervisor.rb, line 472
# Baseline option values; `initialize` merges the command-line options on
# top of this Hash. A fresh Hash (and fresh nested Arrays) is built per call.
def self.default_options
  {
    # Configuration file and plugin lookup
    config_path: Fluent::DEFAULT_CONFIG_PATH,
    plugin_dirs: [Fluent::DEFAULT_PLUGIN_DIR],

    # Logging / daemon basics
    log_level: Fluent::Log::LEVEL_INFO,
    log_path: nil,
    daemonize: nil,
    libs: [],
    setup_path: nil,

    # Privileges
    chuser: nil,
    chgroup: nil,
    chumask: '0',

    root_dir: nil,

    # Log suppression
    suppress_interval: 0,
    suppress_repeated_stacktrace: true,
    ignore_repeated_log_interval: nil,

    without_source: nil,
    enable_input_metrics: nil,
    enable_size_metrics: nil,

    # Configuration parsing
    use_v1_config: true,
    strict_config_value: nil,

    # Process model
    supervise: true,
    standalone_worker: false,
    signame: nil,

    conf_encoding: 'utf-8',
    disable_shared_socket: nil,
    config_file_type: :guess,
  }
end
new(cl_opt) click to toggle source
# File lib/fluent/supervisor.rb, line 510
# Stores the raw command-line options and copies the effective values
# (class defaults overlaid with `cl_opt`) into instance variables.
#
# cl_opt - Hash of options given on the command line (symbol keys).
def initialize(cl_opt)
  @cl_opt = cl_opt
  effective = self.class.default_options.merge(cl_opt)

  # Configuration source
  @config_path = effective[:config_path]
  @config_file_type = effective[:config_file_type]
  @inline_config = effective[:inline_config]
  @use_v1_config = effective[:use_v1_config]
  @conf_encoding = effective[:conf_encoding]

  # Process model
  @daemonize = effective[:daemonize]
  @standalone_worker = effective[:standalone_worker]
  @signame = effective[:signame]

  # Privileges
  @chuser = effective[:chuser]
  @chgroup = effective[:chgroup]
  @chumask = effective[:chumask]

  # Plugins and libraries
  @libs = effective[:libs]
  @plugin_dirs = effective[:plugin_dirs]
  @show_plugin_config = effective[:show_plugin_config]

  # Logging
  @log_path = effective[:log_path]

  # TODO: drop `@log_rotate_age` / `@log_rotate_size` once they can be taken
  # from SystemConfig (`system_config.log.rotate_age` / `rotate_size`).
  # Today the `system_config.log` parameters are not listed in
  # `Fluent::SystemConfig::SYSTEM_CONFIG_PARAMETERS`, so `build_system_config()`
  # does not merge them with the command-line values; until that bug is fixed
  # the command-line values are kept here.
  @log_rotate_age = effective[:log_rotate_age]
  @log_rotate_size = effective[:log_rotate_size]

  @finished = false
end
serverengine_config(params = {}) click to toggle source
# File lib/fluent/supervisor.rb, line 426
# Translates the string-keyed `params` Hash assembled by the supervisor into
# the symbol-keyed configuration Hash handed to ServerEngine.
#
# ServerEngine's "daemonize" option is a boolean while the pid file location
# travels separately as "pid_path"; here params['daemonize'] carries the pid
# file path, so it feeds both entries.
def self.serverengine_config(params = {})
  pid_file = params['daemonize']
  daemonized = pid_file ? true : false

  # Command line used when running as a Windows daemon.
  daemon_cmdline = [
    ServerEngine.ruby_bin_path,
    File.join(File.dirname(__FILE__), 'daemon.rb'),
    ServerModule.name,
    WorkerModule.name,
    JSON.dump(params),
  ]

  base_config = {
    worker_type: 'spawn',
    workers: params['workers'],
    log_stdin: false,
    log_stdout: false,
    log_stderr: false,
    enable_heartbeat: true,
    auto_heartbeat: false,
    unrecoverable_exit_codes: [2],
    stop_immediately_at_unrecoverable_exit: true,
    root_dir: params['root_dir'],
    logger: $log,
    log: $log.out,
    log_level: params['log_level'],
    chuser: params['chuser'],
    chgroup: params['chgroup'],
    chumask: params['chumask'],
    daemonize: daemonized,
    rpc_endpoint: params['rpc_endpoint'],
    counter_server: params['counter_server'],
    enable_get_dump: params['enable_get_dump'],
    windows_daemon_cmdline: daemon_cmdline,
    command_sender: Fluent.windows? ? "pipe" : "signal",
    config_path: params['fluentd_conf_path'],
    fluentd_conf: params['fluentd_conf'],
    conf_encoding: params['conf_encoding'],
    inline_config: params['inline_config'],
    main_cmd: params['main_cmd'],
    signame: params['signame'],
    disable_shared_socket: params['disable_shared_socket'],
    restart_worker_interval: params['restart_worker_interval'],
  }

  # The pid file path is only included when daemonizing.
  daemonized ? base_config.merge(pid_path: pid_file) : base_config
end

Public Instance Methods

configure(supervisor: false) click to toggle source
# File lib/fluent/supervisor.rb, line 623
# Initializes the global logger, parses the configuration into @conf and
# @system_config, then loads the requested libraries and plugin directories.
#
# supervisor - true when called for the supervisor process; in that case the
#              installed fluentd / fluent-plugin-* / fluent-mixin-* gems are
#              logged as well.
def configure(supervisor: false)
  setup_global_logger(supervisor: supervisor)

  show_plugin_config if @show_plugin_config

  # "-" means "read the inline configuration from standard input".
  if @inline_config == '-'
    $log.warn('the value "-" for `inline_config` is deprecated. See https://github.com/fluent/fluentd/issues/2711')
    @inline_config = STDIN.read
  end

  @conf = Fluent::Config.build(
    config_path: @config_path,
    encoding: @conf_encoding,
    additional_config: @inline_config,
    use_v1_config: @use_v1_config,
    type: @config_file_type,
  )
  @system_config = build_system_config(@conf)

  $log.info :supervisor, 'parsing config file is succeeded', path: @config_path

  @libs.each { |lib| require lib }

  # Directories that do not exist are skipped silently.
  @plugin_dirs.each do |dir|
    next unless Dir.exist?(dir)

    Fluent::Plugin.add_plugin_dir(File.expand_path(dir))
  end

  return unless supervisor

  # plugins / configuration dumps
  fluent_gems = Gem::Specification.find_all.select { |spec| spec.name =~ /^fluent(d|-(plugin|mixin)-.*)$/ }
  fluent_gems.each do |spec|
    $log.info("gem '#{spec.name}' version '#{spec.version}'")
  end
end
options() click to toggle source
# File lib/fluent/supervisor.rb, line 586
# Returns a string-keyed summary of the paths this supervisor works with.
# 'pid_file' carries the value of the daemonize option.
def options
  root_dir = @system_config.root_dir

  {
    'config_path' => @config_path,
    'pid_file' => @daemonize,
    'plugin_dirs' => @plugin_dirs,
    'log_path' => @log_path,
    'root_dir' => root_dir,
  }
end
run_supervisor(dry_run: false) click to toggle source
# File lib/fluent/supervisor.rb, line 543
# Validates the system configuration, prepares the root directory, runs the
# engine's configure phase in supervisor mode and finally starts supervising.
#
# dry_run - when true, only the configuration is checked and the process
#           exits with status 0 instead of starting workers.
#
# Raises Fluent::ConfigError when the worker count is below 1 and
# Fluent::InvalidRootDirectory when root_dir is unusable. A
# Fluent::ConfigError raised while configuring the engine is logged and the
# process exits immediately with status 1.
def run_supervisor(dry_run: false)
  $log.info "starting fluentd-#{Fluent::VERSION} as dry run mode", ruby: RUBY_VERSION if dry_run

  worker_count = @system_config.workers
  if worker_count < 1
    raise Fluent::ConfigError, "invalid number of workers (must be > 0):#{worker_count}"
  end

  root_dir = @system_config.root_dir
  if root_dir && File.exist?(root_dir)
    # Something is already there: it has to be a directory.
    raise Fluent::InvalidRootDirectory, "non directory entry exists:#{root_dir}" unless Dir.exist?(root_dir)
  elsif root_dir
    begin
      FileUtils.mkdir_p(root_dir, mode: @system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION)
    rescue => e
      raise Fluent::InvalidRootDirectory, "failed to create root directory:#{root_dir}, #{e.inspect}"
    end
  end

  begin
    ServerEngine::Privilege.change(@chuser, @chgroup)
    MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support)
    Fluent::Engine.init(@system_config, supervisor_mode: true)
    Fluent::Engine.run_configure(@conf, dry_run: dry_run)
  rescue Fluent::ConfigError => e
    $log.error 'config error', file: @config_path, error: e
    $log.debug_backtrace
    exit!(1)
  end

  return supervise unless dry_run

  $log.info 'finished dry run mode'
  exit 0
end
run_worker() click to toggle source
# File lib/fluent/supervisor.rb, line 596
# Entry point of a worker process (or of the single standalone worker when
# fluentd runs without a supervisor): installs the signal / command handlers,
# then initializes and runs the engine inside `main_process`.
#
# Raises Fluent::ConfigError when running standalone with a worker count
# other than 1.
def run_worker
  # The guard used to test `@process_name`, an instance variable that is
  # never assigned, so the title was never set here. Test the configured
  # name instead. `main_process` sets the final title afterwards.
  process_name = @system_config.process_name
  Process.setproctitle("worker:#{process_name}") if process_name

  if @standalone_worker && @system_config.workers != 1
    raise Fluent::ConfigError, "invalid number of workers (must be 1 or unspecified) with --no-supervisor: #{@system_config.workers}"
  end

  install_main_process_signal_handlers

  # This is the only log message for @standalone_worker
  $log.info "starting fluentd-#{Fluent::VERSION} without supervision", pid: Process.pid, ruby: RUBY_VERSION if @standalone_worker

  main_process do
    # Without a supervisor the worker has to do the supervisor's setup itself:
    # open the socket manager, change privileges and apply the umask.
    if @standalone_worker
      create_socket_manager
      ServerEngine::Privilege.change(@chuser, @chgroup)
      File.umask(@chumask.to_i(8))
    end
    MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support)
    Fluent::Engine.init(@system_config)
    Fluent::Engine.run_configure(@conf)
    Fluent::Engine.run
    self.class.cleanup_resources if @standalone_worker
    exit 0
  end
end

Private Instance Methods

build_spawn_command() click to toggle source
# File lib/fluent/supervisor.rb, line 1015
# Builds the argv used to spawn a worker: the ruby binary, interpreter
# options taken from RUBYOPT (with a default encoding option when RUBYOPT
# supplies none), optionally `--jit`, then the current script, the original
# fluentd arguments and `--under-supervisor`.
#
# The interpreter part is validated by running it once with `-h`; on failure
# the error is logged and the process exits with the child's exit status.
def build_spawn_command
  default_encoding = '-Eascii-8bit:ascii-8bit'
  spawn_cmd = [ENV['TEST_RUBY_PATH'] || ServerEngine.ruby_bin_path]

  ruby_opts = ENV['RUBYOPT']
  if ruby_opts
    encoding_opts, other_opts = ruby_opts.split(' ').partition { |o| o.match?(RUBY_ENCODING_OPTIONS_REGEX) }
    spawn_cmd.concat(other_opts)
    spawn_cmd.concat(encoding_opts.empty? ? [default_encoding] : encoding_opts)
  else
    spawn_cmd << default_encoding
  end

  if @system_config.enable_jit
    $log.info "enable Ruby JIT for workers (--jit)"
    spawn_cmd << '--jit'
  end

  # `-h` keeps ruby from blocking: `ruby -Eascii-8bit:ascii-8bit` alone would
  # wait for a script on stdin, while `ruby -Eascii-8bit:ascii-8bit -h` returns.
  _out, err, status = Open3.capture3(*spawn_cmd, "-h")
  unless status.exitstatus == 0
    $log.error('Invalid option is passed to RUBYOPT', command: spawn_cmd, error: err)
    exit status.exitstatus
  end

  spawn_cmd << $0
  spawn_cmd + $fluentdargv + ['--under-supervisor']
end
build_system_config(conf) click to toggle source
# File lib/fluent/supervisor.rb, line 995
# Creates the SystemConfig for `conf` and overrides it with every
# command-line option that was explicitly given (present and not nil).
#
# TODO: `system_config.log.rotate_age/rotate_size` are not merged with the
# command line options because they are not part of
# `SYSTEM_CONFIG_PARAMETERS`. This bug still has to be fixed.
def build_system_config(conf)
  SystemConfig.create(conf, @cl_opt[:strict_config_value]).tap do |system_config|
    # Prefer the options explicitly specified in the command line
    overrides = Fluent::SystemConfig::SYSTEM_CONFIG_PARAMETERS.each_with_object({}) do |name, picked|
      next unless @cl_opt.key?(name)

      value = @cl_opt[name]
      picked[name] = value unless value.nil?
    end
    system_config.overwrite_variables(**overrides)
  end
end
create_socket_manager() click to toggle source
# File lib/fluent/supervisor.rb, line 750
# Opens a ServerEngine socket manager server and publishes its path through
# the SERVERENGINE_SOCKETMANAGER_PATH environment variable.
def create_socket_manager
  ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = ServerEngine::SocketManager::Server.open.path.to_s
end
dump_non_windows() click to toggle source
# File lib/fluent/supervisor.rb, line 921
# Triggers a Sigdump dump unless shutdown has already started (@finished).
# A failing dump is logged as an error instead of being raised.
def dump_non_windows
  Sigdump.dump unless @finished
rescue => error
  $log.error("failed to dump: #{error}")
end
dump_windows() click to toggle source
# File lib/fluent/supervisor.rb, line 929
# Runs FluentSigdump.dump_windows on a new thread and returns that thread.
# A failing dump is logged as an error instead of being raised.
def dump_windows
  dump_task = lambda do
    begin
      FluentSigdump.dump_windows
    rescue => error
      $log.error("failed to dump: #{error}")
    end
  end

  Thread.new(&dump_task)
end
flush_buffer() click to toggle source
# File lib/fluent/supervisor.rb, line 878
# Reopens the log and force-flushes buffered events on a new thread, which
# is returned. Any exception raised while flushing is logged as a warning.
def flush_buffer
  # A dedicated thread is needed because a mutex cannot be locked
  # from the main thread while it is in trap context.
  Thread.new do
    begin
      $log.debug "fluentd main process get SIGUSR1"
      $log.info "force flushing buffered events"
      $log.reopen!
      Fluent::Engine.flush!
      $log.debug "flushing thread: flushed"
    rescue Exception => error
      $log.warn "flushing thread error: #{error}"
    end
  end
end
install_main_process_command_handlers() click to toggle source
# File lib/fluent/supervisor.rb, line 844
# Starts a thread that reads newline-separated commands from the stream the
# process received as standard input and dispatches them:
#
#   GRACEFUL_STOP / IMMEDIATE_STOP - stop the engine and end the reader thread
#   GRACEFUL_RESTART               - flush buffers
#   RELOAD                         - reload the configuration
#   DUMP                           - dump via dump_windows
#
# Unknown commands are logged as warnings. The original stdin is duplicated
# for the command channel and $stdin itself is reopened on the null device.
# Returns the reader thread.
def install_main_process_command_handlers
  command_pipe = $stdin.dup
  $stdin.reopen(File::NULL, "rb")
  command_pipe.binmode
  command_pipe.sync = true

  Thread.new do
    loop do
      line = command_pipe.gets
      break unless line

      # Use the non-destructive `chomp`: `chomp!` returns nil when there is
      # nothing to strip, which made a final command without a trailing
      # newline fall through to the "unknown command" branch.
      cmd = line.chomp

      case cmd
      when "GRACEFUL_STOP", "IMMEDIATE_STOP"
        $log.debug "fluentd main process get #{cmd} command"
        @finished = true
        $log.debug "getting start to shutdown main process"
        Fluent::Engine.stop
        break
      when "GRACEFUL_RESTART"
        $log.debug "fluentd main process get #{cmd} command"
        flush_buffer
      when "RELOAD"
        $log.debug "fluentd main process get #{cmd} command"
        reload_config
      when "DUMP"
        $log.debug "fluentd main process get #{cmd} command"
        dump_windows
      else
        $log.warn "fluentd main process get unknown command [#{cmd}]"
      end
    end
  end
end
install_main_process_signal_handlers() click to toggle source
# File lib/fluent/supervisor.rb, line 799
# Installs the signal handlers of the main (worker) process:
#
#   INT  - stops the engine only when running as a standalone worker
#   TERM - stops the engine once (guarded by @finished)
#   USR1 - flush_buffer      (non-Windows)
#   USR2 - reload_config     (non-Windows)
#   CONT - dump_non_windows  (non-Windows)
#
# On Windows the USR1/USR2/CONT traps are replaced by the command reader
# started in install_main_process_command_handlers.
def install_main_process_signal_handlers
  # Fluentd worker process (worker of ServerEngine) don't use code in serverengine to set signal handlers,
  # because it does almost nothing.
  # This method is the only method to set signal handlers in Fluentd worker process.

  # When the user presses Ctrl + C (rather than sending SIGINT to one process),
  # SIGINT is delivered to every process in the same process group.
  # ServerEngine server process will send SIGTERM to child(spawned) processes by that SIGINT, so
  # worker process SHOULD NOT do anything with SIGINT, SHOULD just ignore.
  trap :INT do
    $log.debug "fluentd main process get SIGINT"

    # When Fluentd is launched without supervisor, worker should handle ctrl-c by itself
    if @standalone_worker
      @finished = true
      $log.debug "getting start to shutdown main process"
      Fluent::Engine.stop
    end
  end

  trap :TERM do
    $log.debug "fluentd main process get SIGTERM"
    # @finished makes sure the engine is asked to stop only once.
    unless @finished
      @finished = true
      $log.debug "getting start to shutdown main process"
      Fluent::Engine.stop
    end
  end

  if Fluent.windows?
    # Windows: commands arrive through the command reader instead of signals.
    install_main_process_command_handlers
  else
    trap :USR1 do
      flush_buffer
    end

    trap :USR2 do
      reload_config
    end

    trap :CONT do
      dump_non_windows
    end
  end
end
logging_with_console_output() { |$log| ... } click to toggle source
# File lib/fluent/supervisor.rb, line 939
# Yields the global logger, and — when that logger does not already write to
# stdout — yields a second, STDOUT-backed logger so the same message also
# reaches the console.
def logging_with_console_output
  yield $log
  return if $log.stdout?

  console_log = Fluent::Log.new(ServerEngine::DaemonLogger.new(STDOUT))
  console_log.level = @system_config.log_level
  yield console_log.enable_debug
end
main_process(&block) click to toggle source
# File lib/fluent/supervisor.rb, line 950
# Runs `block` as the body of the main process and always terminates the
# process afterwards with `exit!`:
#
#   2 - configuration errors, Fluent::UnrecoverableError and ScriptError
#   1 - any other StandardError, or the block returning normally
#
# When a process name is configured the process title is set first; with
# more than one worker the worker id is appended to it.
def main_process(&block)
  if @system_config.process_name
    worker_suffix = @system_config.workers > 1 ? ENV['SERVERENGINE_WORKER_ID'] : nil
    Process.setproctitle("worker:#{@system_config.process_name}#{worker_suffix}")
  end

  exit_status = 1

  begin
    block.call
  rescue Fluent::ConfigError => e
    logging_with_console_output do |log|
      log.error "config error", file: @config_path, error: e
      log.debug_backtrace
    end
    exit_status = 2
  rescue Fluent::UnrecoverableError => e
    logging_with_console_output do |log|
      log.error e.message, error: e
      log.error_backtrace
    end
    exit_status = 2
  rescue ScriptError => e # LoadError, NotImplementedError, SyntaxError
    logging_with_console_output do |log|
      if e.respond_to?(:path)
        log.error e.message, path: e.path, error: e
      else
        log.error e.message, error: e
      end
      log.error_backtrace
    end
    exit_status = 2
  rescue => e
    logging_with_console_output do |log|
      log.error "unexpected error", error: e
      log.error_backtrace
    end
  end

  exit!(exit_status)
end
reload_config() click to toggle source
# File lib/fluent/supervisor.rb, line 894
# Re-parses the configuration and hands it to the engine on a new thread,
# which is returned. @conf is replaced only when the reload succeeded.
def reload_config
  Thread.new do
    $log.debug('worker got SIGUSR2')

    begin
      reloaded = Fluent::Config.build(
        config_path: @config_path,
        encoding: @conf_encoding,
        additional_config: @inline_config,
        use_v1_config: @use_v1_config,
        type: @config_file_type,
      )

      Fluent::VariableStore.try_to_reset { Fluent::Engine.reload_config(reloaded) }
    rescue => error
      # The supervisor has already validated the config file, but that check
      # and this reload are not atomic because signals are used to communicate
      # between worker and supervisor, so a failure here is still possible.
      $log.error("failed to reload config: #{error}")
    else
      @conf = reloaded
    end
  end
end
setup_global_logger(supervisor: false) click to toggle source
# File lib/fluent/supervisor.rb, line 666
# Builds the global logger `$log` for this process.
#
# supervisor - true for the supervisor process; otherwise the process type is
#              derived from @standalone_worker and SERVERENGINE_WORKER_ID.
#
# The configuration is parsed here only to obtain the system configuration
# needed by the logger. Depending on @log_path the logger writes to STDOUT,
# to a plain file opened in append mode, or to a rotating
# Fluent::LogDeviceIO.
def setup_global_logger(supervisor: false)
  if supervisor
    worker_id = 0
    process_type = :supervisor
  else
    # A missing SERVERENGINE_WORKER_ID yields 0 (nil.to_i).
    worker_id = ENV['SERVERENGINE_WORKER_ID'].to_i
    process_type = case
                   when @standalone_worker then :standalone
                   when worker_id == 0 then :worker0
                   else :workers
                   end
  end

  # Parse configuration immediately to initialize logger in early stage.
  # Since we can't confirm the log messages in this parsing process,
  # we must parse the config again after initializing logger.
  conf = Fluent::Config.build(
    config_path: @config_path,
    encoding: @conf_encoding,
    additional_config: @inline_config,
    use_v1_config: @use_v1_config,
    type: @config_file_type,
  )
  system_config = build_system_config(conf)

  # TODO: we should remove this logic. This merging process should be done
  # in `build_system_config()`.
  # Command-line values win; the system config only fills in missing ones.
  @log_rotate_age ||= system_config.log.rotate_age
  @log_rotate_size ||= system_config.log.rotate_size

  # Rotation is enabled as soon as either an age or a size is configured.
  rotate = @log_rotate_age || @log_rotate_size
  actual_log_path = @log_path

  # We need to prepare a unique path for each worker since Windows locks files.
  if Fluent.windows? && rotate && @log_path && @log_path != "-"
    actual_log_path = Fluent::Log.per_process_path(@log_path, process_type, worker_id)
  end

  # A log path of "-" (or none at all) means "log to STDOUT".
  if actual_log_path && actual_log_path != "-"
    # Create the parent directory when the log file does not exist yet.
    FileUtils.mkdir_p(File.dirname(actual_log_path)) unless File.exist?(actual_log_path)
    if rotate
      logdev = Fluent::LogDeviceIO.new(
        actual_log_path,
        shift_age: @log_rotate_age,
        shift_size: @log_rotate_size,
      )
    else
      logdev = File.open(actual_log_path, "a")
    end

    # Hand the log file over to the user/group given by @chuser / @chgroup;
    # a nil id is passed for whichever of the two is not set.
    if @chuser || @chgroup
      chuid = @chuser ? ServerEngine::Privilege.get_etc_passwd(@chuser).uid : nil
      chgid = @chgroup ? ServerEngine::Privilege.get_etc_group(@chgroup).gid : nil
      File.chown(chuid, chgid, actual_log_path)
    end

    # The log directory's mode is changed only when dir_permission is
    # configured.
    # NOTE(review): inside this guard the `|| Fluent::DEFAULT_DIR_PERMISSION`
    # fallback can never be taken.
    if system_config.dir_permission
      File.chmod(system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION, File.dirname(actual_log_path))
    end
  else
    logdev = STDOUT
  end

  $log = Fluent::Log.new(
    # log_level: subtract 1 to match serverengine daemon logger side logging severity.
    ServerEngine::DaemonLogger.new(logdev, log_level: system_config.log_level - 1),
    path: actual_log_path,
    process_type: process_type,
    worker_id: worker_id,
    format: system_config.log.format,
    time_format: system_config.log.time_format,
    suppress_repeated_stacktrace: system_config.suppress_repeated_stacktrace,
    ignore_repeated_log_interval: system_config.ignore_repeated_log_interval,
    ignore_same_log_interval: system_config.ignore_same_log_interval,
  )
  # Colors are disabled whenever a log path was given (this includes "-").
  $log.enable_color(false) if actual_log_path
  $log.enable_debug if system_config.log_level <= Fluent::Log::LEVEL_DEBUG

  $log.info "init #{process_type} logger",
            path: actual_log_path, 
            rotate_age: @log_rotate_age,
            rotate_size: @log_rotate_size
end
show_plugin_config() click to toggle source
# File lib/fluent/supervisor.rb, line 755
# Handles the deprecated show_plugin_config option: logs the replacement
# command for the requested "<kind>:<plugin>" pair and exits with status 0.
def show_plugin_config
  plugin_kind, plugin_name = @show_plugin_config.split(":") # input:tail
  $log.info "show_plugin_config option is deprecated. Use fluent-plugin-config-format --format=txt #{plugin_kind} #{plugin_name}"
  exit 0
end
supervise() click to toggle source
# File lib/fluent/supervisor.rb, line 761
# Starts ServerEngine with this supervisor's settings and runs it, returning
# whatever `run` returns. The worker spawn command and the relevant option /
# system-config values are passed to ServerEngine as a string-keyed Hash.
def supervise
  process_name = @system_config.process_name
  Process.setproctitle("supervisor:#{process_name}") if process_name
  $log.info "starting fluentd-#{Fluent::VERSION}", pid: Process.pid, ruby: RUBY_VERSION

  spawn_cmd = build_spawn_command
  $log.info "spawn command to main: ", cmdline: spawn_cmd

  # Values that come from the command line / parsed configuration.
  option_params = {
    'main_cmd' => spawn_cmd,
    'daemonize' => @daemonize,
    'inline_config' => @inline_config,
    'chuser' => @chuser,
    'chgroup' => @chgroup,
    'fluentd_conf_path' => @config_path,
    'fluentd_conf' => @conf.to_s,
    'use_v1_config' => @use_v1_config,
    'conf_encoding' => @conf_encoding,
    'signame' => @signame,
  }

  # Values that come from the system configuration.
  system_params = {
    'workers' => @system_config.workers,
    'root_dir' => @system_config.root_dir,
    'log_level' => @system_config.log_level,
    'rpc_endpoint' => @system_config.rpc_endpoint,
    'enable_get_dump' => @system_config.enable_get_dump,
    'counter_server' => @system_config.counter_server,
    'disable_shared_socket' => @system_config.disable_shared_socket,
    'restart_worker_interval' => @system_config.restart_worker_interval,
  }

  params = option_params.merge(system_params)

  server = ServerEngine.create(ServerModule, WorkerModule) {
    # Note: This is called only at the initialization of ServerEngine, since
    # Fluentd overwrites all related SIGNAL(HUP,USR1,USR2) and have own reloading feature.
    Fluent::Supervisor.serverengine_config(params)
  }

  server.run
end