class SelfSDK::MessagingClient

Constants

DEFAULT_AUTO_RECONNECT
DEFAULT_DEVICE
DEFAULT_STORAGE_DIR
ON_DEMAND_CLOSE_CODE

Attributes

ack_timeout[RW]
client[RW]
device_id[RW]
encryption_client[RW]
jwt[RW]
timeout[RW]
type_observer[RW]
uuid_observer[RW]

Public Class Methods

new(url, client, storage_key, options = {}) click to toggle source

RestClient initializer

@param url [string] self-messaging url @params client [Object] SelfSDK::Client object @option opts [string] :storage_dir the folder where encryption sessions and settings will be stored @params storage_key [String] seed to encrypt messaging @params storage_folder [String] folder to perist messaging encryption @option opts [Bool] :auto_reconnect Automatically reconnects to websocket if connection is lost (defaults to true). @option opts [String] :device_id The device id to be used by the app defaults to “1”.

# File lib/messaging.rb, line 31
def initialize(url, client, storage_key, options = {})
  @mon = Monitor.new
  @url = url
  @messages = {}
  @acks = {}
  @type_observer = {}
  @uuid_observer = {}
  @jwt = client.jwt
  @client = client
  @ack_timeout = 60 # seconds
  @timeout = 120 # seconds
  @auth_id = SecureRandom.uuid
  @device_id = options.fetch(:device_id, DEFAULT_DEVICE)
  @auto_reconnect = options.fetch(:auto_reconnect, DEFAULT_AUTO_RECONNECT)
  @raw_storage_dir = options.fetch(:storage_dir, DEFAULT_STORAGE_DIR)
  @storage_dir = "#{@raw_storage_dir}/apps/#{@jwt.id}/devices/#{@device_id}"
  FileUtils.mkdir_p @storage_dir unless File.exist? @storage_dir
  @offset_file = "#{@storage_dir}/#{@jwt.id}:#{@device_id}.offset"
  @offset = read_offset
  migrate_old_storage_format

  unless options.include? :no_crypto
    crypto_path = "#{@storage_dir}/keys"
    FileUtils.mkdir_p crypto_path unless File.exist? crypto_path
    @encryption_client = Crypto.new(@client, @device_id, crypto_path, storage_key)
  end

  if options.include? :ws
    @ws = options[:ws]
  else
    start
  end
end

Public Instance Methods

add_acl_rule(payload) click to toggle source

Allows incomming messages from the given identity

@params payload [string] base64 encoded payload to be sent

# File lib/messaging.rb, line 126
def add_acl_rule(payload)
  a = SelfMsg::Acl.new
  a.id = SecureRandom.uuid
  a.command = SelfMsg::AclCommandPERMIT
  a.payload = payload

  send_message a
end
clean_observers() click to toggle source
# File lib/messaging.rb, line 225
def clean_observers
  live = {}
  @uuid_observer.clone.each do |id, msg|
    if msg[:timeout] < SelfSDK::Time.now
      message = SelfSDK::Messages::Base.new(self)
      message.status = "errored"

      @uuid_observer[id][:block].call(message)
      @uuid_observer.delete(id)
    else
      live[id] = msg
    end
  end
  @uuid_observer = live
end
close() click to toggle source
# File lib/messaging.rb, line 75
def close
  @ws.close(ON_DEMAND_CLOSE_CODE, "connection closed by the client")
end
list_acl_rules() click to toggle source

Lists acl rules

# File lib/messaging.rb, line 148
def list_acl_rules
  wait_for 'acl_list' do
    a = SelfMsg::Acl.new
    a.id = SecureRandom.uuid
    a.command = SelfMsg::AclCommandLIST

    send_raw a
  end
end
notify_observer(message) click to toggle source

Notify the type observer for the given message

# File lib/messaging.rb, line 242
def notify_observer(message)
  if @uuid_observer.include? message.id
    observer = @uuid_observer[message.id]
    message.validate!(observer[:original_message]) if observer.include?(:original_message)
    Thread.new do
      @uuid_observer[message.id][:block].call(message)
      @uuid_observer.delete(message.id)
    end
    return
  end

  # Return if there is no observer setup for this kind of message
  return unless @type_observer.include? message.typ

  Thread.new do
    @type_observer[message.typ][:block].call(message)
  end
