class Rx::Concurrent::ThreadPool

Attributes

pool[R]
queue[R]
size[R]

Public Class Methods

new(size = Etc.nprocessors) click to toggle source
# File lib/rx/concurrent/thread_pool.rb, line 6
# Builds a stopped pool. No threads are created here; they are spawned by
# #start, which also creates the job queue.
#
# @param size [Integer] number of worker threads #start will spawn
#   (defaults to Etc.nprocessors)
def initialize(size = Etc.nprocessors)
  @size = size
  @pool = [] # holds the worker Thread objects; empty until #start fills it
end

Public Instance Methods

restart() click to toggle source
# File lib/rx/concurrent/thread_pool.rb, line 28
# Stops the pool and brings it back up again.
#
# #shutdown closes the current queue and joins the workers; #start then
# creates a fresh queue and a new set of worker threads.
#
# @return [Object, nil] whatever #start returns (the pool itself once
#   workers have been spawned)
def restart
  shutdown
  start
end
shutdown() click to toggle source
# File lib/rx/concurrent/thread_pool.rb, line 11
# Stops the pool gracefully. Does nothing and returns nil when no worker
# thread is alive.
#
# Closing the queue lets the workers finish the jobs already enqueued; once
# the queue is empty their blocking pop returns nil, the worker loops end,
# and the joins below return.
#
# @return [Array, nil] the (now empty) pool array, or nil if the pool was
#   not running
def shutdown
  return unless started?

  queue.close
  # Wait for every worker to drain and exit, then forget the dead threads.
  pool.each(&:join).clear
end
start() click to toggle source
# File lib/rx/concurrent/thread_pool.rb, line 19
# Spawns the worker threads. Does nothing and returns nil when at least one
# worker is already alive.
#
# @return [self, nil] the pool itself after spawning workers, or nil if it
#   was already running
def start
  return if started?

  # Reaching this point means no thread in the pool is alive, so anything
  # still stored there is a worker that died without going through
  # #shutdown. Drop those entries so the pool never holds more than `size`
  # threads.
  pool.clear
  @queue = Queue.new
  size.times { pool << Thread.new(&worker) }

  self
end
started?() click to toggle source
# File lib/rx/concurrent/thread_pool.rb, line 33
# Reports whether the pool is running, i.e. whether at least one worker
# thread in the pool is still alive.
#
# @return [Boolean] false for an empty pool or when every worker has exited
def started?
  # any? with a block stops at the first live thread and builds no
  # intermediate array.
  pool.any?(&:alive?)
end
submit(&block) click to toggle source
# File lib/rx/concurrent/thread_pool.rb, line 37
# Enqueues a job for the workers to run.
#
# @yield the job body; it is called with no arguments on a worker thread
# @return [Queue, nil] the queue when the job was accepted; nil when the
#   pool is not running or is in the middle of shutting down
# @raise [ArgumentError] if no block is given
def submit(&block)
  # A nil entry is what the worker loop reads as "stop", so enqueueing a
  # missing block would silently kill one worker.
  raise ArgumentError, 'no block given' unless block
  return unless started?

  queue << block
rescue ClosedQueueError
  # #shutdown closes the queue while workers are still alive draining it, so
  # started? can be true here. The pool is stopping: treat it like the
  # not-started case instead of leaking the queue error to the caller.
  nil
end

Private Instance Methods

worker() click to toggle source
# File lib/rx/concurrent/thread_pool.rb, line 45
# Builds the body each worker thread runs.
#
# The returned lambda keeps popping jobs from the queue and calling them. It
# stops when pop yields nil, which is what a closed and empty queue returns.
#
# @return [Proc] a zero-argument lambda suitable for Thread.new(&worker)
def worker
  lambda do
    loop do
      task = queue.pop
      break unless task

      begin
        task.call
      rescue StandardError
        # Deliberately ignored: a failing job must not take its worker down.
      end
    end
  end
end