class Griffin::ThreadPool
Constants
- DEFAULT_MAX
- DEFAULT_MIN
- QUEUE_SIZE
Public Class Methods
new(interval: 60, max: DEFAULT_MAX, min: DEFAULT_MIN, &block)
click to toggle source
# File lib/griffin/thread_pool.rb, line 11
# Builds the pool, starts the minimum number of workers and starts the
# auto trimmer.
#
# @param interval [Integer] base interval handed to the auto trimmer; a random
#   integer in 0..9 is added to it before it is passed on
# @param max [Integer] stored as the maximum pool size
# @param min [Integer] stored as the minimum pool size; this many workers are
#   spawned immediately
# @param block [Proc] stored and invoked by each worker with every task it pops
def initialize(interval: 60, max: DEFAULT_MAX, min: DEFAULT_MIN, &block)
  # Configuration.
  @max_pool_size = max
  @min_pool_size = min
  @block = block

  # Mutable pool state; all of it must exist before the first worker starts.
  @shutdown = false
  @tasks = SizedQueue.new(QUEUE_SIZE)
  @spawned = 0
  @workers = []
  @mutex = Mutex.new
  @waiting = 0

  @min_pool_size.times { spawn_thread }

  # The trimmer receives this pool and is started before being stored.
  trimmer = GrpcKit::RpcDispatcher::AutoTrimmer.new(self, interval: interval + rand(10))
  trimmer.start!
  @auto_trimmer = trimmer
end
Public Instance Methods
resouce_available?()
click to toggle source
# File lib/griffin/thread_pool.rb, line 44
# Reports whether the pool can take on more work right now.
#
# @return [Boolean] true when at least one worker is counted as waiting, or
#   when the number of spawned workers differs from the maximum pool size
def resouce_available?
  # A waiting worker is enough on its own.
  return true if @waiting != 0

  # Otherwise there is room only while the pool is not at its maximum.
  @spawned != @max_pool_size
end
schedule(task, &block)
click to toggle source
# File lib/griffin/thread_pool.rb, line 27
# Enqueues a task for the workers and grows the pool when needed.
#
# A nil task is ignored. When a block is given, the block is enqueued in place
# of the task.
#
# @param task [Object, nil] the item to enqueue
# @param block [Proc] optional; enqueued instead of task when present
# @raise [RuntimeError] if the pool has been shut down
def schedule(task, &block)
  return if task.nil?
  raise "scheduling new task isn't allowed during shutdown" if @shutdown

  # TODO: blocking now..
  @tasks.push(block || task)

  # Grow only when queued items outnumber waiting workers and the pool is
  # still below its maximum size.
  needs_worker = @mutex.synchronize do
    (@waiting < @tasks.size) && (@spawned < @max_pool_size)
  end
  spawn_thread if needs_worker
end
shutdown()
click to toggle source
# File lib/griffin/thread_pool.rb, line 48
# Stops the pool and blocks until every worker has removed itself.
#
# Sets the shutdown flag, enqueues one nil per possible worker (a nil makes a
# worker leave its loop), stops the auto trimmer, then polls once per second
# until the worker list is empty.
def shutdown
  @shutdown = true
  @max_pool_size.times { @tasks.push(nil) }
  @auto_trimmer.stop

  until @workers.empty?
    # Report the workers still registered: that is what this loop waits on.
    # (@waiting only counts idle workers blocked on the queue.)
    Griffin.logger.debug("Shutdown waiting #{@workers.size} workers")
    sleep 1
  end
end
trim(force = false)
click to toggle source
For GrpcKit::RpcDispatcher::AutoTrimmer
# File lib/griffin/thread_pool.rb, line 59
# Asks one worker to stop when the pool is larger than its minimum size.
#
# A nil is enqueued; the worker that pops it leaves its loop.
#
# @param force [Boolean] when true, trim even if no worker is currently waiting
# @return [Object, nil] nil when nothing was trimmed
def trim(force = false)
  trimmable = @mutex.synchronize do
    (force || (@waiting > 0)) && (@spawned > @min_pool_size)
  end
  return unless trimmable

  # Log through Griffin.logger like the rest of this class.
  Griffin.logger.info("Trim worker! Next worker size #{@spawned - 1}")
  @tasks.push(nil)
end
Private Instance Methods
spawn_thread()
click to toggle source
# File lib/griffin/thread_pool.rb, line 68 def spawn_thread @spawned += 1 worker = Thread.new(@spawned) do |i| Thread.current.name = "Griffin worker thread #{i}" Griffin.logger.debug("#{Thread.current.name} started") loop do if @shutdown break end @mutex.synchronize { @waiting += 1 } task = @tasks.pop @mutex.synchronize { @waiting -= 1 } if task.nil? break end begin @block.call(task) rescue Exception => e # rubocop:disable Lint/RescueException Griffin.logger.error("An error occured on top level in worker #{Thread.current.name}: #{e.message} (#{e.class})\n #{Thread.current.backtrace.join("\n")} ") end end Griffin.logger.debug("worker thread #{Thread.current.name} is stopping") @mutex.synchronize do @spawned -= 1 @workers.delete(worker) end end @workers.push(worker) end