module Nsqcd::Worker
Constants
- Classes
Attributes
channel[R]
id[R]
opts[R]
topic[R]
Public Class Methods
included(base)
click to toggle source
# File lib/nsqcd/worker.rb, line 75
# Hook that Ruby invokes whenever this module is included into +base+.
#
# Extends +base+ with ClassMethods and, when +base+ is a Class, records it in
# Classes. Ruby calls this hook on every `include`, including a repeated one
# for the same class, so the registration is skipped when +base+ is already
# listed to keep Classes free of duplicates.
#
# @param base [Module] the class or module that is including this module
def self.included(base)
  base.extend ClassMethods
  return unless base.is_a?(Class)

  Classes << base unless Classes.include?(base)
end
new(pool = nil, opts = {})
click to toggle source
# File lib/nsqcd/worker.rb, line 11
# Builds a worker, resolving its options and its thread pool.
#
# Option precedence (highest first): the class-level +opts+, the +opts+
# argument, then Nsqcd::CONFIG.
#
# @param pool [Object, nil] pool to run on; when nil a
#   Concurrent::FixedThreadPool is created, sized by the resolved :threads
#   option or, failing that, Nsqcd::Configuration::DEFAULTS[:threads]
# @param opts [Hash] per-instance options
def initialize(pool = nil, opts = {})
  class_level = self.class.opts || {}
  resolved = Nsqcd::CONFIG.merge(opts.merge(class_level))

  @pool = pool || begin
    thread_count = resolved[:threads] || Nsqcd::Configuration::DEFAULTS[:threads]
    Concurrent::FixedThreadPool.new(thread_count)
  end
  # Remember whether the including class defines work_with_params.
  @call_with_params = respond_to?(:work_with_params)
  @opts = resolved

  # NOTE(review): this banner goes straight to stdout on every construction;
  # it looks like leftover debugging output -- confirm before relying on it.
  banner = '=================='
  puts banner
  puts "#{self.class.name} #{@opts.inspect}"
  puts banner

  @id = Utils.make_worker_id(self.class.name)
end
Public Instance Methods
log_msg(msg)
click to toggle source
Builds a log message prefixed with this worker's id, the current thread, and the worker's class name.
# File lib/nsqcd/worker.rb, line 65
# Formats +msg+ for logging by prepending three bracketed tags: the worker
# id, the current thread, and the worker's class name.
#
# @param msg [#to_s] the text to log
# @return [String] the tagged message
def log_msg(msg)
  tags = [@id, Thread.current, self.class.name]
  prefix = tags.map { |tag| "[#{tag}]" }.join
  "#{prefix} #{msg}"
end
process_work(msg)
click to toggle source
# File lib/nsqcd/worker.rb, line 49
# Runs +work+ on +msg+, handing any StandardError or ScriptError to
# +worker_error+ instead of letting it propagate.
#
# @param msg [Object] the message passed through to +work+
# @return [Object] whatever +work+ returns, or the result of +worker_error+
#   when +work+ raised
def process_work(msg)
  work(msg)
rescue StandardError, ScriptError => error
  worker_error(
    error,
    log_msg: log_msg(msg),
    class: self.class.name,
    message: msg
  )
end
publish(msg, opts)
click to toggle source
# File lib/nsqcd/worker.rb, line 43
# Publishes +msg+ to the topic named in +opts+ through a new Nsq::Producer.
#
# +opts+ is only read: the previous implementation removed :topic from the
# caller's hash as a side effect. The producer is built from a single options
# hash; this assumes Nsq::Producer is nsq-ruby's producer, whose constructor
# takes one hash rather than positional arguments -- TODO confirm.
#
# NOTE(review): a new producer is created per call and never terminated here.
#
# @param msg [String] the message body to write
# @param opts [Hash] must provide :topic and :nsqlookupd
# @return [Object] whatever Nsq::Producer#write returns
def publish(msg, opts)
  producer = Nsq::Producer.new(nsqlookupd: opts[:nsqlookupd], topic: opts[:topic])
  producer.write(msg)
end
reject!()
click to toggle source
# File lib/nsqcd/worker.rb, line 26
# Outcome marker a worker can return.
#
# @return [Symbol] always :reject
def reject!
  :reject
end
requeue!()
click to toggle source
# File lib/nsqcd/worker.rb, line 27
# Outcome marker a worker can return.
#
# @return [Symbol] always :requeue
def requeue!
  :requeue
end
run()
click to toggle source
# File lib/nsqcd/worker.rb, line 29
# Starts consuming: builds an Nsq::Consumer from @opts and posts one task to
# the pool that pops messages and processes them in a loop.
#
# The outcome of +process_work+ now decides how the message is settled: when
# it is :requeue (the value returned by +requeue!+) the message is requeued;
# every other outcome finishes the message. Previously the outcome was
# discarded and every message was finished.
#
# NOTE(review): assumes the popped message responds to +requeue+ as nsq-ruby
# messages do -- confirm. The loop has no exit condition visible here.
#
# @return [Object] whatever @pool.post returns
def run
  worker_trace "New worker: #{self.class} running."
  consumer = Nsq::Consumer.new(@opts)

  @pool.post do
    loop do
      msg = consumer.pop
      worker_trace "Working off: #{msg.data.inspect} #{msg.body}"
      outcome = process_work(msg)

      if outcome == :requeue
        msg.requeue
      else
        msg.finish
      end
    end
  end
end
stop()
click to toggle source
# File lib/nsqcd/worker.rb, line 57
# Shuts the worker's pool down and blocks until the pool reports
# termination, tracing before and after.
#
# @return [Object] the result of the final +worker_trace+ call
def stop
  worker_trace "Stopping worker: shutting down thread pool."

  pool = @pool
  pool.shutdown
  pool.wait_for_termination

  worker_trace "Stopping worker: I'm gone."
end
worker_trace(msg)
click to toggle source
# File lib/nsqcd/worker.rb, line 69
# Writes +msg+ to the logger at debug level, decorated by +log_msg+.
#
# @param msg [#to_s] the text to trace
# @return [Object] whatever the logger's +debug+ returns
def worker_trace(msg)
  sink = logger
  sink.debug(log_msg(msg))
end