class Apiotics::Server

Public Class Methods

aws_endpoint() click to toggle source
# File lib/apiotics/server.rb, line 456
# Returns the AWS endpoint, resolving and persisting it on first use.
#
# Lookup order: the "aws_endpoint" ApioticsSetting row, then
# Apiotics.configuration.aws_endpoint, then the "aws_endpoint" entry of
# Apiotics::Portal.retrieve_configuration. A value found outside the settings
# table is saved under the "aws_endpoint" key so later calls read it from
# there.
#
# @return [Object] the value held by the "aws_endpoint" setting
def self.aws_endpoint
  setting = ApioticsSetting.find_by(key: "aws_endpoint")
  return setting.value unless setting.nil?

  endpoint = Apiotics.configuration.aws_endpoint
  if endpoint.nil?
    endpoint = Apiotics::Portal.retrieve_configuration["aws_endpoint"]
    # Cache the portal's answer on the in-memory configuration so later reads
    # of Apiotics.configuration.aws_endpoint see it. This must be an
    # assignment to aws_endpoint; a `==` against server_type does nothing.
    Apiotics.configuration.aws_endpoint = endpoint
  end

  setting = ApioticsSetting.new
  setting.key = "aws_endpoint"
  setting.value = endpoint
  setting.save
  setting.value
end
ca_cert() click to toggle source
# File lib/apiotics/server.rb, line 415
# Returns the CA certificate as an OpenSSL::X509::Certificate.
#
# The encoded certificate is read from the "ca_cert" ApioticsSetting row. When
# no such row exists it is fetched with Apiotics::Portal.ca_certificate and
# saved under "ca_cert" before being parsed.
#
# @return [OpenSSL::X509::Certificate]
def self.ca_cert
  stored = ApioticsSetting.find_by(key: "ca_cert")
  if stored.nil?
    encoded = Apiotics::Portal.ca_certificate
    record = ApioticsSetting.new
    record.key = "ca_cert"
    record.value = encoded
    record.save
  else
    encoded = stored.value
  end
  OpenSSL::X509::Certificate.new(encoded)
end
cert() click to toggle source
# File lib/apiotics/server.rb, line 392
# Returns the client certificate as an OpenSSL::X509::Certificate.
#
# Reads the "cert" ApioticsSetting row. When it is missing,
# Apiotics::Portal.register_client is called and the row is read again; the
# second read is used as-is, so a still-missing row raises NoMethodError.
#
# @return [OpenSSL::X509::Certificate]
def self.cert
  setting = ApioticsSetting.find_by(key: "cert")
  if setting.nil?
    Apiotics::Portal.register_client
    setting = ApioticsSetting.find_by(key: "cert")
  end
  OpenSSL::X509::Certificate.new(setting.value)
end
clear_db_settings() click to toggle source
# File lib/apiotics/server.rb, line 430
# Deletes every ApioticsSetting row, then stores the configured public key
# under "hive_public_key".
#
# @return [Object] whatever ApioticsSetting#save returns for the new row
def self.clear_db_settings
  ApioticsSetting.destroy_all
  ApioticsSetting.new.tap do |marker|
    marker.key = "hive_public_key"
    marker.value = Apiotics.configuration.public_key
  end.save
end
close() click to toggle source
# File lib/apiotics/server.rb, line 177
# Shuts down the connection held in @server, if there is one.
#
# Calls #disconnect when the configured server type is "aws" and #close
# otherwise. Does nothing when @server is nil.
#
# NOTE(review): this is a class method, so @server here is class-level state,
# while #initialize assigns @server on the instance — confirm this ever sees a
# non-nil connection.
def self.close
  aws = Apiotics.configuration.server_type == "aws"
  return if @server.nil?

  aws ? @server.disconnect : @server.close
end
gem_id() click to toggle source
# File lib/apiotics/server.rb, line 370
# Returns the gem id: the value of the "gem_id" ApioticsSetting row when it
# exists, otherwise the return value of Apiotics::Portal.register_client.
def self.gem_id
  setting = ApioticsSetting.find_by(key: "gem_id")
  setting.nil? ? Apiotics::Portal.register_client : setting.value
