module Fluent::ServerModule

Public Instance Methods

after_run() click to toggle source
# File lib/fluent/supervisor.rb, line 62
# Tear-down hook: stops the RPC server when @rpc_endpoint is set, then asks
# Fluent::Supervisor to clean up its resources.
def after_run
  if @rpc_endpoint
    stop_rpc_server
  end

  Fluent::Supervisor.cleanup_resources
end
before_run() click to toggle source
# File lib/fluent/supervisor.rb, line 42
# Start-up hook. In order, it:
#   1. records the start time in @start_time;
#   2. starts the RPC server when config[:rpc_endpoint] is set;
#   3. installs the supervisor signal handlers;
#   4. installs the Windows event handler when config[:signame] is set;
#   5. opens a ServerEngine socket manager server and exports its path via the
#      SERVERENGINE_SOCKETMANAGER_PATH environment variable.
def before_run
  @start_time = Time.now

  if (endpoint = config[:rpc_endpoint])
    @rpc_endpoint = endpoint
    @enable_get_dump = config[:enable_get_dump]
    run_rpc_server
  end

  install_supervisor_signal_handlers

  if (signame = config[:signame])
    @signame = signame
    install_windows_event_handler
  end

  manager = ServerEngine::SocketManager::Server
  path = manager.generate_path
  manager.open(path)
  ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = path.to_s
end
install_supervisor_signal_handlers() click to toggle source
# File lib/fluent/supervisor.rb, line 129
# Registers the supervisor's HUP and USR1 signal traps. Nothing is installed
# when Fluent.windows? is true.
def install_supervisor_signal_handlers
  return if Fluent.windows?

  trap(:HUP) do
    $log.debug "fluentd supervisor process get SIGHUP"
    supervisor_sighup_handler
  end

  trap(:USR1) do
    $log.debug "fluentd supervisor process get SIGUSR1"
    supervisor_sigusr1_handler
  end
end
install_windows_event_handler() click to toggle source
# File lib/fluent/supervisor.rb, line 141
# Spawns a background thread that opens the Win32 event named by @signame,
# resets it, and polls it once per second. Once the event is signaled the
# worker is killed and stop(true) is called. The event is closed when the
# thread body exits, whether normally or through an exception.
#
# Returns the created Thread.
def install_windows_event_handler
  Thread.new do
    event = Win32::Event.new(@signame)
    begin
      event.reset
      # Zero-timeout wait: poll without blocking, sleeping between checks.
      sleep 1 until WaitForSingleObject(event.handle, 0) == WAIT_OBJECT_0
      kill_worker
      stop(true)
    ensure
      event.close
    end
  end
end
kill_worker() click to toggle source
# File lib/fluent/supervisor.rb, line 172
# Signals the worker process whose pid is stored in config[:worker_pid]:
# KILL when Fluent.windows? is true, TERM otherwise. Does nothing (and returns
# nil) when no worker pid is set.
def kill_worker
  pid = config[:worker_pid]
  return unless pid

  signal = Fluent.windows? ? :KILL : :TERM
  Process.kill signal, pid
