class Msgr::Channel

Constants

EXCHANGE_NAME

Attributes

channel[R]
config[R]

Public Class Methods

new(config, connection) click to toggle source
# File lib/msgr/channel.rb, line 11
# Builds a channel wrapper around a freshly created channel.
#
# @param config [#[]] configuration object; stored as-is and exposed
#   through the +config+ reader
# @param connection [#create_channel] connection used to open the
#   underlying channel (called exactly once here)
def initialize(config, connection)
  @config = config
  @channel = connection.create_channel
end

Public Instance Methods

ack(delivery_tag) click to toggle source
# File lib/msgr/channel.rb, line 48
# Acknowledges a delivery on the wrapped channel and records the
# acknowledgement at debug level.
#
# @param delivery_tag [Object] tag identifying the delivery; forwarded
#   unchanged to the channel's +ack+
# @return [Object] whatever +log+ returns
def ack(delivery_tag)
  @channel.ack(delivery_tag)
  log(:debug) { "Acked message: #{delivery_tag}" }
end
close() click to toggle source
# File lib/msgr/channel.rb, line 63
# Closes the wrapped channel, but only if it still reports itself open.
#
# @return [Object, nil] the result of the channel's +close+, or +nil+
#   when the channel was not open and nothing was done
def close
  return unless @channel.open?

  @channel.close
end
exchange() click to toggle source
# File lib/msgr/channel.rb, line 20
# Returns the topic exchange for this channel, declaring it on first use.
#
# The exchange name is EXCHANGE_NAME passed through +prefix+, and it is
# declared with +durable: true+. The result is memoised in +@exchange+,
# so the declaration and its debug log line happen only once as long as
# the stored value is truthy.
#
# @return [Object] the exchange object returned by the channel's +topic+
def exchange
  return @exchange if @exchange

  topic = @channel.topic(prefix(EXCHANGE_NAME), durable: true)
  log(:debug) do
    "Created exchange #{topic.name} (type: #{topic.type}, " \
      "durable: #{topic.durable?}, auto_delete: #{topic.auto_delete?})"
  end
  @exchange = topic
end
nack(delivery_tag) click to toggle source
# File lib/msgr/channel.rb, line 53
# Negatively acknowledges a delivery on the wrapped channel and records
# it at debug level.
#
# @param delivery_tag [Object] tag identifying the delivery
# @return [Object] whatever +log+ returns
def nack(delivery_tag)
  # NOTE(review): the two positional flags are presumably
  # (multiple = false, requeue = true) as in Bunny::Channel#nack -
  # confirm against the channel implementation in use.
  @channel.nack(delivery_tag, false, true)
  log(:debug) { "Nacked message: #{delivery_tag}" }
end
prefetch(count) click to toggle source
# File lib/msgr/channel.rb, line 16
# Forwards a prefetch setting to the wrapped channel.
#
# @param count [Object] value handed unchanged to the channel's +prefetch+
# @return [Object] whatever the channel's +prefetch+ returns
def prefetch(count)
  @channel.prefetch(count)
end
prefix(name) click to toggle source
# File lib/msgr/channel.rb, line 40
# Applies the configured prefix, if any, to a name.
#
# When +config[:prefix]+ is present the result is "<prefix>-<name>";
# otherwise +name+ is returned untouched (not converted to a String).
#
# @param name [Object] base name
# @return [String, Object] the prefixed name, or +name+ itself
def prefix(name)
  namespace = config[:prefix]
  return name unless namespace.present?

  "#{namespace}-#{name}"
end
queue(name, **opts) click to toggle source
# File lib/msgr/channel.rb, line 31
# Declares a queue on the wrapped channel and logs it at debug level.
#
# The queue name is +name+ passed through +prefix+. +durable: true+ is
# supplied first and +opts+ are splatted after it, so a +:durable+ key in
# +opts+ takes precedence.
#
# @param name [Object] base queue name (before prefixing)
# @param opts [Hash] extra keyword options forwarded to the channel's +queue+
# @return [Object] the queue object returned by the channel
def queue(name, **opts)
  declared = @channel.queue(prefix(name), durable: true, **opts)
  log(:debug) do
    "Create queue #{declared.name} (durable: #{declared.durable?}, " \
      "auto_delete: #{declared.auto_delete?})"
  end
  declared
end
reject(delivery_tag, requeue = true) click to toggle source
# File lib/msgr/channel.rb, line 58
# Rejects a delivery on the wrapped channel and records it at debug level.
#
# @param delivery_tag [Object] tag identifying the delivery
# @param requeue [Boolean] forwarded as the second argument to the
#   channel's +reject+; defaults to +true+
# @return [Object] whatever +log+ returns
def reject(delivery_tag, requeue = true)
  @channel.reject(delivery_tag, requeue)
  log(:debug) { "Rejected message: #{delivery_tag}" }
end