end
generate_cert(key, public_key) click to toggle source
# File lib/apiotics/server.rb, line 405
# Builds a certificate signing request and hands it to
# Apiotics::Portal.generate_certificate.
#
# The request subject carries CN=simbiotes.com, O=<configured public key> and
# OU=<configured private key>. The subject is built from explicit
# [field, value] pairs rather than parsed from a "CN=.../O=.../OU=..." string,
# because how OpenSSL::X509::Name.parse splits a string without a leading "/"
# depends on the openssl gem version. The request is signed with SHA-256
# instead of SHA-1.
#
# @param key [#public_key] private key used both for the request's public key
#   and for signing
# @param public_key [Object] unused; kept for interface compatibility
# @return [Object] whatever Apiotics::Portal.generate_certificate returns
def self.generate_cert(key, public_key)
  csr = OpenSSL::X509::Request.new
  csr.version = 0
  csr.subject = OpenSSL::X509::Name.new([
    ["CN", "simbiotes.com"],
    ["O", Apiotics.configuration.public_key.to_s],
    ["OU", Apiotics.configuration.private_key.to_s]
  ])
  csr.public_key = key.public_key
  csr.sign key, OpenSSL::Digest::SHA256.new
  Apiotics::Portal.generate_certificate(csr)
end
key() click to toggle source
# File lib/apiotics/server.rb, line 380
# Returns the value of the "private_key" ApioticsSetting row.
#
# When the row is missing, Apiotics::Portal.register_client is called and the
# row is read again; the second read is used as-is, so a still-missing row
# raises NoMethodError.
def self.key
  setting = ApioticsSetting.find_by(key: "private_key")
  if setting.nil?
    Apiotics::Portal.register_client
    setting = ApioticsSetting.find_by(key: "private_key")
  end
  setting.value
end
lookup() click to toggle source
# File lib/apiotics/server.rb, line 341
# Asks the configured server for connection details.
#
# Opens a connection to Apiotics.configuration.server/server_port (wrapped in
# TLS when configuration.tls is true), sends '{"action":"lookup"}', reads one
# line and returns it parsed as JSON.
#
# The CA temp file is created only on the TLS + verify_peer path, so cleanup
# is keyed on the file itself rather than on verify_peer; with tls off and
# verify_peer on there is no file to close. The socket and the temp file are
# released in `ensure`, so they are not leaked when connecting, reading or
# parsing raises.
#
# @return [Hash] the parsed reply
# @raise [JSON::ParserError, TypeError] when the reply is not valid JSON or
#   the connection yields no line
def self.lookup
  config = Apiotics.configuration
  ca_tempfile = nil
  socket = nil
  @apiary_server = nil
  begin
    if config.tls == true
      socket = TCPSocket.new(config.server, config.server_port)
      context = OpenSSL::SSL::SSLContext.new
      context.key = Server.key
      context.cert = Server.cert
      if config.verify_peer == true
        ca_tempfile = Tempfile.new
        ca_tempfile.write Server.ca_cert.to_pem
        ca_tempfile.rewind
        context.ca_file = ca_tempfile.path
        context.verify_mode = OpenSSL::SSL::VERIFY_PEER
      end
      @apiary_server = OpenSSL::SSL::SSLSocket.new socket, context
      # Closing the TLS wrapper also closes the underlying TCP socket.
      @apiary_server.sync_close = true
      @apiary_server.connect
    else
      @apiary_server = TCPSocket.open(config.server, config.server_port)
    end
    @apiary_server.puts('{"action":"lookup"}')
    JSON.parse(@apiary_server.gets)
  ensure
    # Fall back to the raw socket when the TLS wrapper was never created.
    connection = @apiary_server || socket
    connection.close if connection && !connection.closed?
    ca_tempfile.close(true) if ca_tempfile
  end
