module Sneakers::Worker
Constants
- Classes
Attributes
id[R]
opts[R]
queue[R]
Public Class Methods
included(base)
click to toggle source
# File lib/sneakers/worker.rb, line 128
# Hook run when this module is included into +base+.
#
# Extends +base+ with ClassMethods and, when +base+ is a Class, appends it
# to the Classes collection.
#
# @param base [Module] the class or module performing the include
def self.included(base)
  base.extend(ClassMethods)
  return unless base.is_a?(Class)

  Classes << base
end
new(queue = nil, pool = nil, opts = {})
click to toggle source
# File lib/sneakers/worker.rb, line 14
# Builds a worker.
#
# Options are layered as: Sneakers::CONFIG, overridden by the +opts+
# argument, overridden by the including class's +queue_opts+.
#
# @param queue [Object, nil] queue to consume from; when nil a
#   Sneakers::Queue is created from the class's queue name and the merged options
# @param pool [Object, nil] pool used to run work; when nil a
#   Concurrent::FixedThreadPool is created, sized by the :threads option
#   (falling back to Sneakers::Configuration::DEFAULTS[:threads])
# @param opts [Hash] per-instance option overrides
def initialize(queue = nil, pool = nil, opts = {})
  settings = opts.merge(self.class.queue_opts || {})
  queue_name = self.class.queue_name
  settings = Sneakers::CONFIG.merge(settings)

  @should_ack = settings[:ack]
  # The default pool is only built when the caller did not supply one.
  @pool = pool || Concurrent::FixedThreadPool.new(
    settings[:threads] || Sneakers::Configuration::DEFAULTS[:threads]
  )
  @call_with_params = respond_to?(:work_with_params)
  @content_type = settings[:content_type]
  @queue = queue || Sneakers::Queue.new(queue_name, settings)
  @opts = settings
  @id = Utils.make_worker_id(queue_name)
end
Public Instance Methods
ack!()
click to toggle source
# File lib/sneakers/worker.rb, line 33
# Result marker for a successfully handled message.
#
# @return [Symbol] always +:ack+
def ack!
  :ack
end
do_work(delivery_info, metadata, msg, handler)
click to toggle source
# File lib/sneakers/worker.rb, line 44
# Traces the incoming message, then posts its processing to the worker's
# pool; the actual work is done by #process_work.
#
# @param delivery_info [Object] passed through to #process_work
# @param metadata [Object] passed through to #process_work
# @param msg [Object] raw message; its +inspect+ form is written to the trace
# @param handler [Object] passed through to #process_work
def do_work(delivery_info, metadata, msg, handler)
  worker_trace("Working off: #{msg.inspect}")
  @pool.post { process_work(delivery_info, metadata, msg, handler) }
end
log_msg(msg)
click to toggle source
Construct a log message with some standard prefix for this worker
# File lib/sneakers/worker.rb, line 118
# Prefixes +msg+ with bracketed worker context: the worker id, the current
# thread, the queue name and the queue options, in that order.
#
# @param msg [Object] text to append after the prefix
# @return [String] the prefixed log line
def log_msg(msg)
  context = [@id, Thread.current, @queue.name, @queue.opts]
  prefix = context.map { |part| "[#{part}]" }.join
  "#{prefix} #{msg}"
