class GrpcKit::RpcDispatcher
Constants
- DEFAULT_MAX
- DEFAULT_MIN
Public Class Methods
new(rpcs, max: DEFAULT_MAX, min: DEFAULT_MIN, interval: 30)
click to toggle source
@param rpcs [Hash<String,GrpcKit::RpcDesc>] @param min [Integer] A mininum thread pool size @param max [Integer] A maximum thread pool size @param interval [Integer] An interval time of calling trim
# File lib/grpc_kit/rpc_dispatcher.rb, line 16 def initialize(rpcs, max: DEFAULT_MAX, min: DEFAULT_MIN, interval: 30) @rpcs = rpcs @max_pool_size = max @min_pool_size = min unless max == min @auto_trimmer = AutoTrimmer.new(self, interval: interval).tap(&:start!) end @shutdown = false @tasks = Queue.new @spawned = 0 @workers = [] @mutex = Mutex.new @min_pool_size.times { spawn_thread } end
Public Instance Methods
schedule(task)
click to toggle source
@param task [Object] task to dispatch
# File lib/grpc_kit/rpc_dispatcher.rb, line 34 def schedule(task) if task.nil? return end if @shutdown raise "scheduling new task isn't allowed during shutdown" end @tasks.push(task) if @tasks.size > 1 && @mutex.synchronize { @spawned < @max_pool_size } spawn_thread end end
shutdown()
click to toggle source
# File lib/grpc_kit/rpc_dispatcher.rb, line 49 def shutdown @shutdown = true @auto_trimmer.stop if @auto_trimmer @max_pool_size.times { @tasks.push(nil) } end
trim(force = false)
click to toggle source
# File lib/grpc_kit/rpc_dispatcher.rb, line 55 def trim(force = false) if (force || @tasks.empty?) && @mutex.synchronize { @spawned > @min_pool_size } GrpcKit.logger.debug("Decrease RpcDipatcher's worker. Next worker size is #{@spawned - 1}") @tasks.push(nil) end end
Private Instance Methods
dispatch(stream, control_queue)
click to toggle source
# File lib/grpc_kit/rpc_dispatcher.rb, line 64 def dispatch(stream, control_queue) transport = GrpcKit::Transport::ServerTransport.new(control_queue, stream) server_stream = GrpcKit::Stream::ServerStream.new(transport) path = stream.headers.path rpc = @rpcs[path] unless rpc e = GrpcKit::Errors::Unimplemented.new(path) server_stream.send_status(status: e.code, msg: e.message) return end server_stream.invoke(rpc) end
spawn_thread()
click to toggle source
# File lib/grpc_kit/rpc_dispatcher.rb, line 79 def spawn_thread @spawned += 1 worker = Thread.new(@spawned) do |i| Thread.current.name = "RpcDispatcher #{i}" GrpcKit.logger.debug("#{Thread.current.name} started") loop do if @shutdown break end task = @tasks.pop if task.nil? break end begin dispatch(task[0], task[1]) rescue Exception => e # rubocop:disable Lint/RescueException GrpcKit.logger.error("An error occured on top level in worker #{Thread.current.name}: #{e.message} (#{e.class})\n #{e.backtrace.join("\n")}") end end GrpcKit.logger.debug("#{Thread.current.name} stopped") @mutex.synchronize do @spawned -= 1 @workers.delete(worker) end end @workers.push(worker) end