module RabbitWQ::Work
Constants
- YAML_MIMETYPE
Public Class Methods
config()
click to toggle source
# File lib/rabbit_wq/work.rb, line 8
#
# Returns the current RabbitWQ configuration. When none has been assigned
# yet, one is loaded from RabbitWQ::DEFAULT_CONFIG_PATH, stored on RabbitWQ
# and returned.
def self.config
  existing = RabbitWQ.configuration
  return existing if existing

  RabbitWQ.configuration = RabbitWQ::Configuration.load( RabbitWQ::DEFAULT_CONFIG_PATH )
end
enqueue( worker, options={} )
click to toggle source
# File lib/rabbit_wq/work.rb, line 12
#
# Serializes +worker+ with #to_yaml and hands the resulting payload, along
# with +options+, to enqueue_payload.
def self.enqueue( worker, options={} )
  enqueue_payload( worker.to_yaml, options )
end
enqueue_error_payload( payload, options={} )
click to toggle source
# File lib/rabbit_wq/work.rb, line 50
#
# Publishes +payload+ to the durable queue named by config.error_queue,
# tagging it with the YAML content type and passing +options+ through as
# the message headers.
def self.enqueue_error_payload( payload, options={} )
  with_channel do |ch|
    ch.queue( config.error_queue, durable: true ).
      publish( payload, durable: true, content_type: YAML_MIMETYPE, headers: options )
  end
end
enqueue_payload( payload, options={} )
click to toggle source
# File lib/rabbit_wq/work.rb, line 17
#
# Publishes an already-serialized +payload+, either immediately or after a
# delay.
#
# payload - the message body; published with the YAML content type.
# options - Hash sent as the message headers. The optional :delay entry is
#           consumed here and is not forwarded as a header. The caller's
#           hash is left untouched.
#
# A :delay below 5000 is ignored and the payload is published right away.
# Otherwise the payload is published to a per-delay direct exchange bound
# to a queue whose "x-message-ttl" is the delay and whose
# "x-dead-letter-exchange" is the work exchange.
#
# Returns nil on the delayed path, otherwise whatever with_work_exchange
# returns.
def self.enqueue_payload( payload, options={} )
  # Work on a copy: deleting :delay from the caller's hash would silently
  # strip the delay from a reused options hash and raise on a frozen one.
  headers = options.dup
  delay = headers.delete( :delay )
  delay = nil if delay && delay < 5000

  # NOTE(review): `durable:` is passed to #publish below exactly as before;
  # Bunny documents `persistent:` for message persistence -- confirm intent.
  if delay
    with_channel do |channel|
      delay_x = channel.direct( "#{config.delayed_exchange_prefix}-#{delay}ms", durable: true )
      work_x = channel.send( config.work_exchange_type, config.work_exchange, durable: true )

      channel.queue( "#{config.delayed_queue_prefix}-#{delay}ms",
                     durable: true,
                     arguments: { "x-dead-letter-exchange" => work_x.name,
                                  "x-message-ttl" => delay } ).
        bind( delay_x )

      delay_x.publish( payload, durable: true, content_type: YAML_MIMETYPE, headers: headers )
    end

    return
  end

  with_work_exchange do |_work_x, work_pub_q, _work_sub_q|
    work_pub_q.publish( payload, durable: true, content_type: YAML_MIMETYPE, headers: headers )
  end
end
with_channel() { |c| ... }
click to toggle source
# File lib/rabbit_wq/work.rb, line 76
#
# Opens a new Bunny session, starts it, creates a channel and yields that
# channel to the block. The session is stopped once the block finishes,
# whether it returns normally or raises. Returns the Bunny session object.
def self.with_channel
  session = Bunny.new
  session.start

  begin
    yield session.create_channel
  ensure
    session.stop
  end

  session
end
with_work_exchange() { |exchange, work_pub_q, work_sub_q| ... }
click to toggle source
# File lib/rabbit_wq/work.rb, line 59
#
# Declares the durable work exchange (its type is taken from
# config.work_exchange_type) together with the durable publish and
# subscribe queues, binds the subscribe queue to the exchange, and yields
# the exchange, the publish queue and the subscribe queue to the block.
def self.with_work_exchange
  with_channel do |ch|
    work_x = ch.send( config.work_exchange_type, config.work_exchange, durable: true )
    publish_q = ch.queue( config.work_publish_queue, durable: true )
    subscribe_q = ch.queue( config.work_subscribe_queue, durable: true )
    subscribe_q.bind( work_x )

    yield work_x, publish_q, subscribe_q
  end
end