end
remove_acl_rule(payload) click to toggle source

Blocks incoming messages from specified identities

@params payload [string] base64 encoded payload to be sent

# File lib/messaging.rb, line 138
def remove_acl_rule(payload)
  a = SelfMsg::Acl.new
  a.id = SecureRandom.uuid
  a.command = SelfMsg::AclCommandREVOKE
  a.payload = payload

  send_message a
end
send_and_wait_for_response(msgs, original) click to toggle source

Sends a message and waits for the response

@params msg [SelfMsg::Message] message object to be sent

# File lib/messaging.rb, line 161
def send_and_wait_for_response(msgs, original)
  wait_for msgs.first.id, original do
    msgs.each do |msg|
      send_message msg
    end
  end
end
send_custom(recipient, request_body) click to toggle source

Send custom mmessage

@param recipient [string] selfID to be requested @param type [string] message type @param request [hash] original message requesing information

# File lib/messaging.rb, line 99
def send_custom(recipient, request_body)
  @client.devices(recipient).each do |to_device|
    m = SelfMsg::Message.new
    m.id = SecureRandom.uuid
    m.sender = "#{@jwt.id}:#{@device_id}"
    m.recipient = "#{recipient}:#{to_device}"
    m.ciphertext = @jwt.prepare(request_body)

    send_message m
  end

  @client.devices(@jwt.id).each do |to_device|
    if to_device != @device_id 
      m = SelfMsg::Message.new
      m.id = SecureRandom.uuid
      m.sender = "#{@jwt.id}:#{@device_id}"
      m.recipient = "#{recipient}:#{to_device}"
      m.ciphertext = @jwt.prepare(request_body)

      send_message m
    end
  end
end
send_message(msg) click to toggle source

Send a message through self network

@params msg [SelfMsg::Message] message object to be sent

# File lib/messaging.rb, line 202
def send_message(msg)
  uuid = msg.id
  @mon.synchronize do
    @acks[uuid] = {
      waiting_cond: @mon.new_cond,
      waiting: true,
      timeout: SelfSDK::Time.now + @ack_timeout,
    }
  end
  send_raw(msg)
  SelfSDK.logger.info "waiting for acknowledgement #{uuid}"
  @mon.synchronize do
    @acks[uuid][:waiting_cond].wait_while do
      @acks[uuid][:waiting]
    end
  end
  SelfSDK.logger.info "acknowledged #{uuid}"
  true
ensure
  @acks.delete(uuid)
  false
end
set_observer(original, options = {}, &block) click to toggle source
# File lib/messaging.rb, line 261
def set_observer(original, options = {}, &block)
  request_timeout = options.fetch(:timeout, @timeout)
  @uuid_observer[original.id] = { original_message: original, block: block, timeout: SelfSDK::Time.now + request_timeout }
end
share_information(recipient, recipient_device, request) click to toggle source

Responds a request information request

@param recipient [string] selfID to be requested @param recipient_device [string] device id for the selfID to be requested @param request [string] original message requesing information

# File lib/messaging.rb, line 84
def share_information(recipient, recipient_device, request)
  m = SelfMsg::Message.new
  m.id = SecureRandom.uuid 
  m.sender = "#{@jwt.id}:#{@device_id}"
  m.recipient = "#{recipient}:#{recipient_device}"
  m.ciphertext = @jwt.prepare(request)

  send_message m
end
stop() click to toggle source
# File lib/messaging.rb, line 65
def stop
  @acks.each do |k, _v|
    mark_as_acknowledged(k)
  end
  @messages.each do |k, _v|
    mark_as_acknowledged(k)
    mark_as_arrived(k)
  end
end
subscribe(type, &block) click to toggle source
# File lib/messaging.rb, line 266
def subscribe(type, &block)
  type = SelfSDK::message_type(type) if type.is_a? Symbol
  @type_observer[type] = { block: block }
end
wait_for(uuid, msg = nil) { || ... } click to toggle source

Executes the given block and waits for the message id specified on the uuid.

