module Nsqcd::Worker

Constants

Classes

Attributes

channel[R]
id[R]
opts[R]
topic[R]

Public Class Methods

included(base) click to toggle source
# File lib/nsqcd/worker.rb, line 75
# Hook run when this module is included into +base+.
#
# Extends +base+ with ClassMethods and, when +base+ is a Class (as opposed
# to a plain Module), appends it to the Classes registry.
#
# Returns Classes when +base+ was registered, nil otherwise.
def self.included(base)
  base.extend(ClassMethods)
  return unless base.is_a?(Class)

  Classes << base
end
new(pool = nil, opts = {}) click to toggle source
# File lib/nsqcd/worker.rb, line 11
# Builds a worker, resolving its effective options and its thread pool.
#
# Options are layered: +opts+ is overlaid with the class-level opts (which
# win on conflicting keys), and that result is overlaid on Nsqcd::CONFIG.
#
# pool:: pool to run on; when nil (or false) a Concurrent::FixedThreadPool
#        is created, sized by the merged :threads option or, failing that,
#        Nsqcd::Configuration::DEFAULTS[:threads].
# opts:: per-instance options hash.
#
# NOTE(review): the banner below is written to stdout on every
# instantiation; consider whether it should go through the logger instead.
def initialize(pool = nil, opts = {})
  class_level = self.class.opts || {}
  merged = Nsqcd::CONFIG.merge(opts.merge(class_level))

  @pool = pool
  @pool ||= Concurrent::FixedThreadPool.new(merged[:threads] || Nsqcd::Configuration::DEFAULTS[:threads])
  # Remember whether the including class defines the work_with_params variant.
  @call_with_params = respond_to?(:work_with_params)
  @opts = merged

  banner = '=================='
  puts banner
  puts "#{self.class.name} #{@opts.inspect}"
  puts banner

  @id = Utils.make_worker_id(self.class.name)
end

Public Instance Methods

log_msg(msg) click to toggle source

Constructs a log message prefixed with this worker's id, the current thread, and the worker's class name.

# File lib/nsqcd/worker.rb, line 65
# Formats +msg+ for logging, prefixed with the worker id, the current
# thread and the worker's class name, each wrapped in square brackets.
#
# Returns the formatted String.
def log_msg(msg)
  prefix = [@id, Thread.current, self.class.name].map { |part| "[#{part}]" }.join
  "#{prefix} #{msg}"
end
process_work(msg) click to toggle source
# File lib/nsqcd/worker.rb, line 49
# Hands +msg+ to +work+, reporting any failure through +worker_error+.
#
# Returns the result of +work+, or the result of +worker_error+ when
# +work+ raises a StandardError or ScriptError.
def process_work(msg)
  work(msg)
rescue StandardError, ScriptError => failure
  worker_error(
    failure,
    log_msg: log_msg(msg),
    class: self.class.name,
    message: msg
  )
end
publish(msg, opts) click to toggle source
# File lib/nsqcd/worker.rb, line 43
# Publishes +msg+ to an NSQ topic through a freshly created producer.
#
# msg::  payload handed to Nsq::Producer#write.
# opts:: Hash read for :topic (destination topic) and :nsqlookupd
#        (lookupd address(es) used by the producer). The hash is only
#        read; it is not modified.
#
# Returns whatever Nsq::Producer#write returns.
#
# NOTE(review): a new producer is created on every call and is never
# terminated here; confirm whether callers expect it to be cleaned up.
def publish(msg, opts)
  # Read rather than delete: removing :topic would alter the caller's hash.
  topic = opts[:topic]
  # Nsq::Producer is configured through a single options hash.
  producer = Nsq::Producer.new(nsqlookupd: opts[:nsqlookupd], topic: topic)
  producer.write(msg)
end
reject!() click to toggle source
# File lib/nsqcd/worker.rb, line 26
# Returns the +:reject+ marker symbol.
def reject!
  :reject
end
requeue!() click to toggle source
# File lib/nsqcd/worker.rb, line 27
# Returns the +:requeue+ marker symbol.
def requeue!
  :requeue
end
run() click to toggle source
# File lib/nsqcd/worker.rb, line 29
# Starts consuming messages for this worker on its thread pool.
#
# Builds an Nsq::Consumer from @opts and posts one long-running job to
# @pool. The job repeatedly pops a message, hands it to +process_work+ and
# then settles it based on the outcome:
#
# * +:requeue+ (the value returned by +requeue!+) requeues the message so
#   it is delivered again instead of being dropped;
# * any other outcome finishes the message.
#
# Returns whatever @pool.post returns.
#
# NOTE(review): the loop has no exit condition in this method; confirm how
# it is expected to end when the pool is shut down.
def run
  worker_trace "New worker: #{self.class} running."
  consumer = Nsq::Consumer.new(@opts)
  @pool.post do
    loop do
      msg = consumer.pop

      worker_trace "Working off: #{msg.data.inspect} #{msg.body}"
      outcome = process_work(msg)

      # Honor an explicit requeue request instead of always finishing.
      if outcome == :requeue
        msg.requeue
      else
        msg.finish
      end
    end
  end
end
stop() click to toggle source
# File lib/nsqcd/worker.rb, line 57
# Shuts the worker's thread pool down and waits for it to terminate,
# tracing a message before and after.
#
# Returns the result of the final +worker_trace+ call.
def stop
  worker_trace("Stopping worker: shutting down thread pool.")

  pool = @pool
  pool.shutdown
  pool.wait_for_termination

  worker_trace("Stopping worker: I'm gone.")
end
worker_trace(msg) click to toggle source
# File lib/nsqcd/worker.rb, line 69
# Writes +msg+ to the logger at debug level after formatting it with
# +log_msg+.
#
# Returns whatever the logger's +debug+ call returns.
def worker_trace(msg)
  sink = logger
  sink.debug(log_msg(msg))
end