module Mqjob::WorkerGroup
Public Class Methods
configure(opts)
click to toggle source
设置线程数和 worker 类,并返回新类型(一个包含 Mqjob::WorkerGroup 的新模块)。参数 opts:
threads 设置线程池大小(必填,不能为 0);clazz 指定 worker 类,可以是单个类或数组(必填)
# File lib/mqjob/worker_group.rb, line 46
# Builds a new worker-group module with a fixed thread count and worker list.
#
# The returned module includes this module and gains two generated methods:
# +threads+ (the configured pool size) and +worker_classes+ (the configured
# worker classes). It is registered as a constant under ::Mqjob.
#
# @param opts [Hash] configuration options
# @option opts [#to_i] :threads thread pool size; required, must not convert to 0
# @option opts [Class, Array<Class>] :clazz worker class or classes; required
# @return [Module] the newly created module
# @raise [RuntimeError] when :threads converts to zero or :clazz is empty
def self.configure(opts)
  thread_size = opts[:threads].to_i
  raise "threads was required!" if thread_size.zero?

  # Snapshot the list so later changes to the caller's array cannot leak in.
  worker_list = Array(opts[:clazz]).dup
  raise "clazz was required!" if worker_list.empty?

  md = Module.new
  md_name = "WorkerGroup#{md.object_id}".tr('-', '_')
  md.include(self)

  # Closures are used instead of evaluating interpolated source text: the
  # string form produced invalid Ruby for anonymous classes (their inspect
  # output starts with '#') and for thread values such as "4x".
  md.class_eval do
    define_method(:threads) { thread_size }
    # Hand out a fresh array on every call, as the evaluated literal did.
    define_method(:worker_classes) { worker_list.dup }

    # NOTE(review): this hides the inherited +workers+ method and leaves the
    # generated +worker_classes+ public; the visibility is kept unchanged for
    # compatibility -- confirm whether :worker_classes was intended here.
    private :threads, :workers
  end

  ::Mqjob.const_set(md_name, md)
end
new()
click to toggle source
# File lib/mqjob/worker_group.rb, line 3
# Sets up a fresh group: not stopped, with a thread pool sized by +threads+.
def initialize
  @stoped = false

  # A single shared thread pool keeps the database connection pool from
  # being exhausted; a size of 10 is recommended.
  pool_size = threads
  @pool = ::Mqjob::ThreadPool.new(pool_size)
end
Public Instance Methods
after_start()
click to toggle source
# File lib/mqjob/worker_group.rb, line 33
# Prints a trace line containing this method's name.
def after_start
  message = "call #{__method__}"
  puts message
end
before_fork()
click to toggle source
# File lib/mqjob/worker_group.rb, line 9
# Invokes the before_fork hook held by ::Mqjob.hooks, if one is set.
def before_fork
  hook = ::Mqjob.hooks.before_fork
  hook&.call
end
reload()
click to toggle source
# File lib/mqjob/worker_group.rb, line 29
# Prints a trace line containing this method's name.
def reload
  message = "call #{__method__}"
  puts message
end
run()
click to toggle source
# File lib/mqjob/worker_group.rb, line 13
# Runs every worker in the group, unless the group has been stopped.
#
# The after_fork hook held by ::Mqjob.hooks is invoked first when one is set.
# Evaluates to nil when the group is already stopped.
def run
  unless @stoped
    after_fork_hook = ::Mqjob.hooks.after_fork
    after_fork_hook&.call
    workers.each(&:run)
  end
end
stop()
click to toggle source
# File lib/mqjob/worker_group.rb, line 23
# Stops every worker, then flags the group as stopped so that a later #run
# returns early.
def stop
  workers.each(&:stop)
  @stoped = true
end
workers()
click to toggle source
# File lib/mqjob/worker_group.rb, line 37
# Returns the worker instances, building them on first use: one instance per
# entry in +worker_classes+, each constructed with the group's @pool.
def workers
  @workers ||= worker_classes.map { |klass| klass.new(pool: @pool) }
end