class SongkickQueue::Worker

Attributes

client[R]
consumer_classes[R]
process_name[R]

Public Class Methods

new(process_name, consumer_classes = []) click to toggle source

@param process_name [String] the custom process name to use
@param consumer_classes [Array<Class>, Class] the consumer class, or array of consumer classes, to run

# File lib/songkick_queue/worker.rb, line 7
# Build a worker for one or more consumer classes.
#
# @param process_name [String] custom process name to use
# @param consumer_classes [Array<Class>, Class] consumer class or classes;
#   a single class is wrapped into an array
# @raise [ArgumentError] when no consumer classes are given
def initialize(process_name, consumer_classes = [])
  @process_name = process_name
  @consumer_classes = Array(consumer_classes)

  # A worker without consumers has nothing to subscribe to, so refuse early.
  raise ArgumentError, 'no consumer classes given to Worker' if @consumer_classes.empty?

  @client = Client.new
end

Public Instance Methods

run() click to toggle source

Subscribes the consumer classes to their defined message queues and blocks until all the work pool consumers have finished. Also sets up signal catching for graceful exits on interrupt.

# File lib/songkick_queue/worker.rb, line 21
# Entry point of the worker: marks the process as idle, subscribes every
# consumer class to its queue, installs the signal handling and then blocks
# on the channel's work pool until it has been joined.
#
# @return whatever `channel.work_pool.join` returns
def run
  set_process_name

  consumer_classes.each { |consumer_class| subscribe_to_queue(consumer_class) }

  # Handlers only record the signal; the watcher thread acts on it.
  setup_signal_catching
  stop_if_signal_caught

  channel.work_pool.join
end

Private Instance Methods

channel() click to toggle source
# File lib/songkick_queue/worker.rb, line 113
# Channel exposed by this worker's client. The other worker methods go through
# it to declare queues, ack or reject deliveries and reach the work pool.
def channel
  client.channel
end
config() click to toggle source
# File lib/songkick_queue/worker.rb, line 121
# Library-wide configuration object, as returned by SongkickQueue.configuration.
# The worker reads its logger and the requeue-on-reject setting from it.
def config
  SongkickQueue.configuration
end
logger() click to toggle source
# File lib/songkick_queue/worker.rb, line 117
# Logger taken from the configuration; used for all worker log output and
# also handed to each consumer instance.
def logger
  config.logger
end
process_message(consumer_class, delivery_info, properties, message) click to toggle source

Handle receipt of a subscribed message

@param consumer_class [Class] that was subscribed to
@param delivery_info [Bunny::DeliveryInfo]
@param properties [Bunny::MessageProperties]
@param message [String] to deserialize

# File lib/songkick_queue/worker.rb, line 83
# Handle receipt of a subscribed message: decode the JSON envelope, hand its
# payload to a fresh consumer instance, then ack the delivery on success or
# reject it on any failure. The process name is reset to idle either way.
#
# @param consumer_class [Class] that was subscribed to
# @param delivery_info [Bunny::DeliveryInfo] supplies the delivery tag for ack/reject
# @param properties [Bunny::MessageProperties] accepted but not read here
# @param message [String] JSON document with message_id, produced_at and payload keys
def process_message(consumer_class, delivery_info, properties, message)
  envelope = JSON.parse(message, symbolize_names: true)

  # fetch (not []) so that a missing key raises and the message gets rejected below
  message_id, produced_at, payload =
    %i[message_id produced_at payload].map { |key| envelope.fetch(key) }

  logger.info "Processing message #{message_id} via #{consumer_class}, produced at #{produced_at}"
  set_process_name(consumer_class, message_id)

  consumer = consumer_class.new(delivery_info, logger)

  instrumentation_payload = {
    consumer_class: consumer_class.to_s,
    queue_name: consumer_class.queue_name,
    message_id: message_id,
    produced_at: produced_at
  }
  ActiveSupport::Notifications.instrument('consume_message.songkick_queue', instrumentation_payload) do
    consumer.process(payload)
  end