end
new() click to toggle source
# File lib/apiotics/server.rb, line 13
# Builds and starts the server.
#
# Steps, in order:
# 1. Creates the global $redis client when redis_comms_connection is enabled
#    and $redis is not already set.
# 2. Clears the stored settings (Server.clear_db_settings) when the saved
#    "hive_public_key" is missing or differs from the configured public key.
# 3. Connects to the remote side: an MQTT client when
#    Apiotics::Server.server_type is "aws", otherwise a plain or TLS socket to
#    the server/port settings refreshed from Server.lookup.
# 4. Calls listen_local.
#
# Any StandardError raised along the way triggers Server.close, is printed
# together with its backtrace, and is re-raised.
def initialize
  begin
    if Apiotics.configuration.redis_comms_connection == true
      if $redis == nil
        $redis = ::Redis.new
      end
    end
    @localport = Apiotics.configuration.local_port
    @error_msg = nil
    Thread.abort_on_exception = true
    # A missing or different stored hive key means the saved settings belong
    # to another hive (or none), so they are wiped and re-seeded.
    hive_public_key = ApioticsSetting.find_by(key: "hive_public_key")
    if hive_public_key == nil || hive_public_key.value != Apiotics.configuration.public_key
      Server.clear_db_settings
    end
    if Apiotics::Server.server_type == "aws"
      port = Apiotics.configuration.mqtt_port
      key = Apiotics::Server.key
      cert = Apiotics::Server.cert
      ca_cert = Apiotics::Server.ca_cert
      # The key material is written to temp files because config_ssl_context
      # below is given file paths.
      # NOTE(review): these temp files are not closed or unlinked in this
      # method — confirm whether they must outlive the connection.
      key_file = Tempfile.new
      cert_file = Tempfile.new
      ca_cert_file = Tempfile.new
      key_file.write(key)
      key_file.rewind
      cert_file.write(cert)
      cert_file.rewind
      ca_cert_file.write(ca_cert)
      ca_cert_file.rewind
      aws_endpoint = Apiotics::Server.aws_endpoint
      gem_id = Apiotics::Server.gem_id
      Server.sync_devices(nil)
      
      @server = AwsIotDevice::MqttShadowClient::MqttManager.new
      @server.config_endpoint(aws_endpoint, port)
      @server.config_ssl_context(ca_cert_file.path, key_file.path, cert_file.path)
      #@server.persistent = true
      @server.connect(aws_endpoint, port, true, true, false)

      # Every incoming MQTT payload is logged and passed to Server.parse_message.
      @server.on_message do |packet|
        msg = packet.payload
        puts "Message received: " + msg
        Server.parse_message(msg)
      end
      
      #PahoMqtt.logger = "/Users/mackie/.apiotics/paho.log"
      
      @server.subscribe("topic/#{Apiotics.configuration.public_key}/setcomplete", 1)
      @server.subscribe("topic/#{Apiotics.configuration.public_key}/setrequestack", 1)
      @server.subscribe("topic/#{Apiotics.configuration.public_key}/getack", 1)
      
      # Starts the background thread that publishes to "topic/ping".
      Server.ping(@server)
      
    else
      server_details = Server.lookup
      rgs = ApioticsSetting.find_by(key: "server")
      rgs_port = ApioticsSetting.find_by(key: "port")
      # After this if/else, rgs and rgs_port are meant to hold the plain
      # host and port used for the socket below.
      # NOTE(review): when the lookup status is not "ok" they are not
      # converted (they stay nil or ApioticsSetting records) yet are still
      # passed to the socket calls below — confirm intended.
      if rgs == nil
        if server_details["status"] == "ok"
          # No stored server yet: create the "server" and "port" settings.
          rgs = server_details["ip"]
          c = ApioticsSetting.new
          c.key = "server"
          c.value = rgs
          c.save
          rgs_port = server_details["port"]
          c = ApioticsSetting.new
          c.key = "port"
          c.value = rgs_port
          c.save
        else
          @error_msg = server_details["status_msg"]
        end
      else
        if server_details["status"] == "ok"
          # Stored server exists: overwrite both settings with the lookup result.
          # NOTE(review): assumes a "port" row exists whenever a "server" row does.
          rgs.value = server_details["ip"]
          rgs.save
          rgs = rgs.value
          rgs_port.value = server_details["port"]
          rgs_port.save
          rgs_port = rgs_port.value
        else
          @error_msg = server_details["status_msg"]
        end
      end
      if Apiotics.configuration.tls == true
        socket = TCPSocket.new(rgs, rgs_port)
        context = OpenSSL::SSL::SSLContext.new
        context.key = Server.key
        context.cert = Server.cert
        if Apiotics.configuration.verify_peer == true
          # The CA certificate is written to a temp file so its path can be
          # given to the SSL context; the file is removed after connecting.
          ca_tempfile = Tempfile.new
          ca_tempfile.write Server.ca_cert.to_pem
          ca_tempfile.rewind
          context.ca_file = ca_tempfile.path
          context.verify_mode = OpenSSL::SSL::VERIFY_PEER
        end
        @server = OpenSSL::SSL::SSLSocket.new socket, context
        @server.sync_close = true
        @server.connect
        if Apiotics.configuration.verify_peer == true
          ca_tempfile.close(true)
        end
      else
        @server = TCPSocket.open(rgs, rgs_port)
      end
      Server.sync_devices(nil)
    
      # Start the remote reader thread, then the heartbeat thread.
      listen_remote
      @heartbeat_state = Hash.new
      heartbeat = {
        "action" => "heartbeat",
        "hive_id" => Apiotics.configuration.public_key,
        "instance_id" => "0000000000000000000000000000000000000000000000000000000000000000",
        "driver" => "Core::Heartbeat",
        "interface" => nil
      }
      send_heartbeat(heartbeat)
    end
    puts "Starting local server..."
    listen_local
  rescue => e
    Server.close
    puts e.message
    puts e.backtrace.inspect
    raise e
  end
