module RabbitWQ::Work

Constants

YAML_MIMETYPE

Public Class Methods

config() click to toggle source
# File lib/rabbit_wq/work.rb, line 8
# Returns the active RabbitWQ configuration.
#
# When RabbitWQ.configuration is already set it is returned untouched;
# otherwise the configuration is loaded from RabbitWQ::DEFAULT_CONFIG_PATH,
# stored on RabbitWQ and returned.
def self.config
  current = RabbitWQ.configuration
  return current if current

  RabbitWQ.configuration = RabbitWQ::Configuration.load( RabbitWQ::DEFAULT_CONFIG_PATH )
end
enqueue( worker, options={} ) click to toggle source
# File lib/rabbit_wq/work.rb, line 12
# Serializes +worker+ with #to_yaml and hands the resulting string, together
# with +options+, to enqueue_payload. Returns whatever enqueue_payload returns.
def self.enqueue( worker, options={} )
  enqueue_payload( worker.to_yaml, options )
end
enqueue_error_payload( payload, options={} ) click to toggle source
# File lib/rabbit_wq/work.rb, line 50
# Publishes an already-serialized +payload+ to the configured error queue.
#
# The queue named by config.error_queue is declared as durable on a channel
# obtained from with_channel, and the payload is published to it with the
# YAML content type and +options+ as the message headers.
# Returns the value of with_channel.
def self.enqueue_error_payload( payload, options={} )
  with_channel do |channel|
    channel.queue( config.error_queue, durable: true ).
            publish( payload, durable: true,
                              content_type: YAML_MIMETYPE,
                              headers: options )
  end
end
enqueue_payload( payload, options={} ) click to toggle source
# File lib/rabbit_wq/work.rb, line 17
# Publishes an already-serialized +payload+, either immediately or after a
# delay.
#
# payload - the message body to publish.
# options - Hash of message headers. A :delay entry, if present, is not sent
#           as a header; it selects the delayed path instead. The caller's
#           hash is left unmodified.
#
# A :delay below 5000 is ignored and the payload is published immediately to
# the work publish queue. Otherwise the payload goes to a per-delay direct
# exchange bound to a per-delay queue whose "x-message-ttl" is the delay and
# whose "x-dead-letter-exchange" is the work exchange.
#
# Returns nil on the delayed path, otherwise the value of with_work_exchange.
def self.enqueue_payload( payload, options={} )
  # Work on a copy: deleting :delay from the caller's hash would make a
  # second call with the same hash (e.g. a retry) lose its delay.
  headers = options.dup
  delay = headers.delete( :delay )
  delay = nil if delay && delay < 5000

  if delay
    with_channel do |channel|
      delay_x = channel.direct( "#{config.delayed_exchange_prefix}-#{delay}ms", durable: true )

      work_x = channel.send( config.work_exchange_type,
                             config.work_exchange,
                             durable: true )

      # Messages sit in this queue until the TTL expires, then dead-letter
      # into the work exchange.
      channel.queue( "#{config.delayed_queue_prefix}-#{delay}ms",
                     durable: true,
                     arguments: { "x-dead-letter-exchange" => work_x.name,
                                  "x-message-ttl" => delay } ).
              bind( delay_x )

      delay_x.publish( payload, durable: true,
                                content_type: YAML_MIMETYPE,
                                headers: headers )
    end

    return
  end

  with_work_exchange do |_work_x, work_pub_q, _work_sub_q|
    work_pub_q.publish( payload, durable: true,
                                 content_type: YAML_MIMETYPE,
                                 headers: headers )
  end
end
with_channel() { |c| ... } click to toggle source
# File lib/rabbit_wq/work.rb, line 76
# Opens a Bunny connection, yields a freshly created channel to the block,
# and stops the connection afterwards, even if the block raises.
#
# Returns the Bunny connection object (already stopped), not the block's
# value.
def self.with_channel
  connection = Bunny.new
  connection.start

  begin
    channel = connection.create_channel
    yield channel
  ensure
    connection.stop
  end

  connection
end
with_work_exchange() { |exchange, work_pub_q, work_sub_q| ... } click to toggle source
# File lib/rabbit_wq/work.rb, line 59
# Declares the work exchange and its publish/subscribe queues on a channel
# from with_channel, binds the subscribe queue to the exchange, and yields
# the exchange, the publish queue and the subscribe queue (in that order).
#
# The exchange is declared by calling the channel method named by
# config.work_exchange_type. Returns the value of with_channel.
def self.with_work_exchange
  with_channel do |channel|
    work_x = channel.send( config.work_exchange_type,
                           config.work_exchange,
                           durable: true )

    pub_q = channel.queue( config.work_publish_queue, durable: true )
    sub_q = channel.queue( config.work_subscribe_queue, durable: true )
    sub_q.bind( work_x )

    yield work_x, pub_q, sub_q
  end
end