@params uuid [string] unique identifier for a conversation

# File lib/messaging.rb, line 173
def wait_for(uuid, msg = nil)
  SelfSDK.logger.info "sending #{uuid}"
  @mon.synchronize do
    @messages[uuid] = {
      waiting_cond: @mon.new_cond,
      waiting: true,
      timeout: SelfSDK::Time.now + @timeout,
      original_message: msg,
    }
  end

  yield

  SelfSDK.logger.info "waiting for client to respond #{uuid}"
  @mon.synchronize do
    @messages[uuid][:waiting_cond].wait_while do
      @messages[uuid][:waiting]
    end
  end

  SelfSDK.logger.info "response received for #{uuid}"
  @messages[uuid][:response]
ensure
  @messages.delete(uuid)
end

Private Instance Methods

authenticate() click to toggle source

Authenticates current client on the websocket server.

# File lib/messaging.rb, line 422
def authenticate
  @auth_id = SecureRandom.uuid if @auth_id.nil?

  SelfSDK.logger.info "authenticating"

  a = SelfMsg::Auth.new
  a.id = @auth_id
  a.token = @jwt.auth_token
  a.device = @device_id
  a.offset = @offset

  send_raw a

  @auth_id = nil
end
clean_timeouts() click to toggle source

Cleans expired messages

# File lib/messaging.rb, line 307
def clean_timeouts
  clean_observers
  clean_timeouts_for(@messages)
  clean_timeouts_for(@acks)
end
clean_timeouts_for(list) click to toggle source
# File lib/messaging.rb, line 313
def clean_timeouts_for(list)
  list.clone.each do |uuid, _msg|
    next unless list[uuid][:timeout] < SelfSDK::Time.now

    @mon.synchronize do
      SelfSDK.logger.info "message response timed out #{uuid}"
      list[uuid][:waiting] = false
      list[uuid][:waiting_cond].broadcast
    end
  end
end
mark_as_acknowledged(id) click to toggle source

Marks a message as acknowledged by the server.

# File lib/messaging.rb, line 454
def mark_as_acknowledged(id)
  return unless @acks.include? id

  @mon.synchronize do
    @acks[id][:waiting] = false
    @acks[id][:waiting_cond].broadcast
  end
end
mark_as_arrived(id) click to toggle source

Marks a message as arrived.

# File lib/messaging.rb, line 443
def mark_as_arrived(id)
  # Return if no one is waiting for this message
  return unless @messages.include? id

  @mon.synchronize do
    @messages[id][:waiting] = false
    @messages[id][:waiting_cond].broadcast
  end
end
migrate_old_storage_format() click to toggle source
# File lib/messaging.rb, line 478
def migrate_old_storage_format
  # Move the offset file
  old_offset_file = "#{@raw_storage_dir}/#{@jwt.id}:#{@device_id}.offset"
  if File.file?(old_offset_file)
    File.open(old_offset_file, 'rb') do |f|
      offset = f.read.unpack('q')[0]
      write_offset(offset)
    end
    File.delete(old_offset_file)
  end
  
  # Move all pickle files
  crypto_path = "#{@storage_dir}/keys/#{@jwt.key_id}"
  FileUtils.mkdir_p crypto_path unless File.exist? crypto_path
  Dir[File.join(@raw_storage_dir, "*.pickle")].each do |file|
    filename = File.basename(file, ".pickle")
    File.rename file, "#{crypto_path}/#{filename}.pickle"
  end
    
end
on_message(event) click to toggle source

Process an event when it arrives through the websocket connection.

# File lib/messaging.rb, line 360
def on_message(event)
  data = event.data.pack('c*')
  hdr = SelfMsg::Header.new(data: data)

  SelfSDK.logger.info " - received #{hdr.id} (#{hdr.type})"
  case hdr.type
  when SelfMsg::MsgTypeMSG
    SelfSDK.logger.info "Message #{hdr.id} received"
    m = SelfMsg::Message.new(data: data)
    process_incomming_message m
  when SelfMsg::MsgTypeACK
    SelfSDK.logger.info "#{hdr.id} acknowledged"
    mark_as_acknowledged hdr.id
  when SelfMsg::MsgTypeERR
    SelfSDK.logger.warn "error on #{hdr.id}"
    e = SelfMsg::Notification.new(data: data)
    SelfSDK.logger.warn "#{e.error}"
    mark_as_arrived(hdr.id)
  when SelfMsg::MsgTypeACL
    SelfSDK.logger.info "ACL received"
    a = SelfMsg::Acl.new(data: data)
    process_incomming_acl a
  end
