module Fluent::ServerModule

Public Instance Methods

after_run() click to toggle source
# File lib/fluent/supervisor.rb, line 74
def after_run
  stop_windows_event_thread if Fluent.windows?
  stop_rpc_server if @rpc_endpoint
  stop_counter_server if @counter
  cleanup_lock_dir
  Fluent::Supervisor.cleanup_resources
end
before_run() click to toggle source
# File lib/fluent/supervisor.rb, line 41
def before_run
  @fluentd_conf = config[:fluentd_conf]
  @rpc_endpoint = nil
  @rpc_server = nil
  @counter = nil

  @fluentd_lock_dir = Dir.mktmpdir("fluentd-lock-")
  ENV['FLUENTD_LOCK_DIR'] = @fluentd_lock_dir

  if config[:rpc_endpoint]
    @rpc_endpoint = config[:rpc_endpoint]
    @enable_get_dump = config[:enable_get_dump]
    run_rpc_server
  end

  if Fluent.windows?
    install_windows_event_handler
  else
    install_supervisor_signal_handlers
  end

  if counter = config[:counter_server]
    run_counter_server(counter)
  end

  if config[:disable_shared_socket]
    $log.info "shared socket for multiple workers is disabled"
  else
    server = ServerEngine::SocketManager::Server.open
    ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = server.path.to_s
  end
end
cleanup_lock_dir() click to toggle source
# File lib/fluent/supervisor.rb, line 82
def cleanup_lock_dir
  FileUtils.rm(Dir.glob(File.join(@fluentd_lock_dir, "fluentd-*.lock")))
  FileUtils.rmdir(@fluentd_lock_dir)
end
dump() click to toggle source
Calls superclass method
# File lib/fluent/supervisor.rb, line 357
def dump
  super unless @stop
end
install_supervisor_signal_handlers() click to toggle source
# File lib/fluent/supervisor.rb, line 175
def install_supervisor_signal_handlers
  return if Fluent.windows?

  trap :HUP do
    $log.debug "fluentd supervisor process get SIGHUP"
    supervisor_sighup_handler
  end

  trap :USR1 do
    $log.debug "fluentd supervisor process get SIGUSR1"
    supervisor_sigusr1_handler
  end

  trap :USR2 do
    $log.debug 'fluentd supervisor process got SIGUSR2'
    supervisor_sigusr2_handler
  end
end
install_windows_event_handler() click to toggle source
# File lib/fluent/supervisor.rb, line 212
def install_windows_event_handler
  return unless Fluent.windows?

  @pid_signame = "fluentd_#{Process.pid}"
  @signame = config[:signame]

  Thread.new do
    ipc = Win32::Ipc.new(nil)
    events = [
      {win32_event: Win32::Event.new("#{@pid_signame}_STOP_EVENT_THREAD"), action: :stop_event_thread},
      {win32_event: Win32::Event.new("#{@pid_signame}"), action: :stop},
      {win32_event: Win32::Event.new("#{@pid_signame}_HUP"), action: :hup},
      {win32_event: Win32::Event.new("#{@pid_signame}_USR1"), action: :usr1},
      {win32_event: Win32::Event.new("#{@pid_signame}_USR2"), action: :usr2},
      {win32_event: Win32::Event.new("#{@pid_signame}_CONT"), action: :cont},
    ]
    if @signame
      signame_events = [
        {win32_event: Win32::Event.new("#{@signame}"), action: :stop},
        {win32_event: Win32::Event.new("#{@signame}_HUP"), action: :hup},
        {win32_event: Win32::Event.new("#{@signame}_USR1"), action: :usr1},
        {win32_event: Win32::Event.new("#{@signame}_USR2"), action: :usr2},
        {win32_event: Win32::Event.new("#{@signame}_CONT"), action: :cont},
      ]
      events.concat(signame_events)
    end
    begin
      loop do
        infinite = 0xFFFFFFFF
        ipc_idx = ipc.wait_any(events.map {|e| e[:win32_event]}, infinite)
        event_idx = ipc_idx - 1

        if event_idx >= 0 && event_idx < events.length
          $log.debug("Got Win32 event \"#{events[event_idx][:win32_event].name}\"")
        else
          $log.warn("Unexpected return value of Win32::Ipc#wait_any: #{ipc_idx}")
        end
        case events[event_idx][:action]
        when :stop
          stop(true)
        when :hup
          supervisor_sighup_handler
        when :usr1
          supervisor_sigusr1_handler
        when :usr2
          supervisor_sigusr2_handler
        when :cont
          supervisor_dump_handler_for_windows
        when :stop_event_thread
          break
        end
      end
    ensure
      events.each { |event| event[:win32_event].close }
    end
  end
