module Mqjob::Worker

Constants

SUBSCRIPTION_MODES

Public Class Methods

new(opts) click to toggle source
# File lib/mqjob/worker.rb, line 5
def initialize(opts)
  @pool = opts[:pool]
  @topic = self.class.topic
  @topic_opts = self.class.topic_opts

  @mq = Plugin.client(@topic_opts[:client])
end

Private Class Methods

included(base) click to toggle source
# File lib/mqjob/worker.rb, line 82
def self.included(base)
  base.extend ClassMethods
  ::Mqjob.regist_class(base) if base.is_a? Class
end

Public Instance Methods

ack!() click to toggle source
# File lib/mqjob/worker.rb, line 13
def ack!; :ack end
do_work(cmd, msg) click to toggle source
# File lib/mqjob/worker.rb, line 17
def do_work(cmd, msg)
  @pool.post do
    begin
      wrap_perform = ::Mqjob.hooks&.wrap_perform

      ::Mqjob.logger.debug(__method__){'Begin process'}

      if wrap_perform.nil?
        process_work(cmd, msg)
      else
        wrap_perform.call do
          process_work(cmd, msg)
        end
      end

      ::Mqjob.logger.debug(__method__){'Finish process'}
    rescue => exp
      ::Mqjob.logger.error(__method__){"message process error: #{exp.message}! cmd: #{cmd}, msg: #{msg}"}
      ::Mqjob.logger.error(__method__){exp}
    end
  end
end
perform(msg) click to toggle source
# File lib/mqjob/worker.rb, line 48
def perform(msg); end
reject!() click to toggle source
# File lib/mqjob/worker.rb, line 14
def reject!; :reject; end
requeue!() click to toggle source
# File lib/mqjob/worker.rb, line 15
def requeue!; :requeue; end
run() click to toggle source
# File lib/mqjob/worker.rb, line 40
def run
  @mq.listen(@topic, self, @topic_opts)
end
stop() click to toggle source
# File lib/mqjob/worker.rb, line 44
def stop
  @mq.close_listen
end

Private Instance Methods

process_work(cmd, msg) click to toggle source
# File lib/mqjob/worker.rb, line 51
def process_work(cmd, msg)
  RequestStore.clear! if Object.const_defined?(:RequestStore)

  result = nil
  begin
    result = if respond_to?(:perform_full_msg)
      ::Mqjob.logger.info(__method__){"perform_full_msg: #{msg.inspect}"}
      perform_full_msg(cmd, msg)
    else
      ::Mqjob.logger.info(__method__){"perform: #{msg.payload.inspect}"}
      perform(msg.payload)
    end
  rescue => exp
    ::Mqjob.logger.error(__method__){exp}
    result = :error
  end

  case result
  when :error, :reject
    ::Mqjob.logger.info(__method__) {"Redeliver messages! Current message id is: #{msg.message_id.inspect}"}
    msg.nack
  when :requeue
    ::Mqjob.logger.info(__method__) {"Requeue! message id is: #{msg.message_id.inspect}"}
    msg.ack
    self.class.enqueue(msg.payload, in: 10)
  else
    ::Mqjob.logger.info(__method__) {"Acknowledge message!"}
    msg.ack
  end
end