module Sneakers::Worker

Constants

Classes

Attributes

id[R]
opts[R]
queue[R]

Public Class Methods

included(base) click to toggle source
# File lib/sneakers/worker.rb, line 128
# Hook invoked with the module or class that is including this module.
#
# Extends +base+ with ClassMethods and, when +base+ is a Class (as opposed
# to another Module), appends it to the Classes collection.
#
# @param base [Module] the including class or module
def self.included(base)
  base.extend(ClassMethods)
  return unless base.is_a?(Class)

  Classes << base
end
new(queue = nil, pool = nil, opts = {}) click to toggle source
# File lib/sneakers/worker.rb, line 14
# Builds a worker instance.
#
# Options are resolved in layers: the +opts+ argument is merged with the
# class-level +queue_opts+ (class-level values win on conflict), and the
# result is then merged on top of Sneakers::CONFIG.
#
# @param queue [Object, nil] queue to consume from; when nil a
#   Sneakers::Queue is created from the class-level queue name and the
#   resolved options
# @param pool [Object, nil] pool used to run jobs; when nil a
#   Concurrent::FixedThreadPool is created, sized by opts[:threads] or the
#   configured default
# @param opts [Hash] per-instance option overrides
def initialize(queue = nil, pool = nil, opts = {})
  instance_opts = opts.merge(self.class.queue_opts || {})
  queue_name = self.class.queue_name
  opts = Sneakers::CONFIG.merge(instance_opts)

  @opts = opts
  @should_ack = opts[:ack]
  @content_type = opts[:content_type]
  # Decided once up front so process_work does not re-check per message.
  @call_with_params = respond_to?(:work_with_params)

  @pool = pool || Concurrent::FixedThreadPool.new(
    opts[:threads] || Sneakers::Configuration::DEFAULTS[:threads]
  )
  @queue = queue || Sneakers::Queue.new(queue_name, opts)
  @id = Utils.make_worker_id(queue_name)
end

Public Instance Methods

ack!() click to toggle source
# File lib/sneakers/worker.rb, line 33
# Result marker for a successfully handled message.
#
# @return [Symbol] +:ack+; process_work acknowledges the message when the
#   work result equals this value and acking is enabled
def ack!
  :ack
end
do_work(delivery_info, metadata, msg, handler) click to toggle source
# File lib/sneakers/worker.rb, line 44
# Logs the incoming message and hands it to the pool, which runs
# process_work with the same arguments.
#
# @param delivery_info [Object] delivery information for the message
# @param metadata [Object] message metadata
# @param msg [Object] raw message payload
# @param handler [Object] handler that process_work reports the outcome to
# @return [Object] whatever <tt>@pool.post</tt> returns
def do_work(delivery_info, metadata, msg, handler)
  worker_trace("Working off: #{msg.inspect}")
  @pool.post { process_work(delivery_info, metadata, msg, handler) }
end
log_msg(msg) click to toggle source

Constructs a log message with a standard prefix for this worker: the worker id, the current thread, the queue name and the queue options.

# File lib/sneakers/worker.rb, line 118
# Prefixes +msg+ with bracketed context for this worker: its id, the
# current thread, the queue name and the queue options, in that order.
#
# @param msg [Object] text to log (interpolated into the result)
# @return [String] the prefixed log line
def log_msg(msg)
  context = [@id, Thread.current, @queue.name, @queue.opts]
  prefix = context.map { |part| "[#{part}]" }.join
  "#{prefix} #{msg}"