end
parse_message(msg) click to toggle source
# File lib/apiotics/server.rb, line 224
# Parses an incoming message and persists it when it is an acknowledgement.
#
# The message is parsed with Apiotics::Parse.message. Heartbeat messages,
# "sync" actions and messages carrying an "error" entry are returned without
# further work. Otherwise an Apiotics::Insert is built; when it is valid and
# its action is "set-request-ack", "set-complete" or "get-ack" it is saved,
# and also logged unless local_logging is explicitly false.
#
# @param msg [Object] raw message handed to Apiotics::Parse.message
# @return [Object] the parsed message
def self.parse_message(msg)
  parsed = Apiotics::Parse.message(msg)
  ignored = parsed["driver"] == "Core::Heartbeat" ||
            parsed["action"] == "sync" ||
            parsed["error"] != nil
  return parsed if ignored

  record = Apiotics::Insert.new(parsed)
  if record.valid == true
    case record.action
    when "set-request-ack", "set-complete", "get-ack"
      record.save
      record.save_log if Apiotics.configuration.local_logging != false
    end
  end
  parsed
end
ping(server) click to toggle source
# File lib/apiotics/server.rb, line 140
# Starts a background thread that publishes {"p":"p"} to "topic/ping" every
# 30 seconds, forever.
#
# Sets Thread.abort_on_exception globally, so a failure inside the thread
# aborts the process.
#
# @param server [#publish] client used to publish the ping
# @return [Thread] the ping thread
def self.ping(server)
  Thread.abort_on_exception = true
  payload = {"p" => "p"}
  Thread.new do
    loop do
      server.publish("topic/ping", payload.to_json, false, 1)
      sleep 30
    end
  end
end
server_type() click to toggle source
# File lib/apiotics/server.rb, line 438
# Returns the server type, resolving and persisting it on first use.
#
# Lookup order: the "server_type" ApioticsSetting row, then
# Apiotics.configuration.server_type, then the "server_version" entry of
# Apiotics::Portal.retrieve_configuration. A value found outside the settings
# table is saved under the "server_type" key so later calls read it from
# there.
#
# @return [Object] the value held by the "server_type" setting
def self.server_type
  setting = ApioticsSetting.find_by(key: "server_type")
  return setting.value unless setting.nil?

  type = Apiotics.configuration.server_type
  if type.nil?
    type = Apiotics::Portal.retrieve_configuration["server_version"]
    # Store the portal's answer on the in-memory configuration: other methods
    # branch on Apiotics.configuration.server_type directly. This must be an
    # assignment; a `==` comparison here does nothing.
    Apiotics.configuration.server_type = type
  end

  setting = ApioticsSetting.new
  setting.key = "server_type"
  setting.value = type
  setting.save
  setting.value