end
process_work(delivery_info, metadata, msg, handler)
click to toggle source
# File lib/sneakers/worker.rb, line 52
# Runs one message through the middleware chain and the worker's own
# +work+ / +work_with_params+ method, then reports the outcome to +handler+.
#
# Steps:
# 1. Increment the "started" metric and time the work.
# 2. Deserialize +msg+ using the worker's content type, falling back to
#    metadata[:content_type] when the worker has none.
# 3. Wrap the work call in the configured middlewares (the first entry of
#    Sneakers.middleware ends up outermost) and invoke the chain.
# 4. StandardError and ScriptError raised along the way are reported via
#    +worker_error+ and turn the outcome into +:error+.
# 5. When acking is enabled, dispatch on the outcome (:ack, :error, :reject,
#    :requeue, anything else => noop) and increment the "handled" metric.
# 6. Always increment the "ended" metric.
#
# @param delivery_info [Object] passed to the work call and to +handler+
# @param metadata [Object, nil] message metadata; read for :content_type
# @param msg [Object] raw message payload
# @param handler [Object] receives acknowledge/error/reject/noop calls
def process_work(delivery_info, metadata, msg, handler)
  outcome = nil
  failure = nil
  metric_prefix = "work.#{self.class.name}"

  begin
    metrics.increment("#{metric_prefix}.started")
    metrics.timing("#{metric_prefix}.time") do
      content_type = @content_type || (metadata && metadata[:content_type])
      payload = ContentType.deserialize(msg, content_type)

      # Innermost link of the chain: the worker's own work method.
      worker_call = lambda do |body, info, meta, _handler|
        if @call_with_params
          work_with_params(body, info, meta)
        else
          work(body)
        end
      end

      # Fold from the last middleware inwards so the first one wraps all others.
      chain = Sneakers.middleware.to_a.reverse.reduce(worker_call) do |inner, spec|
        spec[:class].new(inner, *spec[:args])
      end

      outcome = chain.call(payload, delivery_info, metadata, handler)
    end
  rescue StandardError, ScriptError => e
    outcome = :error
    failure = e
    worker_error(e, log_msg: log_msg(msg), class: self.class.name,
                    message: msg, delivery_info: delivery_info, metadata: metadata)
  end

  if @should_ack
    case outcome
    when :ack
      # note to future-self. never acknowledge multiple (multiple=true) messages under threads.
      handler.acknowledge(delivery_info, metadata, msg)
    when :error
      handler.error(delivery_info, metadata, msg, failure)
    when :reject
      handler.reject(delivery_info, metadata, msg)
    when :requeue
      handler.reject(delivery_info, metadata, msg, true)
    else
      handler.noop(delivery_info, metadata, msg)
    end
    metrics.increment("#{metric_prefix}.handled.#{outcome || 'noop'}")
  end

  metrics.increment("#{metric_prefix}.ended")
end
publish(msg, opts)
click to toggle source
# File lib/sneakers/worker.rb, line 37
# Publishes +msg+ through the exchange of this worker's queue.
#
# The routing key is taken from opts[:routing_key], falling back to
# opts[:to_queue]. When neither is present nothing is published and nil is
# returned. The message is serialized with Sneakers::ContentType using
# opts[:content_type].
#
# The caller's +opts+ hash is left untouched: the method works on a shallow
# copy, so the same options hash can be safely reused across calls.
#
# @param msg [Object] message to serialize and publish
# @param opts [Hash] publish options; :to_queue is consumed here and is not
#   forwarded to the exchange
# @return [Object, nil] the result of the exchange's +publish+, or nil when
#   no routing key could be determined
def publish(msg, opts)
  publish_opts = opts.dup
  to_queue = publish_opts.delete(:to_queue)
  publish_opts[:routing_key] ||= to_queue
  return unless publish_opts[:routing_key]

  payload = Sneakers::ContentType.serialize(msg, publish_opts[:content_type])
  @queue.exchange.publish(payload, publish_opts)
end
reject!()
click to toggle source
# File lib/sneakers/worker.rb, line 34
# Result marker for a message that should be rejected.
#
# @return [Symbol] always +:reject+
def reject!
  :reject
end
requeue!()
click to toggle source
# File lib/sneakers/worker.rb, line 35
# Result marker for a message that should be rejected and requeued.
#
# @return [Symbol] always +:requeue+
def requeue!
  :requeue
end
run()
click to toggle source
# File lib/sneakers/worker.rb, line 111
# Subscribes this worker to its queue, tracing before and after the
# subscription is made.
def run
  worker_trace("New worker: subscribing.")
  @queue.subscribe(self)
  worker_trace("New worker: I'm alive.")
end
stop()
click to toggle source
# File lib/sneakers/worker.rb, line 102
# Stops the worker: unsubscribes from the queue first, then shuts the pool
# down and waits for it to terminate. Each phase is traced.
def stop
  worker_trace("Stopping worker: unsubscribing.")
  @queue.unsubscribe

  worker_trace("Stopping worker: shutting down thread pool.")
  @pool.shutdown
  @pool.wait_for_termination

  worker_trace("Stopping worker: I'm gone.")
end
worker_trace(msg)
click to toggle source
# File lib/sneakers/worker.rb, line 122
# Writes +msg+ to the logger at debug level, prefixed via #log_msg.
#
# @param msg [Object] text to trace
def worker_trace(msg)
  sink = logger
  sink.debug(log_msg(msg))
end