module Mqjob::Worker::ClassMethods

Attributes

topic[R]
topic_opts[R]

Public Instance Methods

enqueue(msg, opts={}) click to toggle source

opts

in publish message in X seconds
at publish message at specific time
init_subscription Boolean 是否先初始化一个订阅
perform_now Boolean 立即执行,通常用于测试环境减少流程
# File lib/mqjob/worker.rb, line 113
def enqueue(msg, opts={})
  if topic_opts[:topic_type] != :normal
    ::Mqjob.logger.error(__method__){
      "message enqueue only support topic_type set to normal, but got 「#{topic_opts[:topic_type]}」! After action skipped!"
    }
    return false
  end

  if !opts[:perform_now]
    @mq ||= Plugin.client(topic_opts[:client])
    @mq.publish(topic, msg, topic_opts.merge(opts))
    return true
  end

  begin
    worker = self.new({})
    if worker.respond_to?(:perform)
      msg = JSON.parse(JSON.dump(msg))
      ::Mqjob.logger.info('perform message now'){msg.inspect}
      worker.send(:process_work, nil, OpenStruct.new(payload: msg))
    else
      ::Mqjob.logger.error('perform_now required 「perform」 method, 「perform_full_msg」not supported!')
    end
  rescue => exp
    ::Mqjob.logger.error("#{self.name} perform_now") {exp}
  end
  true
end
from_topic(name, opts={}) click to toggle source

client: MQ, plugin: :pulsar, prefetch: 1, subscription_mode: SUBSCRIPTION_MODES, # 不同类型需要不同配置参数,互斥模式下需要指定订阅名 subscription_name logger: MyLogger topic_type [:normal, :regex] default normal

# File lib/mqjob/worker.rb, line 98
def from_topic(name, opts={})
  @topic = name.respond_to?(:call) ? name.call : name
  @topic_opts = opts

  topic_type = @topic_opts[:topic_type]&.to_sym
  @topic_opts[:topic_type] = topic_type || :normal

  @topic_opts[:subscription_name] ||= (self.name.split('::') << 'Consumer').join
end