rescue TypeError
  SelfSDK.logger.info "invalid array message"
end
process_incomming_acl(input) click to toggle source
# File lib/messaging.rb, line 387
def process_incomming_acl(input)
  list = JSON.parse(input.payload)

  @messages['acl_list'][:response] = list
  mark_as_arrived 'acl_list'
rescue StandardError => e
  p "Error processing incoming ACL #{input.id} #{input.payload}"
  SelfSDK.logger.info e
  SelfSDK.logger.info e.backtrace
  nil
end
process_incomming_message(input) click to toggle source
# File lib/messaging.rb, line 399
def process_incomming_message(input)
  message = SelfSDK::Messages.parse(input, self)
  @offset = input.offset
  write_offset(@offset)

  if @messages.include? message.id
    message.validate! @messages[message.id][:original_message]
    @messages[message.id][:response] = message
    mark_as_arrived message.id
  else
    SelfSDK.logger.info "Received async message #{input.id}"
    message.validate! @uuid_observer[message.id][:original_message] if @uuid_observer.include? message.id
    notify_observer(message)
  end

rescue StandardError => e
  p "Error processing incoming message #{input.to_json}"
  SelfSDK.logger.info e
  p e.backtrace
  nil
end
read_offset() click to toggle source
# File lib/messaging.rb, line 463
def read_offset
  return 0 unless File.exist? @offset_file

  File.open(@offset_file, 'rb') do |f|
    return f.read.to_i
  end
end
send_raw(msg) click to toggle source
# File lib/messaging.rb, line 438
def send_raw(msg)
  @ws.send(msg.to_fb.bytes)
end
start() click to toggle source

Start sthe websocket listener

# File lib/messaging.rb, line 274
def start
  SelfSDK.logger.info "starting"
  auth_id = @auth_id.dup

  @mon.synchronize do
    @acks[auth_id] = { waiting_cond: @mon.new_cond,
                                waiting: true,
                                timeout: SelfSDK::Time.now + @ack_timeout }
  end

  Thread.new do
    EM.run start_connection
  end

  Thread.new do
    loop { sleep 10; clean_timeouts }
  end

  @mon.synchronize do
    @acks[auth_id][:waiting_cond].wait_while { @acks[auth_id][:waiting] }
    @acks.delete(auth_id)
  end
  # In case this does not succeed start the process again.
  if @acks.include? auth_id
    if @acks[auth_id][:waiting]
      close
      start_connection
    end
    @acks.delete(auth_id)
  end
end
start_connection() click to toggle source

Creates a websocket connection and sets up its callbacks.

# File lib/messaging.rb, line 326
def start_connection
  SelfSDK.logger.info "starting listener"
  @ws = Faye::WebSocket::Client.new(@url)
  SelfSDK.logger.info "initialized"

  @ws.on :open do |_event|
    SelfSDK.logger.info "websocket connection established"
    authenticate
  end

  @ws.on :message do |event|
    on_message(event)
  end

  @ws.on :close do |event|
    if event.code == ON_DEMAND_CLOSE_CODE
      puts "client closed connection"
    else
      if !@auto_reconnect
        raise StandardError "websocket connection closed"
      end
      if !@reconnection_delay.nil?
        SelfSDK.logger.info "websocket connection closed (#{event.code}) #{event.reason}"
        sleep @reconnection_delay
        SelfSDK.logger.info "reconnecting..."
      end
      @reconnection_delay = 3
      start_connection
    end
  end
end
write_offset(offset) click to toggle source
# File lib/messaging.rb, line 471
def write_offset(offset)
  File.open(@offset_file, 'wb') do |f|
    f.flock(File::LOCK_EX)
    f.write(offset.to_s.rjust(19, "0"))
  end
end