end
run_rpc_server() click to toggle source
# File lib/fluent/supervisor.rb, line 67
# Creates the RPC server for @rpc_endpoint (logging through $log), mounts the
# built-in endpoints and starts the server. The server is kept in @rpc_server.
#
# Mounted endpoints:
#   /api/processes.interruptWorkers          - sends INT to this process
#   /api/processes.killWorkers               - sends TERM to this process
#   /api/processes.flushBuffersAndKillWorkers - sends USR1 then TERM to this
#                                               process (only logs a warning
#                                               on Windows)
#   /api/plugins.flushBuffers                - sends USR1 to this process
#                                               (no-op on Windows)
#   /api/config.reload                       - sends HUP to this process; on
#                                               Windows kills the worker instead
#   /api/config.dump                         - logs the in-memory config
#   /api/config.getDump                      - puts the in-memory config in the
#                                               response body; mounted only when
#                                               @enable_get_dump is truthy
def run_rpc_server
  @rpc_server = RPC::Server.new(@rpc_endpoint, $log)

  # built-in RPC for signals
  @rpc_server.mount_proc('/api/processes.interruptWorkers') { |_req, _res|
    $log.debug "fluentd RPC got /api/processes.interruptWorkers request"
    Process.kill :INT, $$
    nil
  }
  @rpc_server.mount_proc('/api/processes.killWorkers') { |_req, _res|
    $log.debug "fluentd RPC got /api/processes.killWorkers request"
    Process.kill :TERM, $$
    nil
  }
  @rpc_server.mount_proc('/api/processes.flushBuffersAndKillWorkers') { |_req, _res|
    $log.debug "fluentd RPC got /api/processes.flushBuffersAndKillWorkers request"
    if Fluent.windows?
      $log.warn "operation 'flushBuffersAndKillWorkers' is not supported on Windows now."
    else
      Process.kill :USR1, $$
      Process.kill :TERM, $$
    end
    nil
  }
  @rpc_server.mount_proc('/api/plugins.flushBuffers') { |_req, _res|
    $log.debug "fluentd RPC got /api/plugins.flushBuffers request"
    unless Fluent.windows?
      Process.kill :USR1, $$
    end
    nil
  }
  @rpc_server.mount_proc('/api/config.reload') { |_req, _res|
    $log.debug "fluentd RPC got /api/config.reload request"
    if Fluent.windows?
      # restart worker with auto restarting by killing
      kill_worker
    else
      Process.kill :HUP, $$
    end
    nil
  }
  @rpc_server.mount_proc('/api/config.dump') { |_req, _res|
    $log.debug "fluentd RPC got /api/config.dump request"
    $log.info "dump in-memory config"
    supervisor_dump_config_handler
    nil
  }

  # Only exposed on explicit opt-in, since it returns the config to the caller.
  @rpc_server.mount_proc('/api/config.getDump') { |_req, res|
    # Log the endpoint that was actually hit (previously misreported as
    # /api/config.dump).
    $log.debug "fluentd RPC got /api/config.getDump request"
    $log.info "get dump in-memory config via HTTP"
    res.body = supervisor_get_dump_config_handler
    [nil, nil, res]
  } if @enable_get_dump

  @rpc_server.start
end
stop_rpc_server() click to toggle source
# File lib/fluent/supervisor.rb, line 125
# Shuts down the RPC server held in @rpc_server.
# There is no nil check here: the call raises NoMethodError if @rpc_server
# was never assigned.
def stop_rpc_server
  @rpc_server.shutdown
end
supervisor_dump_config_handler() click to toggle source
# File lib/fluent/supervisor.rb, line 182
# Writes the string form of config[:fluentd_conf] to $log at info level.
def supervisor_dump_config_handler
  conf_text = config[:fluentd_conf].to_s
  $log.info conf_text
end
supervisor_get_dump_config_handler() click to toggle source
# File lib/fluent/supervisor.rb, line 186
# Returns a Hash with a single :conf key holding the string form of
# config[:fluentd_conf].
def supervisor_get_dump_config_handler
  conf_text = config[:fluentd_conf].to_s
  { conf: conf_text }
end
supervisor_sighup_handler() click to toggle source
# File lib/fluent/supervisor.rb, line 157
# HUP handler for the supervisor: delegates to kill_worker.
def supervisor_sighup_handler
  kill_worker
end
supervisor_sigusr1_handler() click to toggle source
# File lib/fluent/supervisor.rb, line 161
# USR1 handler for the supervisor: calls reopen! on config[:logger_initializer]
# when one is set, then forwards USR1 to the worker pid in config[:worker_pid]
# when one is set.
def supervisor_sigusr1_handler
  logger_initializer = config[:logger_initializer]
  logger_initializer.reopen! if logger_initializer

  worker_pid = config[:worker_pid]
  # Errno::ESRCH is deliberately not rescued here (invalid status).
  Process.kill(:USR1, worker_pid) if worker_pid
end