module Mqjob::WorkerGroup

Public Class Methods

configure(opts) click to toggle source

设置线程数并返回新类型 opts

threads 设置线程池大小
# File lib/mqjob/worker_group.rb, line 46
    def self.configure(opts)
      thread_size = opts[:threads]
      raise "threads was required!" if thread_size.to_i.zero?
      workers = Array(opts[:clazz])
      raise "clazz was required!" if workers.empty?

      md = Module.new
      md_name = "WorkerGroup#{md.object_id}".tr('-', '_')

      md.include(self)
      md.class_eval <<-RUBY, __FILE__, __LINE__+1
        def threads
          #{thread_size}
        end

        def worker_classes
          #{workers}
        end

        private :threads, :workers
      RUBY

      ::Mqjob.const_set(md_name, md)
    end
new() click to toggle source
# File lib/mqjob/worker_group.rb, line 3
def initialize
  @stoped = false
  # 统一线程池,防止数据库连接池不够用,推荐设置为10
  @pool = ::Mqjob::ThreadPool.new(threads)
end

Public Instance Methods

after_start() click to toggle source
# File lib/mqjob/worker_group.rb, line 33
def after_start
  puts "call #{__method__}"
end
before_fork() click to toggle source
# File lib/mqjob/worker_group.rb, line 9
def before_fork
  ::Mqjob.hooks.before_fork&.call
end
reload() click to toggle source
# File lib/mqjob/worker_group.rb, line 29
def reload
  puts "call #{__method__}"
end
run() click to toggle source
# File lib/mqjob/worker_group.rb, line 13
def run
  return if @stoped

  ::Mqjob.hooks.after_fork&.call

  workers.each do |worker|
    worker.run
  end
end
stop() click to toggle source
# File lib/mqjob/worker_group.rb, line 23
def stop
  workers.each{|wc| wc.stop}

  @stoped = true
end
workers() click to toggle source
# File lib/mqjob/worker_group.rb, line 37
def workers
  @workers ||= worker_classes.map do |wc|
    wc.new(pool: @pool)
  end
end