class MqttBench

MQTT benchmark software written in Ruby Message is created in JSON format. Because each message has minimum fields, minimum message size can be calculated as follows:

Example configuration: {

"mqtt_opts": {
  "host": "127.0.0.1",
  "port": 1883,
  "keep_alive": 15
},
"bench_opts": {
  "num_clients": 512,
  "data_size": 64,
  "num_msgs": 1000,
  "interval": 0
}

}

Example message:

{:cid=>"001", :data=>"", :mid=>"00001", :time=>1455754303515}

Field name and symbols have 50 bytes. “data” field size can be specified in configuration. “cid” (client id) and “mid” (message id) have number of digits with maximum number specified in configuration (num_clients and num_msgs. Time to send a message is recorded in the message as time field in ms.

Attributes

results[RW]
total_results[RW]

Public Class Methods

new(conf_file) click to toggle source
# File bin/mqtt_bench, line 40
def initialize(conf_file)
  conf = JSON.parse(open(conf_file || File.dirname(__FILE__) + '/../conf/mqtt_bench.conf').read, symbolize_names: true)
  @mqtt_opts = {
    host: "127.0.0.1",
    port: 1883,
    keep_alive: 15
  }.merge(conf[:mqtt_opts])

  @bench_opts = {
    topic: "mqtt_bench",
    num_clients: 5,
    data_size: 128,
    num_msgs: 100,
    interval: 1,
    time_key: "time",
    timestamp: nil
  }.merge(conf[:bench_opts])
end

Public Instance Methods

align_digits(basis, target) click to toggle source
# File bin/mqtt_bench, line 63
def align_digits(basis, target)
  sprintf("%0#{@bench_opts[basis].to_s.length}d", target)
end
curr_time() click to toggle source
# File bin/mqtt_bench, line 59
def curr_time
  Time.now.instance_eval { self.to_i * 1000 + (usec/1000) }
end
print_results() click to toggle source
start() click to toggle source
# File bin/mqtt_bench, line 67
def start
  $stderr.puts "start mqtt #{@mqtt_opts[:host]}"
  @clients = []
  @threads = []
  @results = Queue.new
  @total_results = {}
  @data = (0..(@bench_opts[:data_size]-1)).map {|i| i % 10}.join
  @total_results[:start_time] = curr_time
  @bench_opts[:num_clients].times do |i|
    @threads[i] = Thread.new do
      result = {}
      msg_count = 0
      topic = "#{@bench_opts[:topic]}/#{align_digits(:num_clients, i)}"
      message = {
        cid: align_digits(:num_clients, i),
        data: @data
      }
      begin
        client = MQTT::Client.new(@mqtt_opts)
        result[:start_time] = curr_time
        client.connect
        while (true)
          message[:mid] = align_digits(:num_msgs, msg_count + 1)
          message[@bench_opts[:time_key].to_sym] = curr_time
          message[@bench_opts[:timestamp].to_sym] = Time.now.utc.iso8601 if !@bench_opts[:timestamp].nil?
          client.publish(topic, message.to_json)
          msg_count += 1
          break if @bench_opts[:num_msgs] - msg_count <= 0
          sleep @bench_opts[:interval]
        end
        client.disconnect
        result[:end_time] = curr_time
        @results.push(result)
      rescue MQTT::ProtocolException => e
        $stderr.puts "Protocol error occurs: #{e.class},#{e.message}"
        next
      rescue Timeout::Error => e
        $stderr.puts "Timeout error occurs: #{e.class},#{e.message}"
        next
      rescue SystemCallError => e
        $stderr.puts "System call error occurs: #{e.class},#{e.message}"
        next
      rescue StandardError=> e
        $stderr.puts "The other error occurs: #{e.class},#{e.message}"
        next
      end
    end
    #p @threads[i]
  end
end
wait_clients() click to toggle source
# File bin/mqtt_bench, line 118
def wait_clients
  ThreadsWait.all_waits(*@threads) # {|th| puts "end #{th.inspect}"}
  @total_results[:end_time] = curr_time
end