class MqttBench
MQTT benchmark software written in Ruby. Each message is created in JSON format. Because every message carries the same minimum set of fields, the minimum message size can be calculated as follows:
Example configuration: {
"mqtt_opts": { "host": "127.0.0.1", "port": 1883, "keep_alive": 15 }, "bench_opts": { "num_clients": 512, "data_size": 64, "num_msgs": 1000, "interval": 0 }
}
Example message:
{:cid=>"001", :data=>"", :mid=>"00001", :time=>1455754303515}
Field names and symbols take 50 bytes. The size of the “data” field can be specified in the configuration. “cid” (client id) and “mid” (message id) are zero-padded to the number of digits of the maximum values specified in the configuration (num_clients and num_msgs). The time at which a message is sent is recorded in the message's time field, in ms.
Attributes
results[RW]
total_results[RW]
Public Class Methods
new(conf_file)
click to toggle source
# File bin/mqtt_bench, line 40 def initialize(conf_file) conf = JSON.parse(open(conf_file || File.dirname(__FILE__) + '/../conf/mqtt_bench.conf').read, symbolize_names: true) @mqtt_opts = { host: "127.0.0.1", port: 1883, keep_alive: 15 }.merge(conf[:mqtt_opts]) @bench_opts = { topic: "mqtt_bench", num_clients: 5, data_size: 128, num_msgs: 100, interval: 1, time_key: "time", timestamp: nil }.merge(conf[:bench_opts]) end
Public Instance Methods
align_digits(basis, target)
click to toggle source
# File bin/mqtt_bench, line 63 def align_digits(basis, target) sprintf("%0#{@bench_opts[basis].to_s.length}d", target) end
curr_time()
click to toggle source
# File bin/mqtt_bench, line 59 def curr_time Time.now.instance_eval { self.to_i * 1000 + (usec/1000) } end
print_results()
click to toggle source
# File bin/mqtt_bench, line 123 def print_results total_duration = @total_results[:end_time] - @total_results[:start_time] results = [] @results.size.times do results << @results.pop end duration = results.map {|r| r[:end_time] - r[:start_time]} avg_duration = duration.inject(:+).to_f / duration.size dev_duration = duration.map {|d| (d - avg_duration)**2} stddev_duration = Math.sqrt(dev_duration.inject(:+).to_f) / dev_duration.size puts "total duration:\t\t\t\t#{total_duration} ms" puts "total throughput:\t\t\t#{@bench_opts[:num_msgs].to_f * @bench_opts[:num_clients] / total_duration} msgs/ms" puts "average duration/client:\t\t#{avg_duration} ms" puts "standard deviation of duration/client:\t#{stddev_duration} ms" end
start()
click to toggle source
# File bin/mqtt_bench, line 67 def start $stderr.puts "start mqtt #{@mqtt_opts[:host]}" @clients = [] @threads = [] @results = Queue.new @total_results = {} @data = (0..(@bench_opts[:data_size]-1)).map {|i| i % 10}.join @total_results[:start_time] = curr_time @bench_opts[:num_clients].times do |i| @threads[i] = Thread.new do result = {} msg_count = 0 topic = "#{@bench_opts[:topic]}/#{align_digits(:num_clients, i)}" message = { cid: align_digits(:num_clients, i), data: @data } begin client = MQTT::Client.new(@mqtt_opts) result[:start_time] = curr_time client.connect while (true) message[:mid] = align_digits(:num_msgs, msg_count + 1) message[@bench_opts[:time_key].to_sym] = curr_time message[@bench_opts[:timestamp].to_sym] = Time.now.utc.iso8601 if !@bench_opts[:timestamp].nil? client.publish(topic, message.to_json) msg_count += 1 break if @bench_opts[:num_msgs] - msg_count <= 0 sleep @bench_opts[:interval] end client.disconnect result[:end_time] = curr_time @results.push(result) rescue MQTT::ProtocolException => e $stderr.puts "Protocol error occurs: #{e.class},#{e.message}" next rescue Timeout::Error => e $stderr.puts "Timeout error occurs: #{e.class},#{e.message}" next rescue SystemCallError => e $stderr.puts "System call error occurs: #{e.class},#{e.message}" next rescue StandardError=> e $stderr.puts "The other error occurs: #{e.class},#{e.message}" next end end #p @threads[i] end end
wait_clients()
click to toggle source
# File bin/mqtt_bench, line 118 def wait_clients ThreadsWait.all_waits(*@threads) # {|th| puts "end #{th.inspect}"} @total_results[:end_time] = curr_time end