module Smith::Messaging::Util

Public Instance Methods

number_of_consumers() { |num_consumers| ... } click to toggle source
# File lib/smith/messaging/util.rb, line 12
def number_of_consumers
  status do |_, num_consumers|
    yield num_consumers
  end
end
number_of_messages() { |num_messages| ... } click to toggle source
# File lib/smith/messaging/util.rb, line 6
def number_of_messages
  status do |num_messages, _|
    yield num_messages
  end
end

Private Instance Methods

open_channel(opts={}, &blk) click to toggle source
# File lib/smith/messaging/util.rb, line 23
def open_channel(opts={}, &blk)
  AMQP::Channel.new(Smith.connection) do |channel,ok|
    logger.verbose { "Opened channel: #{"%x" % channel.object_id}" }

    # Set up auto-recovery. This will ensure that amqp will
    # automatically reconnet to the broker if there is an error.
    channel.auto_recovery = true
    logger.verbose { "Channel auto recovery set to ture" }

    # Set up QOS. If you do not do this then any subscribes will get
    # overwhelmed if there are too many messages.
    prefetch = opts[:prefetch] || Smith.config.agent.prefetch

    channel.prefetch(prefetch)
    logger.verbose { "AMQP prefetch set to: #{prefetch}" }

    blk.call(channel)
  end
end
option_or_default(options, key, default, &blk) click to toggle source
# File lib/smith/messaging/util.rb, line 47
def option_or_default(options, key, default, &blk)
  if options.is_a?(Hash)
    if options.key?(key)
      v = options.delete(key)
      (blk) ? blk.call(v) : v
    else
      default
    end
  else
    raise ArguementError, "Options must be a Hash."
  end
end
random(prefix = '', suffix = '') click to toggle source
# File lib/smith/messaging/util.rb, line 43
def random(prefix = '', suffix = '')
  "#{prefix}#{SecureRandom.hex(8)}#{suffix}"
end