end
kill_worker() click to toggle source
# File lib/fluent/supervisor.rb, line 335
def kill_worker
  if config[:worker_pid]
    pids = config[:worker_pid].clone
    config[:worker_pid].clear
    pids.each_value do |pid|
      if Fluent.windows?
        Process.kill :KILL, pid
      else
        Process.kill :TERM, pid
      end
    end
  end
end
reload() click to toggle source
# File lib/fluent/supervisor.rb, line 205
def reload
  @monitors.each do |m|
    m.send_command("RELOAD\n")
  end
end
restart(graceful) click to toggle source

Override some methods of ServerEngine::MultiSpawnWorker Since Fluentd’s Supervisor doesn’t use ServerEngine’s HUP, USR1 and USR2 handlers (see install_supervisor_signal_handlers), they should be disabled also on Windows, just send commands to workers instead.

# File lib/fluent/supervisor.rb, line 199
def restart(graceful)
  @monitors.each do |m|
    m.send_command(graceful ? "GRACEFUL_RESTART\n" : "IMMEDIATE_RESTART\n")
  end
end
run_counter_server(counter_conf) click to toggle source
# File lib/fluent/supervisor.rb, line 163
def run_counter_server(counter_conf)
  @counter = Fluent::Counter::Server.new(
    counter_conf.scope,
    {host: counter_conf.bind, port: counter_conf.port, log: $log, path: counter_conf.backup_path}
  )
  @counter.start
end
run_rpc_server() click to toggle source
# File lib/fluent/supervisor.rb, line 87
def run_rpc_server
  @rpc_server = RPC::Server.new(@rpc_endpoint, $log)

  # built-in RPC for signals
  @rpc_server.mount_proc('/api/processes.interruptWorkers') { |req, res|
    $log.debug "fluentd RPC got /api/processes.interruptWorkers request"
    Process.kill :INT, Process.pid
    nil
  }
  @rpc_server.mount_proc('/api/processes.killWorkers') { |req, res|
    $log.debug "fluentd RPC got /api/processes.killWorkers request"
    Process.kill :TERM, Process.pid
    nil
  }
  @rpc_server.mount_proc('/api/processes.flushBuffersAndKillWorkers') { |req, res|
    $log.debug "fluentd RPC got /api/processes.flushBuffersAndKillWorkers request"
    if Fluent.windows?
      supervisor_sigusr1_handler
      stop(true)
    else
      Process.kill :USR1, Process.pid
      Process.kill :TERM, Process.pid
    end
    nil
  }
  @rpc_server.mount_proc('/api/plugins.flushBuffers') { |req, res|
    $log.debug "fluentd RPC got /api/plugins.flushBuffers request"
    if Fluent.windows?
      supervisor_sigusr1_handler
    else
      Process.kill :USR1, Process.pid
    end
    nil
  }
  @rpc_server.mount_proc('/api/config.reload') { |req, res|
    $log.debug "fluentd RPC got /api/config.reload request"
    if Fluent.windows?
      # restart worker with auto restarting by killing
      kill_worker
    else
      Process.kill :HUP, Process.pid
    end
    nil
  }
  @rpc_server.mount_proc('/api/config.dump') { |req, res|
    $log.debug "fluentd RPC got /api/config.dump request"
    $log.info "dump in-memory config"
    supervisor_dump_config_handler
    nil
  }

  @rpc_server.mount_proc('/api/config.gracefulReload') { |req, res|
    $log.debug "fluentd RPC got /api/config.gracefulReload request"
    if Fluent.windows?
      supervisor_sigusr2_handler
    else
      Process.kill :USR2, Process.pid
    end

    nil
  }

  @rpc_server.mount_proc('/api/config.getDump') { |req, res|
    $log.debug "fluentd RPC got /api/config.getDump request"
    $log.info "get dump in-memory config via HTTP"
    res.body = supervisor_get_dump_config_handler
    [nil, nil, res]
  } if @enable_get_dump

  @rpc_server.start