end
sync_devices(model_name) click to toggle source
# File lib/apiotics/server.rb, line 317
# Calls sync_devices on every worker namespace listed in
# Apiotics.configuration.targets.
#
# With a nil model_name, all ApplicationRecord descendants are considered
# (the application is eager-loaded first outside production). Otherwise only
# "<model_name>::<model_name>" is considered. Each candidate's namespace
# (String#deconstantize) is kept when it is a key of the configured targets;
# a namespace shared by several models is synced once per model.
#
# @param model_name [String, nil] worker name, or nil for all models
# @return [Boolean] true when at least one worker was synced
def self.sync_devices(model_name)
  if model_name.nil?
    Rails.application.eager_load! unless Rails.env.production?
    candidates = ApplicationRecord.descendants.collect(&:name)
  else
    candidates = [model_name + "::" + model_name]
  end

  namespaces = candidates.select do |candidate|
    Apiotics.configuration.targets.keys.include?(candidate.deconstantize)
  end.map(&:deconstantize)

  namespaces.each { |namespace| namespace.constantize.sync_devices }
  !namespaces.empty?
end

Public Instance Methods

do_at_exit() click to toggle source
# File lib/apiotics/server.rb, line 474
# No-op: the body is empty, so calling it does nothing and returns nil.
# Presumably a placeholder for shutdown cleanup — TODO confirm.
def do_at_exit
end
listen_local() click to toggle source
# File lib/apiotics/server.rb, line 241
# Receives messages from local clients and forwards each one with #send.
#
# When redis_comms_connection is false, a TCP server is opened on @localport
# and every accepted connection is handled in its own thread: one line is
# read, a pending @error_msg (if any) is written back to the client as
# {"error": ...}, the line is forwarded, and the connection is closed.
# Otherwise the global $redis client subscribes to the channel named after
# the configured public key and forwards every message it receives.
#
# This method does not return under normal operation.
#
# @raise [StandardError] re-raises any error from the TCP accept loop after
#   closing the local server
def listen_local
  if Apiotics.configuration.redis_comms_connection == false
    begin
      @local_server = TCPServer.open(@localport)
      puts "Local server started..."
      loop do
        Thread.abort_on_exception = true
        Thread.fork(@local_server.accept) do |client|
          begin
            s = client.gets
            if @error_msg != nil
              # Uses the instance variable set in #initialize; serialising with
              # to_json keeps the reply valid JSON whatever the message holds.
              client.puts({"error" => @error_msg}.to_json)
            end
            #puts "Sending message to Server.send #{s}"
            self.send(s)
          ensure
            # Release the accepted socket instead of leaving it to the GC.
            client.close
          end
        end
      end
    rescue => e
      # @local_server is still nil when TCPServer.open itself failed.
      @local_server.close if @local_server
      sleep 1
      raise e
    end
  else
    $redis.subscribe(Apiotics.configuration.public_key) do |on|
      on.message do |channel, msg|
        if @error_msg != nil
          # There is no client socket on the Redis path to reply to, so the
          # pending error is written to stdout instead.
          puts({"error" => @error_msg}.to_json)
        end
        #puts msg
        self.send(msg)
      end
    end
  end
end
listen_remote() click to toggle source
# File lib/apiotics/server.rb, line 190
# Starts a background thread that reads lines from @server and dispatches
# them.
#
# Each line is logged and parsed with Server.parse_message. Heartbeat messages
# go to #monitor_heartbeat. "sync" actions trigger Server.sync_devices for the
# named worker, and a "sync-ack" with status "ok" or "error" is sent back.
# Any StandardError ends the loop and is printed; the thread then closes its
# ActiveRecord connection.
#
# @return [Thread] the reader thread
def listen_remote
  Thread.abort_on_exception = true
  Thread.new do
    begin
      loop do
        line = @server.gets
        puts "Message received: " + line
        parsed = Server.parse_message(line)
        if parsed["driver"] == "Core::Heartbeat"
          monitor_heartbeat(parsed)
        elsif parsed["action"] == "sync"
          outcome = Server.sync_devices(parsed["worker"]) ? "ok" : "error"
          reply = {
            "action" => "sync-ack",
            "worker" => parsed["worker"],
            "status" => outcome
          }
          self.send(reply.to_json)
        end
      end
    rescue => e
      puts e
    end
    ActiveRecord::Base.connection.close
  end
