class Smith::Commands::Firehose

Public Instance Methods

correct_direction?() click to toggle source
# File lib/smith/commands/smithctl/firehose.rb, line 38
def correct_direction?
  options[:direction] == 'deliver' || options[:direction] == 'publish'
end
execute() click to toggle source
# File lib/smith/commands/smithctl/firehose.rb, line 5
def execute
  queue_name = target.first || '#'
  AMQP::Channel.new(Smith.connection) do |channel,ok|
    channel.topic('amq.rabbitmq.trace', :durable => true) do |exchange|
      channel.queue('smith.firehose', :durable => true) do |queue|
        if correct_direction?
          routing_key = "#{options[:direction]}.#{Smith.config.smith.namespace}.#{queue_name}"

          queue.bind(exchange, :routing_key => routing_key).subscribe do |m, payload|
            acl_type_cache = AclTypeCache.instance
            clazz = acl_type_cache.get_by_hash(m.headers['properties']['type'])
            message = {options[:direction] => clazz.new.parse_from_string(payload)}
            puts (options[:json_given]) ? message.to_json : message.inspect
          end
        else
          responder.succeed("--direction must be either deliver or publish")
        end
      end
    end
  end
end
options_spec() click to toggle source
# File lib/smith/commands/smithctl/firehose.rb, line 27
    def options_spec
      banner %(Listens on the rabbitmq firehose for the named queue and prints decoded
message to stdout.

Be warned it can produce vast amounts of outpu. You _must_ run 'rabbitmqctl
trace_on' for this to work.), "<queue>"

      opt    :json,       "return the JSON representation of the message", :short => :j
      opt    :direction,  "Show messages that are leaving the broker [deliver|publish]", :short => :d, :type => :string, :default => 'deliver'
    end