class Signalwire::Blade::Connection

Attributes

connected[R]
connection[R]
node_id[R]
session_id[R]

Public Class Methods

new(**options) click to toggle source
# File lib/signalwire/blade/connection.rb, line 17
# Prepares an unconnected client; no socket is opened here.
#
# @param options [Hash] recognised keys:
#   :url            - websocket endpoint, defaults to 'wss://relay.signalwire.com'
#   :log_traffic    - passed through to @log_traffic, defaults to true
#   :authentication - stored and later attached to the connect request, defaults to nil
def initialize(**options)
  # Caller-supplied configuration.
  @options = options
  @url = options.fetch(:url, 'wss://relay.signalwire.com')
  @log_traffic = options.fetch(:log_traffic, true)
  @authentication = options.fetch(:authentication, nil)

  # Session state, filled in once the connect request is answered.
  @session_id = nil
  @node_id = nil
  @connected = false

  # Messages wait in these queues until flush_queues drains them.
  @inbound_queue = EM::Queue.new
  @outbound_queue = EM::Queue.new

  # Keep-alive bookkeeping used by keep_alive.
  @keep_alive_timer = nil
  @pong = Concurrent::AtomicBoolean.new
  @ping_is_sent = Concurrent::AtomicBoolean.new

  # Objects whose #stop is called by shutdown_registered.
  @shutdown_list = []
end

Public Instance Methods

clear_connections() click to toggle source
# File lib/signalwire/blade/connection.rb, line 154
# Drops the socket reference, marks the connection as down and cancels the
# pending keep-alive timer when one exists.
#
# @return the result of the timer's #cancel, or nil when no timer is set
def clear_connections
  @ws = nil
  @connected = false
  return unless @keep_alive_timer

  @keep_alive_timer.cancel
end
connect!() click to toggle source
# File lib/signalwire/blade/connection.rb, line 38
# Entry point: registers the :started handler, switches on epoll, installs
# the INT/TERM traps and then hands control to main_loop!.
#
# @return whatever main_loop! returns
def connect!
  # Preparation steps, in the order the rest of the class expects.
  setup_started_event
  enable_epoll
  handle_signals
  # Everything after this point happens inside the reactor loop.
  main_loop!
end
connect_request() click to toggle source
# File lib/signalwire/blade/connection.rb, line 182
# Builds the Connect command, attaching @authentication under
# [:params][:authentication] when it was configured.
#
# @return [Connect] the freshly built request
def connect_request
  Connect.new.tap do |request|
    request[:params][:authentication] = @authentication if @authentication
  end
end
connected?() click to toggle source
# File lib/signalwire/blade/connection.rb, line 188
# Reports whether the connection flag is set.
#
# @return [Boolean] true only when @connected equals true
def connected?
  state = @connected
  state == true
end
disconnect!() click to toggle source
# File lib/signalwire/blade/connection.rb, line 160
# Tears the connection down and stops the EventMachine reactor.
#
# The reactor is only stopped when it is actually running, so calling this
# before the loop has started (or after it has already been stopped) resets
# the local state without raising.
def disconnect!
  clear_connections
  # EM.stop raises if no reactor is active; nothing to stop in that case.
  EM.stop if EM.reactor_running?
end
enable_epoll() click to toggle source
# File lib/signalwire/blade/connection.rb, line 97
# Requests epoll from EventMachine and logs, at debug level, what
# EM.epoll? reports afterwards.
def enable_epoll
  # This is only enabled on Linux
  EM.epoll
  logger.debug(format('Running with epoll %s', EM.epoll?))
end
enqueue_inbound(message) click to toggle source
# File lib/signalwire/blade/connection.rb, line 174
# Appends a received message to the inbound queue; flush_queues later pops
# it and passes it to receive.
#
# @param message the object to queue
def enqueue_inbound(message)
  @inbound_queue.push(message)
end
enqueue_outbound(message) click to toggle source
# File lib/signalwire/blade/connection.rb, line 178
# Appends a message to the outbound queue; flush_queues later pops it and
# passes it to write.
#
# @param message the object to queue
def enqueue_outbound(message)
  @outbound_queue.push(message)
end
execute(params, &block) click to toggle source
# File lib/signalwire/blade/connection.rb, line 135
# Wraps params in an Execute command and sends it through write_command.
#
# @param params passed unchanged to Execute.new
# @param block optional callback forwarded to write_command
# @return whatever write_command returns
def execute(params, &block)
  # Forwarding a nil block is the same as passing no block at all.
  write_command(Execute.new(params), &block)
