class Fluffle::Confirmer

Attributes

channel[R]

Public Class Methods

new(channel:) click to toggle source
# File lib/fluffle/confirmer.rb, line 5
# Creates a confirmer for the given channel.
#
# channel - the channel object this confirmer stores and later calls
#           `confirm_select` / `next_publish_seq_no` on.
def initialize(channel:)
  # Confirms that are still outstanding; entries are added by
  # `with_confirmation` and removed by the confirm callback.
  @pending_confirms = Concurrent::Map.new
  @channel = channel
end

Public Instance Methods

confirm_select() click to toggle source

Enables confirms on the channel and sets up a callback that receives each confirm and unblocks the corresponding `with_confirmation` call.

# File lib/fluffle/confirmer.rb, line 13
# Puts the channel into confirmation mode and registers a callback that
# resolves the pending IVar(s) for each confirm received.
#
# The callback receives `(tag, multiple, nack)`. When `multiple` is truthy the
# confirm covers every outstanding tag up to and including `tag` (per the
# RabbitMQ publisher-confirms protocol), so all of those pending entries are
# resolved with the same `nack` value; otherwise only `tag` itself is.
#
# Returns whatever `@channel.confirm_select` returns.
def confirm_select
  handle_confirm = ->(tag, multiple, nack) do
    tags =
      if multiple
        @pending_confirms.keys.select { |pending_tag| pending_tag <= tag }
      else
        [tag]
      end

    # `delete` returns nil for a tag that is no longer pending (e.g. it was
    # removed concurrently), so only IVars this callback actually removed
    # are set, and each is set exactly once.
    ivars = tags.map { |pending_tag| @pending_confirms.delete pending_tag }.compact

    if ivars.empty?
      self.logger.error "Missing confirm IVar: tag=#{tag}"
    else
      ivars.each { |ivar| ivar.set nack }
    end
  end

  # Set the channel in confirmation mode so that we can receive confirms
  # of published messages
  @channel.confirm_select handle_confirm
end
with_confirmation(timeout:) { || ... } click to toggle source

Wraps a block (which should publish a message) with a blocking check that waits for confirmation from the RabbitMQ server that the message was received and routed successfully.

# File lib/fluffle/confirmer.rb, line 32
# Runs the given block (which should publish a message) and then blocks until
# the confirm for that publish arrives or `timeout` elapses.
#
# timeout - maximum wait, passed straight to `Concurrent::IVar#value`.
#
# Returns the block's return value when the publish was acked.
# Raises Errors::ConfirmTimeoutError if no confirm arrived within `timeout`.
# Raises Errors::NackError if the confirm was a nack.
def with_confirmation(timeout:)
  tag = @channel.next_publish_seq_no
  confirm_ivar = Concurrent::IVar.new
  @pending_confirms[tag] = confirm_ivar

  begin
    result = yield

    # Block until the confirm callback sets the IVar or the timeout elapses.
    confirm_ivar.value timeout

    if confirm_ivar.incomplete?
      raise Errors::ConfirmTimeoutError, 'Timed out waiting for confirm'
    end

    # Read the value only after completion is established: a confirm landing
    # right after the timed wait gave up must not be mistaken for an ack.
    raise Errors::NackError, 'Received nack from confirmation' if confirm_ivar.value

    result
  ensure
    # Drop our entry if it is still registered (timeout, or the block raised)
    # so it does not linger in the map. `delete_pair` removes it only when the
    # stored value is still this call's IVar, leaving any newer entry alone.
    @pending_confirms.delete_pair tag, confirm_ivar
  end
end