class GrpcKit::Server

Public Class Methods

new(interceptors: [], shutdown_timeout: 30, min_pool_size: nil, max_pool_size: nil, settings: [], max_receive_message_size: nil, max_send_message_size: nil) click to toggle source

@param interceptors [Array<GrpcKit::Grpc::ServerInterceptor>] list of interceptors @param shutdown_timeout [Integer] Number of seconds to wait for the server shutdown @param min_pool_size [Integer] A minimum thread pool size @param max_pool_size [Integer] A maximum thread pool size @param settings [Array] Settings submitted to each session when it is established @param max_receive_message_size [Integer, nil] Specify the maximum size of inbound message in bytes. Defaults to 4MB. @param max_send_message_size [Integer, nil] Specify the maximum size of outbound message in bytes. Defaults to 4MB.

# File lib/grpc_kit/server.rb, line 15
# Stores the server configuration and creates the empty registries used to
# track sessions and registered RPCs.
#
# @param interceptors [Array<GrpcKit::Grpc::ServerInterceptor>] interceptors passed to every RPC built by #handle
# @param shutdown_timeout [Integer] seconds #graceful_shutdown waits before shutting sessions down forcibly
# @param min_pool_size [Integer, nil] minimum pool size; falls back to GrpcKit::RpcDispatcher::DEFAULT_MIN
# @param max_pool_size [Integer, nil] maximum pool size; falls back to GrpcKit::RpcDispatcher::DEFAULT_MAX
# @param settings [Array] settings submitted to each session in #run
# @param max_receive_message_size [Integer, nil] inbound message size limit passed to every RPC built by #handle
# @param max_send_message_size [Integer, nil] outbound message size limit passed to every RPC built by #handle
def initialize(interceptors: [], shutdown_timeout: 30, min_pool_size: nil, max_pool_size: nil, settings: [], max_receive_message_size: nil, max_send_message_size: nil)
  # Caller-supplied configuration.
  @settings = settings
  @interceptors = interceptors
  @shutdown_timeout = shutdown_timeout
  @max_send_message_size = max_send_message_size
  @max_receive_message_size = max_receive_message_size

  # Pool bounds default to the dispatcher's constants when not given.
  @max_pool_size = max_pool_size || GrpcKit::RpcDispatcher::DEFAULT_MAX
  @min_pool_size = min_pool_size || GrpcKit::RpcDispatcher::DEFAULT_MIN

  # Runtime state; @mutex guards access to @sessions.
  @mutex = Mutex.new
  @rpc_descs = {}
  @sessions = []
  @stopping = false

  banner = "Launched grpc_kit(v#{GrpcKit::VERSION})"
  GrpcKit.logger.debug(banner)
end

Public Instance Methods

force_shutdown() click to toggle source

This method is expected to be called in trap context @return [void]

# File lib/grpc_kit/server.rb, line 66
# Marks the server as stopping and shuts every session down immediately.
#
# Expected to be called in trap context. The actual work runs on a new
# thread; presumably this is because shutdown_sessions takes @mutex, and
# Ruby does not allow locking a Mutex inside a signal handler.
#
# @return [void]
def force_shutdown
  @stopping = true

  Thread.new do
    GrpcKit.logger.debug('force shutdown')
    shutdown_sessions
  end
end
graceful_shutdown(timeout: true) click to toggle source

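This method is expected to be called in trap context. @param timeout [Boolean] When true, wait at most shutdown_timeout seconds for sessions to finish and then shut the remaining sessions down forcibly; when false, wait without a time limit. @return [void]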

# File lib/grpc_kit/server.rb, line 78
# Marks the server as stopping, drains every session and waits for the
# session list to become empty.
#
# Expected to be called in trap context; the work runs on a new thread.
#
# @param timeout [Boolean] when true, wait at most @shutdown_timeout seconds
#   and then shut the remaining sessions down forcibly; when false, wait
#   without a time limit
# @return [void]
def graceful_shutdown(timeout: true)
  @stopping = true

  Thread.new do
    GrpcKit.logger.debug('graceful shutdown')
    @mutex.synchronize do
      @sessions.each { |session| session.drain }
    end

    # Timeout.timeout treats 0 as "no time limit".
    limit = timeout ? @shutdown_timeout : 0

    begin
      Timeout.timeout(limit) do
        # Poll once per second until every session has been removed.
        until @sessions.empty?
          sleep(1)
        end
      end
    rescue Timeout::Error
      GrpcKit.logger.error("Graceful shutdown is timeout (#{@shutdown_timeout}sec). Perform shutdown forceibly")
      shutdown_sessions
    end
  end
end
handle(handler) click to toggle source

@param handler [GrpcKit::Grpc::GenericService] gRPC handler object or class @return [void]

# File lib/grpc_kit/server.rb, line 33
# Registers every RPC exposed by a gRPC handler with this server.
#
# Registration is all-or-nothing: every path is checked against the already
# registered ones, and every server-side RPC is built, before @rpc_descs is
# touched. A failure therefore never leaves only part of a handler registered.
#
# @param handler [GrpcKit::Grpc::GenericService, Class] gRPC handler object or class
# @return [void]
# @raise [RuntimeError] if the handler's class does not include
#   GrpcKit::Grpc::GenericService, or if one of its paths is already registered
def handle(handler)
  klass = handler.is_a?(Class) ? handler : handler.class
  unless klass.include?(GrpcKit::Grpc::GenericService)
    raise "#{klass} must include Grpc::GenericService"
  end

  descs = klass.rpc_descs

  # Reject the whole handler up front if any of its paths is already taken.
  conflict = descs.find { |path, _rpc_desc| @rpc_descs[path] }
  if conflict
    raise "Duplicated method registered #{conflict.first}, class: #{klass}"
  end

  # Build into a staging hash so an error raised while building also leaves
  # @rpc_descs untouched.
  staged = {}
  descs.each do |path, rpc_desc|
    staged[path] = rpc_desc.build_server(
      # A handler class is instantiated once per RPC path.
      handler.is_a?(Class) ? handler.new : handler,
      interceptors: @interceptors,
      max_receive_message_size: @max_receive_message_size,
      max_send_message_size: @max_send_message_size,
    )
  end
  @rpc_descs.update(staged)

  # Same return value as before this change, in case a caller uses it.
  descs
end
run(conn) click to toggle source

@param conn [TCPSocket] @return [void]

# File lib/grpc_kit/server.rb, line 55
# Serves one connection: establishes a session for it, submits the
# configured settings and starts the session.
#
# @param conn [TCPSocket]
# @return [void]
# @raise [RuntimeError] if a shutdown has already been requested
def run(conn)
  raise RuntimeError, 'Stopping server' if @stopping

  establish_session(conn) do |session|
    session.submit_settings(@settings)
    session.start
  end
end
session_count() click to toggle source
# File lib/grpc_kit/server.rb, line 97
# Reads the number of tracked sessions while holding @mutex.
#
# @return [Integer] number of sessions currently registered
def session_count
  @mutex.synchronize do
    @sessions.length
  end
end

Private Instance Methods

dispatcher() click to toggle source
# File lib/grpc_kit/server.rb, line 103
# Returns the RPC dispatcher, creating it on first use from the registered
# RPC descriptors and the configured pool bounds.
#
# @return [GrpcKit::RpcDispatcher]
def dispatcher
  return @dispatcher if @dispatcher

  @dispatcher = GrpcKit::RpcDispatcher.new(@rpc_descs, min: @min_pool_size, max: @max_pool_size)
end
establish_session(conn) { |session| ... } click to toggle source
# File lib/grpc_kit/server.rb, line 111
# Wraps the connection in a server session, tracks it in @sessions while the
# given block runs, and always removes it afterwards.
#
# @param conn [TCPSocket] accepted connection (type as documented on #run)
# @yieldparam session [GrpcKit::Session::ServerSession] the new session
# @return [Object] whatever the block returns
def establish_session(conn)
  io = GrpcKit::Session::IO.new(conn)
  session = GrpcKit::Session::ServerSession.new(io, dispatcher)

  begin
    @mutex.synchronize { @sessions.push(session) }
    yield session
  ensure
    # Runs on normal return and when the block raises.
    @mutex.synchronize { @sessions.delete(session) }
  end
end
shutdown_sessions() click to toggle source
# File lib/grpc_kit/server.rb, line 107
# Calls #shutdown on every tracked session while holding @mutex.
def shutdown_sessions
  @mutex.synchronize do
    @sessions.each { |session| session.shutdown }
  end
end