end
stop_counter_server() click to toggle source
# File lib/fluent/supervisor.rb, line 171
def stop_counter_server
  @counter.stop
end
stop_rpc_server() click to toggle source
# File lib/fluent/supervisor.rb, line 159
def stop_rpc_server
  @rpc_server.shutdown
end
stop_windows_event_thread() click to toggle source
# File lib/fluent/supervisor.rb, line 270
def stop_windows_event_thread
  if Fluent.windows?
    ev = Win32::Event.open("#{@pid_signame}_STOP_EVENT_THREAD")
    ev.set
    ev.close
  end
end
supervisor_dump_config_handler() click to toggle source
# File lib/fluent/supervisor.rb, line 349
def supervisor_dump_config_handler
  $log.info @fluentd_conf
end
supervisor_dump_handler_for_windows() click to toggle source
# File lib/fluent/supervisor.rb, line 315
def supervisor_dump_handler_for_windows
  # As for UNIX-like, SIGCONT signal to each process makes the process output its dump-file,
  # and it is implemented before the implementation of the function for Windows.
  # It is possible to trap SIGCONT and handle it here also on UNIX-like,
  # but for backward compatibility, this handler is currently for a Windows-only.
  raise "[BUG] This function is for Windows ONLY." unless Fluent.windows?

  Thread.new do
    begin
      FluentSigdump.dump_windows
    rescue => e
      $log.error "failed to dump: #{e}"
    end
  end

  send_signal_to_workers(:CONT)
rescue => e
  $log.error "failed to dump: #{e}"
end
supervisor_get_dump_config_handler() click to toggle source
# File lib/fluent/supervisor.rb, line 353
def supervisor_get_dump_config_handler
  { conf: @fluentd_conf }
end
supervisor_sighup_handler() click to toggle source
# File lib/fluent/supervisor.rb, line 278
def supervisor_sighup_handler
  kill_worker
end
supervisor_sigusr1_handler() click to toggle source
# File lib/fluent/supervisor.rb, line 282
def supervisor_sigusr1_handler
  reopen_log
  send_signal_to_workers(:USR1)
end
supervisor_sigusr2_handler() click to toggle source
# File lib/fluent/supervisor.rb, line 287
def supervisor_sigusr2_handler
  conf = nil
  t = Thread.new do
    $log.info 'Reloading new config'

    # Validate that loading config is valid at first
    conf = Fluent::Config.build(
      config_path: config[:config_path],
      encoding: config[:conf_encoding],
      additional_config: config[:inline_config],
      use_v1_config: config[:use_v1_config],
    )

    Fluent::VariableStore.try_to_reset do
      Fluent::Engine.reload_config(conf, supervisor: true)
    end
  end

  t.report_on_exception = false # Error is handled by myself
  t.join

  reopen_log
  send_signal_to_workers(:USR2)
  @fluentd_conf = conf.to_s
rescue => e
  $log.error "Failed to reload config file: #{e}"
end

Private Instance Methods

dump_all_windows_workers() click to toggle source
# File lib/fluent/supervisor.rb, line 400
def dump_all_windows_workers
  @monitors.each do |m|
    m.send_command("DUMP\n")
  end
end
reopen_log() click to toggle source
# File lib/fluent/supervisor.rb, line 363
def reopen_log
  if $log
    # Creating new thread due to mutex can't lock
    # in main thread during trap context
    Thread.new do
      $log.reopen!
    end
  end
end
send_command_to_workers(signal) click to toggle source
# File lib/fluent/supervisor.rb, line 386
def send_command_to_workers(signal)
  # Use SeverEngine's CommandSender on Windows
  case signal
  when :HUP
    restart(false)
  when :USR1
    restart(true)
  when :USR2
    reload
  when :CONT
    dump_all_windows_workers
  end
end
send_signal_to_workers(signal) click to toggle source
# File lib/fluent/supervisor.rb, line 373
def send_signal_to_workers(signal)
  return unless config[:worker_pid]

  if Fluent.windows?
    send_command_to_workers(signal)
  else
    config[:worker_pid].each_value do |pid|
      # don't rescue Errno::ESRCH here (invalid status)
      Process.kill(signal, pid)
    end
  end
end