module Mqjob::Worker
Constants
- SUBSCRIPTION_MODES
Public Class Methods
new(opts)
click to toggle source
# File lib/mqjob/worker.rb, line 5 def initialize(opts) @pool = opts[:pool] @topic = self.class.topic @topic_opts = self.class.topic_opts @mq = Plugin.client(@topic_opts[:client]) end
Private Class Methods
included(base)
click to toggle source
# File lib/mqjob/worker.rb, line 82 def self.included(base) base.extend ClassMethods ::Mqjob.regist_class(base) if base.is_a? Class end
Public Instance Methods
ack!()
click to toggle source
# File lib/mqjob/worker.rb, line 13 def ack!; :ack end
do_work(cmd, msg)
click to toggle source
# File lib/mqjob/worker.rb, line 17 def do_work(cmd, msg) @pool.post do begin wrap_perform = ::Mqjob.hooks&.wrap_perform ::Mqjob.logger.debug(__method__){'Begin process'} if wrap_perform.nil? process_work(cmd, msg) else wrap_perform.call do process_work(cmd, msg) end end ::Mqjob.logger.debug(__method__){'Finish process'} rescue => exp ::Mqjob.logger.error(__method__){"message process error: #{exp.message}! cmd: #{cmd}, msg: #{msg}"} ::Mqjob.logger.error(__method__){exp} end end end
perform(msg)
click to toggle source
# File lib/mqjob/worker.rb, line 48 def perform(msg); end
reject!()
click to toggle source
# File lib/mqjob/worker.rb, line 14 def reject!; :reject; end
requeue!()
click to toggle source
# File lib/mqjob/worker.rb, line 15 def requeue!; :requeue; end
run()
click to toggle source
# File lib/mqjob/worker.rb, line 40 def run @mq.listen(@topic, self, @topic_opts) end
stop()
click to toggle source
# File lib/mqjob/worker.rb, line 44 def stop @mq.close_listen end
Private Instance Methods
process_work(cmd, msg)
click to toggle source
# File lib/mqjob/worker.rb, line 51 def process_work(cmd, msg) RequestStore.clear! if Object.const_defined?(:RequestStore) result = nil begin result = if respond_to?(:perform_full_msg) ::Mqjob.logger.info(__method__){"perform_full_msg: #{msg.inspect}"} perform_full_msg(cmd, msg) else ::Mqjob.logger.info(__method__){"perform: #{msg.payload.inspect}"} perform(msg.payload) end rescue => exp ::Mqjob.logger.error(__method__){exp} result = :error end case result when :error, :reject ::Mqjob.logger.info(__method__) {"Redeliver messages! Current message id is: #{msg.message_id.inspect}"} msg.nack when :requeue ::Mqjob.logger.info(__method__) {"Requeue! message id is: #{msg.message_id.inspect}"} msg.ack self.class.enqueue(msg.payload, in: 10) else ::Mqjob.logger.info(__method__) {"Acknowledge message!"} msg.ack end end