end
flush_queues() click to toggle source
# File lib/signalwire/blade/connection.rb, line 165
# Drains the inbound queue into receive, drains the outbound queue into
# write when connected, then schedules the next flush.
#
# @return whatever schedule_flush_queues returns
def flush_queues
  until @inbound_queue.empty?
    @inbound_queue.pop { |incoming| receive(incoming) }
  end

  # Outbound messages stay queued while there is no connection.
  if connected?
    until @outbound_queue.empty?
      @outbound_queue.pop { |outgoing| write(outgoing) }
    end
  end

  schedule_flush_queues
end
handle_close() click to toggle source
# File lib/signalwire/blade/connection.rb, line 149
# Socket :close callback: logs a warning and starts a reconnection attempt.
#
# @return whatever reconnect! returns
def handle_close
  logger.warn('WS Socket closed!')
  reconnect!
end
handle_execute_response(event, &block) click to toggle source
# File lib/signalwire/blade/connection.rb, line 130
# Logs the error code and message when the reply reports an error, then
# passes the reply to the caller's block in every case.
#
# @param event reply object answering #error?, #error_code and #error_message
# @param block callback invoked with the event
# @return the block's return value
def handle_execute_response(event, &block)
  if event.error?
    logger.error("Blade error occurred, code #{event.error_code}: #{event.error_message}")
  end

  block.call(event)
end
handle_signals() click to toggle source
# File lib/signalwire/blade/connection.rb, line 223
# Installs the same shutdown handler for the INT and TERM signals.
#
# @return the value returned by the TERM Signal.trap call
def handle_signals
  on_signal = proc { shutdown_from_signal }

  Signal.trap('INT', &on_signal)
  Signal.trap('TERM', &on_signal)
end
keep_alive() click to toggle source
# File lib/signalwire/blade/connection.rb, line 192
# One step of the keep-alive cycle; re-arms itself through a timer.
#
# Calls alternate between two phases, tracked by @ping_is_sent:
# * send phase  - send a ping whose reply sets @pong
# * check phase - if no pong arrived, log it and reconnect when connected
#
# Each send phase clears @pong first, so a pong from an earlier round cannot
# hide a later lost ping. After reconnect! no timer is scheduled here, because
# the handler registered by setup_started_event calls keep_alive again once
# the new connect request is answered; scheduling here as well would leave
# two timer chains running.
def keep_alive
  if @ping_is_sent.false?
    # Only a reply to this particular ping may count.
    @pong.make_false
    ping { @pong.make_true }
    @ping_is_sent.make_true
  else
    # Whatever the outcome, the next call starts a fresh send phase.
    @ping_is_sent.make_false

    if @pong.false?
      logger.error "KEEPALIVE: Ping failed"
      if connected?
        reconnect!
        return
      end
    end
  end

  @keep_alive_timer = EventMachine::Timer.new(Signalwire::Relay::PING_TIMEOUT) do
    keep_alive
  end
end
log_traffic(direction, message) click to toggle source
# File lib/signalwire/blade/connection.rb, line 211
# Writes a pretty-printed copy of a message to the debug log.
#
# Does nothing when traffic logging is switched off; previously an empty
# "SEND: " / "RECV: " line was still logged in that case.
#
# @param direction [Symbol] :send (message is a JSON string) or
#   :recv (message is an already-parsed structure)
# @param message the payload to render
def log_traffic(direction, message)
  return unless @log_traffic

  pretty = case direction
           when :send
             # Outbound messages arrive as JSON text; parse to re-indent them.
             JSON.pretty_generate(JSON.parse(message))
           when :recv
             JSON.pretty_generate(message)
           end
  logger.debug "#{direction.to_s.upcase}: #{pretty}"
end
main_loop!() click to toggle source
# File lib/signalwire/blade/connection.rb, line 54
# Inside EM.run: opens the websocket to @url, wires its callbacks and starts
# the periodic queue flush.
#
# Callback wiring:
# * :open    -> broadcasts :started with the event
# * :message -> queues the event as inbound
# * :close   -> handle_close
# * :error   -> logs the error message
def main_loop!
  EM.run do
    logger.info "CREATING SOCKET"
    @ws = Faye::WebSocket::Client.new(@url)

    @ws.on(:open) do |event|
      broadcast(:started, event)
    end

    @ws.on(:message) do |event|
      enqueue_inbound(event)
    end

    @ws.on(:close) do
      handle_close
    end

    @ws.on(:error) do |error|
      logger.error "Error occurred: #{error.message}"
    end

    schedule_flush_queues
  end
end
ping(&block) click to toggle source
# File lib/signalwire/blade/connection.rb, line 139
# Sends a Ping command through write_command.
#
# @param block optional callback forwarded to write_command
# @return [Ping] the command that was sent
def ping(&block)
  # tap returns the command itself; a nil block is simply not forwarded.
  Ping.new.tap { |command| write_command(command, &block) }