rescue Object => exception
  # Rescuing Object catches every raised exception, not just StandardError,
  # so any failure (including a decoding error) ends in a reject.
  logger.error(exception)
  channel.reject(delivery_info.delivery_tag, config.requeue_rejected_messages)
else
  # Ack only this delivery (second argument false).
  channel.ack(delivery_info.delivery_tag, false)
ensure
  set_process_name
end
set_process_name(status = 'idle', message_id = nil) click to toggle source

Update the name of this process, as viewed in `ps` or `top`

@example idle

set_process_name #=> "songkick_queue[idle]"

@example consumer running, namespace is removed

set_process_name(Foo::TweetConsumer, 'a729bcd8') #=> "songkick_queue[TweetConsumer#a729bcd8]"

@param status [String] of the program
@param message_id [String] identifying the message currently being consumed

# File lib/songkick_queue/worker.rb, line 133
# Update the name of this process ($PROGRAM_NAME) to
# "<process_name>[<status>]" or "<process_name>[<status>#<message_id>]".
#
# @param status [#to_s] state to display; anything before the last "::" is dropped,
#   so a namespaced class shows up by its bare name
# @param message_id [String, nil] id of the message being consumed, omitted when nil
# @return [String] the new process name
def set_process_name(status = 'idle', message_id = nil)
  # Foo::TweetConsumer -> "TweetConsumer"
  short_status = String(status).split('::').last
  # compact drops a nil message_id so no dangling "#" is produced
  label = [short_status, message_id].compact.join('#')

  $PROGRAM_NAME = "#{process_name}[#{label}]"
end
setup_signal_catching() click to toggle source
# File lib/songkick_queue/worker.rb, line 38
# Install handlers for SIGINT and SIGTERM. Each handler only records the
# signal's name in @shutdown; the polling loop in stop_if_signal_caught is
# what reacts to that flag.
def setup_signal_catching
  Signal.trap('INT') { @shutdown = 'INT' }
  Signal.trap('TERM') { @shutdown = 'TERM' }
end
stop_if_signal_caught() click to toggle source

Checks for presence of @shutdown every 1 second and if found instructs all the channel's work pool consumers to shutdown. Each work pool thread will finish its current task and then join the main thread. Once all the threads have joined then `channel.work_pool.join` will cease blocking and return, causing the process to terminate.

# File lib/songkick_queue/worker.rb, line 48
# Start a background thread that checks @shutdown once per second. When the
# flag is set (by the signal handlers), it logs the signal and asks the
# channel's work pool to shut down, which lets `channel.work_pool.join` in
# `run` return.
#
# @return [Thread] the polling thread
def stop_if_signal_caught
  Thread.new do
    loop do
      sleep 1

      next unless @shutdown

      logger.info "Received SIG#{@shutdown}, shutting down consumers"

      # Use the same channel helper as the rest of the worker.
      channel.work_pool.shutdown
      # Clear the flag so the shutdown request is not repeated on the next tick.
      @shutdown = nil
    end
  end
end
subscribe_to_queue(consumer_class) click to toggle source

Declare a queue and subscribe to it

@param consumer_class [Class] to subscribe to

# File lib/songkick_queue/worker.rb, line 66
# Declare the consumer class's queue and subscribe to it, routing every
# delivery to process_message.
#
# @param consumer_class [Class] to subscribe; must respond to `queue_name`
def subscribe_to_queue(consumer_class)
  queue_options = { durable: true, arguments: { 'x-ha-policy' => 'all' } }
  queue = channel.queue(consumer_class.queue_name, **queue_options)

  # manual_ack: process_message acks or rejects each delivery itself
  queue.subscribe(manual_ack: true) do |info, props, body|
    process_message(consumer_class, info, props, body)
  end

  logger.info "Subscribed #{consumer_class} to #{consumer_class.queue_name}"
end