end
monitor_heartbeat(heartbeat) click to toggle source
# File lib/apiotics/server.rb, line 304
# Tracks outstanding heartbeats and raises when too many are pending.
#
# A "heartbeat" action is recorded in @heartbeat_state under its "interface"
# value; a "heartbeat-ack" action resets @heartbeat_state to an empty hash.
#
# @param heartbeat [Hash] message with "action" and "interface" entries
# @return [nil]
# @raise [Apiotics::ConnectionError] when more entries are pending than
#   Apiotics.configuration.max_missed_heartbeats
def monitor_heartbeat(heartbeat)
  case heartbeat["action"]
  when "heartbeat"
    @heartbeat_state[heartbeat["interface"]] = heartbeat
  when "heartbeat-ack"
    @heartbeat_state = Hash.new
  end

  limit = Apiotics.configuration.max_missed_heartbeats
  raise Apiotics::ConnectionError if @heartbeat_state.length > limit
end
send(msg) click to toggle source
# File lib/apiotics/server.rb, line 151
# Sends a message to the remote server held in @server.
#
# When the configured server type is "aws", the message is parsed as JSON and
# published over MQTT: "get" actions go to "topic/<public_key>/get" with the
# driver reduced to the part after "::", everything else goes to
# "topic/<public_key>/setrequest". For any other server type the message is
# logged and written to the socket unchanged.
#
# @param msg [String] message to send (JSON text on the "aws" path)
# @return [Object] the result of @server.publish or @server.puts
def send(msg)
  if Apiotics.configuration.server_type != "aws"
    puts "Message sent: #{msg}"
    @server.puts( msg )
  else
    payload = JSON.parse(msg)
    if payload["action"] == "get"
      payload["driver"] = payload["driver"].split("::")[1]
      get_topic = "topic/#{Apiotics.configuration.public_key}/get"
      @server.publish(get_topic, payload.to_json, false, 1)
    else
      log_topic = "topic/#{Apiotics.configuration.public_key}/setrequest"
      puts "Message sent: #{payload.to_json} to #{log_topic}"
      set_topic = "topic/#{Apiotics.configuration.public_key}/setrequest"
      @server.publish(set_topic, payload.to_json, false, 1)
    end
  end
end
send_heartbeat(heartbeat) click to toggle source
# File lib/apiotics/server.rb, line 277
# Optionally sends a "connect" handshake, then starts the heartbeat thread.
#
# When configuration.handshake is true, a connect message carrying the hive id
# and Server.gem_id is sent first. The thread then loops forever: it stamps
# the heartbeat's "interface" with the current time, sends it, records it with
# #monitor_heartbeat, and sleeps for configuration.heartbeat_interval. The
# heartbeat is sent as JSON unless create_heartbeat_error is true, in which
# case the raw hash is passed to #send.
#
# @param heartbeat [Hash] heartbeat template; its "interface" entry is
#   overwritten on every iteration
# @return [Thread] the heartbeat thread
def send_heartbeat(heartbeat)
  if Apiotics.configuration.handshake == true
    instance_id = Server.gem_id
    handshake = {
      "action" => "connect",
      "hive_id" => Apiotics.configuration.public_key,
      "instance_id" => instance_id
    }
    self.send(handshake.to_json)
  end
  Thread.abort_on_exception = true
  Thread.new do
    loop do
      heartbeat["interface"] = DateTime.now.to_i.to_s
      if Apiotics.configuration.create_heartbeat_error == true
        outgoing = heartbeat
      else
        outgoing = heartbeat.to_json
      end
      self.send(outgoing)
      self.monitor_heartbeat(heartbeat)
      sleep Apiotics.configuration.heartbeat_interval
    end
    ActiveRecord::Base.connection.close
  end
end