class Smith::Commands::Push

Public Instance Methods

execute() click to toggle source
# File lib/smith/commands/smithctl/push.rb, line 4
# Run the push command and pass whatever it yields to the responder as the
# command's result.
def execute
  push { |outcome| responder.succeed(outcome) }
end

Private Instance Methods

iterator() click to toggle source

Return an iterator that can iterate over whatever the input is.

# File lib/smith/commands/smithctl/push.rb, line 60
# Build the object that supplies the messages to send.
#
# * --message given: an EM::Iterator over the message, repeated --number
#   times when --number was given, otherwise once.
# * --file given: a FileReader over the named file.
# * otherwise: a FileReader over standard input; --number is rejected here.
#
# Raises ArgumentError when --number is combined with standard input.
def iterator
  if options[:message_given]
    copies = options[:number_given] ? options[:number] : 1
    EM::Iterator.new([options[:message]] * copies)
  elsif options[:file_given]
    FileReader.new(options[:file])
  else
    if options[:number_given]
      raise ArgumentError, "--number option cannot be used when reading messages from standard in."
    end

    FileReader.new(STDIN)
  end
end
json_to_payload(data, type) click to toggle source
# File lib/smith/commands/smithctl/push.rb, line 76
# Build an ACL payload of the given type from a JSON document.
#
# data - JSON text; it is parsed with symbolized keys.
# type - the ACL type passed to ACL::Factory.create.
#
# Returns whatever ACL::Factory.create returns.
#
# Raises ACL::Error when building the payload fails with a NoMethodError.
# If the missing method is a setter, the error names the offending field;
# otherwise the original NoMethodError message is kept so the cause is not
# lost.
def json_to_payload(data, type)
  ACL::Factory.create(type, MultiJson.load(data, :symbolize_keys => true))
rescue NoMethodError => e
  # Ruby < 3.4 words this as "undefined method `name=' for ..." while
  # Ruby 3.4+ uses "undefined method 'name=' for ...": accept either
  # opening quote.
  m = /undefined method [`'](.*?)=' for/.match(e.message)
  if m
    raise ACL::Error, "Error, invalid field name: #{m[1]}"
  else
    raise ACL::Error, e.message
  end
end
options_spec() click to toggle source
# File lib/smith/commands/smithctl/push.rb, line 89
# Declare the banner, the command-line options and the option conflicts for
# the push command. The default for --timeout is read from
# Smith.config.smith.timeout each time this method runs.
def options_spec
  banner("Pushs an ACL on to a queue.", "<queue>")

  opt(:type, "message type", type: :string, required: true, short: :t)
  opt(:message, "the message, as json", type: :string, short: :m)
  opt(:file, "read messages from the named file", type: :string, short: :f)
  opt(:number, "the number of times to send the message", type: :integer, default: 1, short: :n)
  opt(:reply, "set a reply listener.", short: :r)
  opt(:timeout, "timeout when waiting for a reply",
      type: :integer, depends: :reply, default: Smith.config.smith.timeout)
  opt(:dynamic, "send message to a dynamic queue", type: :boolean, default: false, short: :d)
  opt(:ignore_errors, "continue to process input data if there is an error",
      type: :boolean, default: false)

  conflicts(:reply, :number, :file)
  conflicts(:message, :file)
end
push(&blk) click to toggle source
# File lib/smith/commands/smithctl/push.rb, line 12
# Publish the message(s) selected by the command-line options to the queue
# named by the first target, reporting the outcome through the given block.
#
# blk - called with a String (status or error text), or with the reply
#       payload converted via to_hash when --reply is in effect.
#
# When no target was supplied the block is called with an explanatory
# message and nothing is sent.
def push(&blk)
  if target.size == 0
    blk.call("No queue specified. Please specify a queue.")
  else
    Messaging::Sender.new(target.first, :auto_delete => options[:dynamic], :persistent => true, :nowait => false, :strict => true) do |sender|
      if options[:reply]
        push_with_reply(sender, &blk)
      else
        push_each_message(sender, &blk)
      end
    end
  end
end

# Helper for #push: publish the single --message payload and report either
# the reply or a timeout through blk. Payload construction errors are
# reported through blk as well.
def push_with_reply(sender, &blk)
  timeout = Smith::Messaging::Timeout.new(options[:timeout]) do |message_id|
    blk.call("Timed out after: #{options[:timeout]} seconds for message: #{options[:message]}: message_id: #{message_id}")
  end

  sender.on_reply(:timeout => timeout) { |payload| blk.call(payload.to_hash) }
  sender.publish(json_to_payload(options[:message], options[:type]))
rescue ACL::Error, MultiJson::DecodeError => e
  blk.call(e.message)
end

# Helper for #push: publish every message produced by #iterator, then call
# blk with an empty string once the iterator reports completion.
#
# A failing message is either logged and skipped (--ignore-errors) or
# reported through blk, in which case the iteration is not advanced.
def push_each_message(sender, &blk)
  on_work = ->(message, iter) do
    error_handler = ->(e) do
      if options[:ignore_errors]
        logger.error { "#{e} #{message.strip}" }
        iter.call
      else
        blk.call(e.message)
      end
    end

    # Registered per message so the handler closes over the current
    # message and iterator step.
    sender.on_serialisation_error { |e, _| error_handler.call(e) }

    begin
      sender.publish(json_to_payload(message, options[:type]), &iter)
    rescue ACL::Error, MultiJson::DecodeError => e
      # json_to_payload raises ACL::Error for invalid field names; handle it
      # like a JSON decode error so --ignore-errors covers it too.
      error_handler.call(e)
    end
  end

  on_done = -> { blk.call("") }

  iterator.each(on_work, on_done)
end