class Smith::Commands::Push
Public Instance Methods
execute()
click to toggle source
# File lib/smith/commands/smithctl/push.rb, line 4
# Entry point of the command: runs #push and passes whatever it reports to
# the responder's +succeed+ method.
def execute
  push { |result| responder.succeed(result) }
end
Private Instance Methods
iterator()
click to toggle source
Return an iterator that can iterate over whatever the input is.
# File lib/smith/commands/smithctl/push.rb, line 60
# Builds the object that #push iterates over, chosen from the command-line
# options:
#
# * --message given: an EM::Iterator over the message, repeated --number
#   times when --number was given explicitly;
# * --file given: a FileReader over the named file;
# * neither: a FileReader over STDIN.
#
# Raises ArgumentError when --number is combined with reading from STDIN.
def iterator
  if options[:message_given]
    messages = [options[:message]]
    messages *= options[:number] if options[:number_given]
    EM::Iterator.new(messages)
  elsif options[:file_given]
    FileReader.new(options[:file])
  else
    if options[:number_given]
      raise ArgumentError, "--number option cannot be used when reading messages from standard in."
    end

    FileReader.new(STDIN)
  end
end
json_to_payload(data, type)
click to toggle source
# File lib/smith/commands/smithctl/push.rb, line 76
# Parses +data+ as JSON (keys symbolised) and asks ACL::Factory to build a
# payload of the given +type+ from the resulting hash.
#
# data:: the JSON document, as a string.
# type:: the ACL type, passed to ACL::Factory.create unchanged.
#
# Returns whatever ACL::Factory.create returns.
#
# Raises ACL::Error when building the payload raises NoMethodError. If the
# missing method is a setter the message names the offending field;
# otherwise the original NoMethodError message is passed on. JSON parse
# errors raised by MultiJson are not rescued here.
def json_to_payload(data, type)
  ACL::Factory.create(type, MultiJson.load(data, :symbolize_keys => true))
rescue NoMethodError => e
  # Ask the exception which method was missing instead of pattern-matching
  # its message: the quoting used in NoMethodError messages is not the same
  # in every Ruby version, so a regex tied to one format silently stops
  # matching on another.
  missing = e.name.to_s

  if missing.end_with?('=')
    raise ACL::Error, "Error, invalid field name: #{missing.chomp('=')}"
  else
    # Keep the underlying reason rather than raising a message-less error.
    raise ACL::Error, e.message
  end
end
options_spec()
click to toggle source
# File lib/smith/commands/smithctl/push.rb, line 89
# Declares the command's banner and its command-line options.
#
# --type is the only required option. --timeout depends on --reply.
# --reply, --number and --file are declared as conflicting, as are
# --message and --file.
def options_spec
  banner "Pushs an ACL on to a queue.", "<queue>"

  opt :type,          "message type", type: :string, required: true, short: :t
  opt :message,       "the message, as json", type: :string, short: :m
  opt :file,          "read messages from the named file", type: :string, short: :f
  opt :number,        "the number of times to send the message", type: :integer, default: 1, short: :n
  opt :reply,         "set a reply listener.", short: :r
  opt :timeout,       "timeout when waiting for a reply", type: :integer, depends: :reply, default: Smith.config.smith.timeout
  opt :dynamic,       "send message to a dynamic queue", type: :boolean, default: false, short: :d
  opt :ignore_errors, "continue to process input data if there is an error", type: :boolean, default: false

  conflicts :reply, :number, :file
  conflicts :message, :file
end
push(&blk)
click to toggle source
# File lib/smith/commands/smithctl/push.rb, line 12
# Publishes the requested message(s) to the queue named by the first target
# argument and reports the outcome by calling +blk+.
#
# +blk+ is called with:
#
# * a fixed explanatory string when no queue was given;
# * with --reply: the reply payload converted to a hash, a timeout notice,
#   or the message of an ACL/JSON error raised while building the payload;
# * without --reply: the message of an error (unless --ignore-errors is set,
#   in which case the error is logged and the iterator is advanced instead),
#   and an empty string from the completion callback handed to the iterator.
def push(&blk)
  return blk.call("No queue specified. Please specify a queue.") if target.empty?

  Messaging::Sender.new(target.first, :auto_delete => options[:dynamic], :persistent => true, :nowait => false, :strict => true) do |sender|
    if options[:reply]
      begin
        timeout = Smith::Messaging::Timeout.new(options[:timeout]) do |message_id|
          blk.call("Timed out after: #{options[:timeout]} seconds for message: #{options[:message]}: message_id: #{message_id}")
        end

        sender.on_reply(:timeout => timeout) { |payload| blk.call(payload.to_hash) }
        sender.publish(json_to_payload(options[:message], options[:type]))
      rescue ACL::Error, MultiJson::DecodeError => e
        blk.call(e.message)
      end
    else
      on_work = ->(message, iter) do
        # Shared by every failure path for this message: either log and move
        # on to the next message, or report the error to the caller.
        error_handler = ->(e) do
          if options[:ignore_errors]
            logger.error { "#{e} #{message.strip}" }
            iter.call
          else
            blk.call(e.message)
          end
        end

        # Registered per message because the handler closes over this
        # message's +iter+.
        sender.on_serialisation_error { |e, _| error_handler.call(e) }

        begin
          sender.publish(json_to_payload(message, options[:type]), &iter)
        rescue ACL::Error, MultiJson::DecodeError => e
          # json_to_payload raises ACL::Error for invalid field names; rescue
          # it here as the reply branch does, so that such input goes through
          # the error handler (and honours --ignore-errors) instead of
          # escaping from the callback.
          error_handler.call(e)
        end
      end

      on_done = -> { blk.call("") }

      iterator.each(on_work, on_done)
    end
  end
end