end
process_work(delivery_info, metadata, msg, handler) click to toggle source
# File lib/sneakers/worker.rb, line 52
# Processes one message: deserializes it, runs it through the configured
# middleware chain ending in the worker's +work+ (or +work_with_params+)
# method, and then reports the outcome to +handler+.
#
# Metrics emitted (all prefixed with "work.<class name>"): +started+,
# +time+ (wrapping the deserialize + work call), +handled.<result>+ (only
# when acking is enabled) and +ended+.
#
# A StandardError or ScriptError raised while working is not propagated:
# it is passed to worker_error and the result becomes +:error+.
#
# @param delivery_info [Object] delivery information for the message
# @param metadata [Hash, nil] message metadata; its +:content_type+ is used
#   when the worker has no content type of its own
# @param msg [Object] raw message payload
# @param handler [Object] receives acknowledge/error/reject/noop calls
def process_work(delivery_info, metadata, msg, handler)
  metric_prefix = "work.#{self.class.name}"
  res = nil
  error = nil

  begin
    metrics.increment("#{metric_prefix}.started")
    metrics.timing("#{metric_prefix}.time") do
      # The worker-level content type takes precedence over the message's.
      content_type = @content_type || (metadata && metadata[:content_type])
      payload = ContentType.deserialize(msg, content_type)

      # Innermost link of the chain. It works on the arguments it is called
      # with (not the outer locals), so whatever a middleware passes down
      # is what reaches the worker. The handler argument is accepted but
      # not used here.
      worker_call = lambda do |body, info, meta, _handler|
        @call_with_params ? work_with_params(body, info, meta) : work(body)
      end

      # Wrap from the last middleware inwards so the first configured
      # middleware ends up outermost and is called first.
      chain = Sneakers.middleware.to_a.reverse.reduce(worker_call) do |inner, entry|
        entry[:class].new(inner, *entry[:args])
      end
      res = chain.call(payload, delivery_info, metadata, handler)
    end
  rescue StandardError, ScriptError => e
    res = :error
    error = e
    worker_error(e, log_msg: log_msg(msg), class: self.class.name,
                    message: msg, delivery_info: delivery_info, metadata: metadata)
  end

  if @should_ack
    case res
    when :ack
      # note to future-self. never acknowledge multiple (multiple=true) messages under threads.
      handler.acknowledge(delivery_info, metadata, msg)
    when :error
      handler.error(delivery_info, metadata, msg, error)
    when :reject
      handler.reject(delivery_info, metadata, msg)
    when :requeue
      # Same handler call as a reject, with the extra +true+ argument.
      handler.reject(delivery_info, metadata, msg, true)
    else
      handler.noop(delivery_info, metadata, msg)
    end
    metrics.increment("#{metric_prefix}.handled.#{res || 'noop'}")
  end

  metrics.increment("#{metric_prefix}.ended")
end
publish(msg, opts) click to toggle source
# File lib/sneakers/worker.rb, line 37
# Serializes +msg+ and publishes it on this worker's queue exchange.
#
# The routing key is <tt>opts[:routing_key]</tt> if given, otherwise
# <tt>opts[:to_queue]</tt>. +:to_queue+ is never forwarded to the exchange.
# When neither yields a routing key nothing is published.
#
# The caller's +opts+ hash is left untouched: the method works on a shallow
# copy, so frozen or reused option hashes are safe to pass in.
#
# @param msg [Object] payload, serialized according to opts[:content_type]
# @param opts [Hash] publish options (+:to_queue+, +:routing_key+,
#   +:content_type+, plus anything the exchange accepts)
# @return [Object, nil] the exchange's publish result, or nil when no
#   routing key could be determined
def publish(msg, opts)
  # Copy first: deleting :to_queue / setting :routing_key on the caller's
  # hash would leak changes to the caller and raise on a frozen hash.
  publish_opts = opts.dup
  to_queue = publish_opts.delete(:to_queue)
  publish_opts[:routing_key] ||= to_queue
  return unless publish_opts[:routing_key]

  payload = Sneakers::ContentType.serialize(msg, publish_opts[:content_type])
  @queue.exchange.publish(payload, publish_opts)
end
reject!() click to toggle source
# File lib/sneakers/worker.rb, line 34
# Result marker for a message that should be rejected.
#
# @return [Symbol] +:reject+; process_work calls the handler's +reject+
#   when the work result equals this value and acking is enabled
def reject!
  :reject
end
requeue!() click to toggle source
# File lib/sneakers/worker.rb, line 35
# Result marker for a message that should be put back on the queue.
#
# @return [Symbol] +:requeue+; process_work calls the handler's +reject+
#   with an extra +true+ argument when the work result equals this value
#   and acking is enabled
def requeue!
  :requeue
end
run() click to toggle source
# File lib/sneakers/worker.rb, line 111
# Starts the worker by subscribing it to its queue, writing a trace line
# before and after the subscription.
def run
  worker_trace("New worker: subscribing.")
  @queue.subscribe(self)
  worker_trace("New worker: I'm alive.")
end
stop() click to toggle source
# File lib/sneakers/worker.rb, line 102
# Stops the worker in two phases, tracing each one: first unsubscribe from
# the queue, then shut the pool down and wait for it to terminate.
#
# +wait_for_termination+ is called without a timeout argument.
def stop
  worker_trace("Stopping worker: unsubscribing.")
  @queue.unsubscribe

  worker_trace("Stopping worker: shutting down thread pool.")
  @pool.shutdown
  @pool.wait_for_termination

  worker_trace("Stopping worker: I'm gone.")
end
worker_trace(msg) click to toggle source
# File lib/sneakers/worker.rb, line 122
# Writes +msg+ to the logger at debug level, decorated by log_msg.
#
# @param msg [Object] text to trace
# @return [Object] whatever the logger's +debug+ returns
def worker_trace(msg)
  sink = logger
  sink.debug(log_msg(msg))
end