end
receive(message) click to toggle source
# File lib/signalwire/blade/connection.rb, line 112
# Parses an inbound websocket frame, logs its payload and broadcasts the
# parsed event as :message from inside EM.defer.
#
# @param message object whose #data holds the JSON text
# @return whatever EM.defer returns
def receive(message)
  parsed = Message.from_json(message.data)
  log_traffic(:recv, parsed.payload)

  EM.defer { broadcast(:message, parsed) }
end
reconnect!() click to toggle source
# File lib/signalwire/blade/connection.rb, line 46
# Resets connection state and, unless a shutdown is in progress, waits
# RECONNECT_PERIOD seconds before starting the main loop again.
#
# NOTE(review): the wait uses Kernel#sleep, which blocks the calling thread.
#
# @return nil when shutting down, otherwise whatever main_loop! returns
def reconnect!
  clear_connections

  unless @shutdown
    sleep(Signalwire::Blade::RECONNECT_PERIOD)
    logger.info('Attempting reconnection')
    main_loop!
  end
end
register_for_shutdown(obj) click to toggle source
# File lib/signalwire/blade/connection.rb, line 241
# Remembers an object so that shutdown_registered calls #stop on it.
#
# @param obj the object to register
# @return [Array] the updated shutdown list
def register_for_shutdown(obj)
  @shutdown_list.push(obj)
end
schedule_flush_queues() click to toggle source
# File lib/signalwire/blade/connection.rb, line 71
# Arms a one-shot EventMachine timer that runs flush_queues after 0.005 s.
#
# @return whatever EM.add_timer returns
def schedule_flush_queues
  EM.add_timer(0.005) do
    flush_queues
  end
end
setup_started_event() click to toggle source
# File lib/signalwire/blade/connection.rb, line 75
# Registers the :started handler, which performs the Blade connect handshake.
#
# When the handler runs it marks the connection as up, clears @pong and
# sends the connect request. Once that request is answered it stores the
# session and node ids (only if not already set), broadcasts :connected and
# starts the keep-alive cycle.
#
# Any StandardError raised while sending the request is logged, not re-raised.
def setup_started_event
  on :started do |_event|
    begin
      @connected = true
      myreq = connect_request
      @pong.make_false

      write_command(myreq) do |event|
        # Keep ids that are already set. The node id guard previously tested
        # the misspelled @node_d, so the node id was overwritten every time.
        @session_id ||= event.dig(:result, :sessionid)
        @node_id ||= event.dig(:result, :nodeid)
        logger.info "Blade Session connected with id: #{@session_id}"
        broadcast :connected, event
        keep_alive
      end

    rescue StandardError => e
      logger.error e.inspect
      logger.error e.backtrace
    end
  end
end
shutdown_from_signal() click to toggle source
# File lib/signalwire/blade/connection.rb, line 234
# Signal handler body: flags the shutdown (so reconnect! stops retrying),
# stops every registered object, disconnects and exits the process.
def shutdown_from_signal
  # Must be set before disconnecting; reconnect! checks this flag.
  @shutdown = true

  shutdown_registered
  disconnect!

  exit
end
shutdown_registered() click to toggle source
# File lib/signalwire/blade/connection.rb, line 245
# Calls #stop on every object added through register_for_shutdown, in
# registration order.
#
# @return [Array] the shutdown list
def shutdown_registered
  @shutdown_list.each(&:stop)
end
subscribe(params, &block) click to toggle source
# File lib/signalwire/blade/connection.rb, line 145
# Wraps params in a Subscribe command and sends it through write_command.
#
# @param params passed unchanged to Subscribe.new
# @param block optional callback forwarded to write_command
# @return whatever write_command returns
def subscribe(params, &block)
  # Forwarding a nil block is the same as passing no block at all.
  write_command(Subscribe.new(params), &block)
end
transmit(message) click to toggle source
# File lib/signalwire/blade/connection.rb, line 103
# Queues a message for sending; nothing is written to the socket here.
#
# @param message the payload handed to enqueue_outbound
# @return whatever enqueue_outbound returns
def transmit(message)
  enqueue_outbound(message)
end
write(message) click to toggle source
# File lib/signalwire/blade/connection.rb, line 107
# Logs an outbound message and sends it over the websocket.
#
# @param message the payload passed to log_traffic and to the socket's #send
# @return whatever the socket's #send returns
def write(message)
  log_traffic(:send, message)
  @ws.send(message)
end
write_command(command, &block) click to toggle source
# File lib/signalwire/blade/connection.rb, line 120
# Serialises a command to JSON and queues it for sending. When a block is
# given, it is first registered as a one-time :message handler filtered on
# the command's id, routed through handle_execute_response.
#
# @param command object answering #id and #build_request
# @param block optional callback for the matching reply
# @return whatever transmit returns
def write_command(command, &block)
  if block
    once(:message, id: command.id) { |event| handle_execute_response(event, &block) }
  end

  transmit(command.build_request.to_json)
end