class Intrinio::Realtime::Client

Public Class Methods

new(options) click to toggle source
# File lib/intrinio-realtime.rb, line 27
# Builds a realtime client from an options Hash.
#
# Keys read from options:
#   :api_key             - API key; checked with valid_api_key? when present
#   :username, :password - required (non-empty) only when no API key is given
#   :provider            - must be a member of PROVIDERS
#   :channels            - optional initial channels, normalized by parse_channels
#   :logger              - false disables logging; nil/absent installs an
#                          INFO-level Logger on $stdout; anything else is used as-is
#
# Raises RuntimeError when options is not a Hash, the API key is invalid, the
# credentials are missing or empty, the provider is unsupported, or any
# channel is not a String.
def initialize(options)
  raise "Options parameter is required" unless options.is_a?(Hash)

  @api_key = options[:api_key]
  if @api_key
    raise "API Key was formatted invalidly." unless valid_api_key?(@api_key)
  else
    # No API key: fall back to username/password credentials.
    @username = options[:username]
    @password = options[:password]
    credentials_missing = [@username, @password].any? { |cred| cred.nil? || cred.empty? }
    raise "API Key or Username and password are required" if credentials_missing
  end

  @provider = options[:provider]
  unless PROVIDERS.include?(@provider)
    raise "Provider must be 'CRYPTOQUOTE', 'FXCM', 'IEX', or 'QUODD'"
  end

  requested = options[:channels]
  @channels = requested ? parse_channels(requested) : []
  bad_channels = @channels.reject { |channel| channel.is_a?(String) }
  raise "Invalid channels to join: #{bad_channels}" unless bad_channels.empty?

  custom_logger = options[:logger]
  if custom_logger.nil?
    # Nothing supplied: log to stdout at INFO level.
    @logger = Logger.new($stdout)
    @logger.level = Logger::INFO
  else
    # An explicit `false` turns logging off; any other value is the logger.
    @logger = custom_logger == false ? nil : custom_logger
  end

  # Connection state; nothing is opened until #connect is called.
  @ws = nil
  @ready = false
  @joined_channels = []
  @quotes = EventMachine::Channel.new
  @heartbeat_timer = nil
  @selfheal_timer = nil
  @selfheal_backoffs = Array.new(SELF_HEAL_BACKOFFS)
end

Public Instance Methods

connect() click to toggle source
# File lib/intrinio-realtime.rb, line 102
# Opens the realtime connection: fetches a fresh auth token, then (re)opens
# the websocket.
#
# Raises RuntimeError when no EventMachine reactor is running. When @ready is
# already set, it emits "Already connected!" through `warn` and returns.
# NOTE(review): no `warn` helper is visible among this class's logging
# methods, so this presumably resolves to Kernel#warn (stderr) - confirm.
#
# Any StandardError raised while connecting is logged and handed to
# try_self_heal. A `throw :fatal` from the helpers is caught here, so it
# unwinds no further than this method.
def connect
  unless EM.reactor_running?
    raise "Must be run from within an EventMachine run loop"
  end
  if @ready
    return warn("Already connected!")
  end

  debug "Connecting..."

  catch(:fatal) do
    begin
      @ready = false
      @closing = false
      refresh_token
      refresh_websocket
    rescue StandardError => err
      error("Connection error: #{err} \n#{err.backtrace.join("\n")}")
      try_self_heal
    end
  end
end
disconnect() click to toggle source
# File lib/intrinio-realtime.rb, line 120
# Tears the connection down: cancels the heartbeat and self-heal timers,
# marks the client as closing and not ready, forgets every requested and
# joined channel, closes the socket when one exists, and logs the closure.
#
# Setting @closing makes try_self_heal a no-op, so no reconnect is scheduled
# afterwards.
def disconnect
  [@heartbeat_timer, @selfheal_timer].each do |timer|
    EM.cancel_timer(timer) if timer
  end

  @closing = true
  @ready = false
  @joined_channels = []
  @channels = []

  @ws.close if @ws
  info "Connection closed"
end
join(*channels) click to toggle source
# File lib/intrinio-realtime.rb, line 69
# Adds channels to the requested set and syncs them with the server.
#
# Accepts any mix of names and nested arrays; they are normalized by
# parse_channels. If any entry is not a String, the problem is logged through
# `error` and nothing is changed. Otherwise the names are appended to
# @channels (duplicates removed) and refresh_channels is invoked.
def join(*channels)
  requested = parse_channels(channels)
  invalid = requested.reject { |name| name.is_a?(String) }
  unless invalid.empty?
    return error("Invalid channels to join: #{invalid}")
  end

  @channels.concat(requested).uniq!
  debug "Joining channels #{requested}"

  refresh_channels
end
leave(*channels) click to toggle source
# File lib/intrinio-realtime.rb, line 81
# Removes channels from the requested set and syncs the change with the
# server.
#
# Accepts any mix of names and nested arrays; they are normalized by
# parse_channels. If any entry is not a String, the problem is logged through
# `error` and nothing is changed. Otherwise each name is deleted from
# @channels and refresh_channels is invoked.
def leave(*channels)
  channels = parse_channels(channels)
  # Collect ALL invalid entries as an Array (same check as #join). A single
  # `find` result is either nil or a non-String, neither of which responds
  # to `empty?`.
  nonconforming = channels.reject { |x| x.is_a?(String) }
  return error("Invalid channels to leave: #{nonconforming}") unless nonconforming.empty?

  channels.each { |c| @channels.delete(c) }
  debug "Leaving channels #{channels}"

  refresh_channels
end
leave_all() click to toggle source
# File lib/intrinio-realtime.rb, line 92
# Drops every requested channel and syncs with the server, so any channel
# that is currently joined gets a leave message from refresh_channels.
def leave_all
  @channels = []
  debug("Leaving all channels")
  refresh_channels
end
on_quote(&b) click to toggle source
# File lib/intrinio-realtime.rb, line 98
# Registers the given block as a subscriber on @quotes, the
# EventMachine::Channel that process_quote pushes each received quote into.
# Returns whatever @quotes.subscribe returns.
def on_quote(&b)
  @quotes.subscribe(&b)
end
provider() click to toggle source
# File lib/intrinio-realtime.rb, line 65
# Returns the provider given at construction time (validated against
# PROVIDERS in initialize).
def provider
  @provider
end

Private Instance Methods

api_auth_url(url) click to toggle source
# File lib/intrinio-realtime.rb, line 164
# Appends the API key to url as an `api_key` query parameter.
#
# Uses "&" when url already contains a "?", otherwise starts the query string
# with "?". The key is inserted verbatim (no URL-encoding is applied here).
def api_auth_url(url)
  separator = url.include?("?") ? "&" : "?"
  "#{url}#{separator}api_key=#{@api_key}"
end
auth_url() click to toggle source
# File lib/intrinio-realtime.rb, line 149
# Returns the token endpoint for the configured provider.
#
# An unrecognized provider yields an empty base URL. When an API key is set,
# the result is passed through api_auth_url so the key travels as a query
# parameter.
def auth_url
  endpoint =
    case @provider
    when IEX then "https://realtime.intrinio.com/auth"
    when QUODD then "https://api.intrinio.com/token?type=QUODD"
    when CRYPTOQUOTE then "https://crypto.intrinio.com/auth"
    when FXCM then "https://fxcm.intrinio.com/auth"
    else ""
    end

  @api_key ? api_auth_url(endpoint) : endpoint
end
debug(message) click to toggle source
# File lib/intrinio-realtime.rb, line 330
# Logs message at DEBUG level with the "IntrinioRealtime | " prefix.
#
# Logging is best-effort: any StandardError raised by the logger call
# (including the NoMethodError from a nil @logger when logging is disabled)
# is swallowed and nil is returned.
def debug(message)
  line = "IntrinioRealtime | #{message}"
  begin
    @logger.debug(line)
  rescue StandardError
    nil
  end
end
error(message) click to toggle source
# File lib/intrinio-realtime.rb, line 342
# Logs message at ERROR level with the "IntrinioRealtime | " prefix.
#
# Logging is best-effort: any StandardError raised by the logger call
# (including the NoMethodError from a nil @logger when logging is disabled)
# is swallowed and nil is returned.
def error(message)
  line = "IntrinioRealtime | #{message}"
  begin
    @logger.error(line)
  rescue StandardError
    nil
  end
end
fatal(message) click to toggle source
# File lib/intrinio-realtime.rb, line 348
# Reports an unrecoverable failure: logs message at FATAL level with the
# "IntrinioRealtime | " prefix, stops the EventMachine loop, and unwinds with
# `throw :fatal` (caught by the `catch :fatal` in #connect).
#
# This method never returns normally.
def fatal(message)
  message = "IntrinioRealtime | #{message}"

  # Logging is best-effort, like the other log helpers. It gets its own
  # begin/rescue because a trailing `rescue` modifier would absorb the next
  # line as its fallback, so the event loop would only be stopped when
  # logging itself failed.
  begin
    @logger.fatal(message)
  rescue StandardError
    nil
  end

  EM.stop_event_loop
  throw :fatal
end
heartbeat_msg() click to toggle source
# File lib/intrinio-realtime.rb, line 290
# Builds the JSON heartbeat frame for the configured provider.
#
# IEX, CRYPTOQUOTE and FXCM share one frame shape (topic 'phoenix', event
# 'heartbeat'); QUODD uses its own shape whose `ticker` field carries the
# current time in milliseconds. Returns nil for any other provider.
def heartbeat_msg
  frame =
    case @provider
    when IEX, CRYPTOQUOTE, FXCM
      {topic: 'phoenix', event: 'heartbeat', payload: {}, ref: nil}
    when QUODD
      now_ms = (Time.now.to_f * 1000).to_i
      {event: 'heartbeat', data: {action: 'heartbeat', ticker: now_ms}}
    end

  frame && frame.to_json
end
info(message) click to toggle source
# File lib/intrinio-realtime.rb, line 336
# Logs message at INFO level with the "IntrinioRealtime | " prefix.
#
# Logging is best-effort: any StandardError raised by the logger call
# (including the NoMethodError from a nil @logger when logging is disabled)
# is swallowed and nil is returned.
def info(message)
  line = "IntrinioRealtime | #{message}"
  begin
    @logger.info(line)
  rescue StandardError
    nil
  end
end
join_message(channel) click to toggle source
# File lib/intrinio-realtime.rb, line 374
# Builds the subscribe message (as a Hash) for channel under the configured
# provider.
#
# IEX, CRYPTOQUOTE and FXCM use a "phx_join" event addressed to a topic; for
# IEX the topic is derived via parse_iex_topic, for the other two the channel
# name is the topic. QUODD uses a "subscribe" event keyed by ticker.
# Returns nil for any other provider.
def join_message(channel)
  phoenix_join = lambda do |topic|
    {topic: topic, event: "phx_join", payload: {}, ref: nil}
  end

  case @provider
  when IEX
    phoenix_join.call(parse_iex_topic(channel))
  when QUODD
    {event: "subscribe", data: {ticker: channel, action: "subscribe"}}
  when CRYPTOQUOTE, FXCM
    phoenix_join.call(channel)
  end
end
leave_message(channel) click to toggle source
# File lib/intrinio-realtime.rb, line 401
# Builds the unsubscribe message (as a Hash) for channel under the configured
# provider.
#
# IEX, CRYPTOQUOTE and FXCM use a "phx_leave" event addressed to a topic; for
# IEX the topic is derived via parse_iex_topic, for the other two the channel
# name is the topic. QUODD uses an "unsubscribe" event keyed by ticker.
# Returns nil for any other provider.
def leave_message(channel)
  phoenix_leave = lambda do |topic|
    {topic: topic, event: "phx_leave", payload: {}, ref: nil}
  end

  case @provider
  when IEX
    phoenix_leave.call(parse_iex_topic(channel))
  when QUODD
    {event: "unsubscribe", data: {ticker: channel, action: "unsubscribe"}}
  when CRYPTOQUOTE, FXCM
    phoenix_leave.call(channel)
  end
end
parse_channels(channels) click to toggle source
# File lib/intrinio-realtime.rb, line 356
# Normalizes a channel list: flattens nested arrays, removes duplicates and
# drops nils.
#
# Always returns a NEW array. The argument is never modified, so a caller's
# own array (for example the one passed as options[:channels]) is left
# untouched and frozen arrays are accepted. A single channel name or nil is
# also accepted and treated as a one-element or empty list respectively.
def parse_channels(channels)
  Array(channels).flatten.uniq.compact
end
parse_iex_topic(channel) click to toggle source
# File lib/intrinio-realtime.rb, line 363
# Maps a channel name to its IEX topic string.
#
# The two special names "$lobby" and "$lobby_last_price" have dedicated
# topics; every other channel is addressed as "iex:securities:<channel>".
def parse_iex_topic(channel)
  special_topics = {
    "$lobby" => "iex:lobby",
    "$lobby_last_price" => "iex:lobby:last_price"
  }

  special_topics.fetch(channel) { "iex:securities:#{channel}" }
end
process_quote(quote) click to toggle source
# File lib/intrinio-realtime.rb, line 326
# Publishes quote to @quotes, the channel that on_quote subscribers are
# registered on. Called from the websocket message handler for each message
# that yields a Hash quote.
def process_quote(quote)
  @quotes.push(quote)
end
ready(val) click to toggle source
# File lib/intrinio-realtime.rb, line 322
# Sets the @ready flag. The websocket handlers set it to true on open and
# false on error; refresh_channels and connect both consult it.
def ready(val)
  @ready = val
end
refresh_channels() click to toggle source
# File lib/intrinio-realtime.rb, line 255
# Brings the server-side subscriptions in line with @channels.
#
# Does nothing unless the connection is ready. Otherwise it sends a join
# message for every requested channel that is not yet joined, a leave message
# for every joined channel that is no longer requested, and finally records
# a de-duplicated copy of @channels as the joined set.
def refresh_channels
  return unless @ready
  debug "Refreshing channels"

  to_join = @channels - @joined_channels
  to_leave = @joined_channels - @channels

  to_join.each do |channel|
    @ws.send(join_message(channel).to_json)
    info "Joined #{channel}"
  end

  to_leave.each do |channel|
    @ws.send(leave_message(channel).to_json)
    info "Left #{channel}"
  end

  @channels.uniq!
  # Keep a separate copy so later edits to @channels show up as differences.
  @joined_channels = @channels.dup
  debug "Current channels: #{@channels}"
end
refresh_token() click to toggle source
# File lib/intrinio-realtime.rb, line 133
# Fetches a fresh auth token from auth_url and stores the response body in
# @token.
#
# With an API key the request is a plain GET (the key is part of the URL);
# otherwise HTTP basic auth with the username and password is used. A 401,
# or any other non-200 status, is reported through `fatal`, leaving @token
# nil.
def refresh_token
  @token = nil

  response =
    if @api_key
      HTTP.get(auth_url)
    else
      HTTP.basic_auth(:user => @username, :pass => @password).get(auth_url)
    end

  status = response.status
  if status == 401
    return fatal("Unable to authorize")
  elsif status != 200
    return fatal("Could not get auth token")
  end

  @token = response.body
  debug "Token refreshed"
end
refresh_websocket() click to toggle source
# File lib/intrinio-realtime.rb, line 183
# (Re)opens the websocket: closes any existing socket, resets the ready flag
# and the joined-channel list, connects to socket_url, and registers the
# open / message / close / error handlers.
def refresh_websocket
  # Handlers below reach this client through `me` rather than `self`.
  # NOTE(review): presumably the websocket library runs handler blocks with a
  # different `self` - confirm. `send` is used so that methods listed as
  # private on this class (info, ready, refresh_channels, ...) can be invoked.
  me = self

  @ws.close() unless @ws.nil?
  @ready = false
  @joined_channels = []
  
  @ws = ws = WebSocket::Client::Simple.connect(socket_url)
  me.send :info, "Connection opening"

  # Connection established: mark ready, start heartbeats, and reset the
  # self-heal state.
  ws.on :open do
    me.send :info, "Connection established"
    me.send :ready, true
    # These providers get their channels (re)joined right away. QUODD is
    # excluded here; its channels are refreshed from the message handler once
    # a "Connected" info event arrives.
    if [IEX, CRYPTOQUOTE, FXCM].include?(me.send(:provider))
      me.send :refresh_channels
    end
    me.send :start_heartbeat
    me.send :stop_self_heal
  end

  # Incoming frame: parse it as JSON, log server-side error replies, and
  # forward quote payloads to process_quote.
  ws.on :message do |frame|
    message = frame.data
    me.send :debug, "Message: #{message}"
    
    begin
      json = JSON.parse(message)

      if json["event"] == "phx_reply" && json["payload"]["status"] == "error"
        me.send :error, json["payload"]["response"]
      end

      # Pick the quote payload according to the provider's event names; the
      # result is nil for events that carry no quote.
      quote =
        case me.send(:provider)
        when IEX
          if json["event"] == "quote"
            json["payload"]
          end
        when QUODD
          if json["event"] == "info" && json["data"]["message"] == "Connected"
            # The return value of refresh_channels lands in `quote` here; it
            # is only forwarded if it happens to be a Hash (see check below).
            me.send :refresh_channels
          elsif json["event"] == "quote" || json["event"] == "trade"
            json["data"]
          end
        when CRYPTOQUOTE
          if json["event"] == "book_update" || json["event"] == "ticker" || json["event"] == "trade"
            json["payload"]
          end
        when FXCM
          if json["event"] == "price_update"
            json["payload"]
          end
        end
      
      if quote && quote.is_a?(Hash)
        me.send :process_quote, quote
      end
    rescue StandardError => e
      # Covers JSON parse failures as well as any other StandardError raised
      # while handling the frame (e.g. a missing "payload"/"data" key).
      me.send :error, "Could not parse message: #{message} #{e}"
    end
  end
  
  # Socket closed: run the full disconnect, which also sets @closing and
  # clears the channel lists.
  ws.on :close do |e|
    me.send :disconnect
  end

  # Socket error: mark not ready, log, and schedule a reconnect attempt.
  ws.on :error do |e|
    me.send :ready, false
    me.send :error, "Connection error: #{e}"
    me.send :try_self_heal
  end
end
socket_url() click to toggle source
# File lib/intrinio-realtime.rb, line 174
# Returns the escaped websocket URL for the configured provider, with the
# current auth token embedded. Returns nil for an unrecognized provider.
def socket_url
  raw_url =
    case @provider
    when IEX then "wss://realtime.intrinio.com/socket/websocket?vsn=1.0.0&token=#{@token}"
    when QUODD then "wss://www5.quodd.com/websocket/webStreamer/intrinio/#{@token}"
    when CRYPTOQUOTE then "wss://crypto.intrinio.com/socket/websocket?vsn=1.0.0&token=#{@token}"
    when FXCM then "wss://fxcm.intrinio.com/socket/websocket?vsn=1.0.0&token=#{@token}"
    end

  # URI.escape was removed in Ruby 3.0. The RFC 2396 parser provides the same
  # escaping rules (reserved characters such as ?, & and = are left intact)
  # and exists on older Rubies too.
  raw_url && URI::RFC2396_Parser.new.escape(raw_url)
end
start_heartbeat() click to toggle source
# File lib/intrinio-realtime.rb, line 280
# (Re)starts the periodic heartbeat: cancels any existing heartbeat timer,
# then schedules one that fires every HEARTBEAT_TIME and sends the
# provider's heartbeat frame over the socket. Ticks are skipped when
# heartbeat_msg yields nothing.
def start_heartbeat
  EM.cancel_timer(@heartbeat_timer) if @heartbeat_timer

  @heartbeat_timer = EM.add_periodic_timer(HEARTBEAT_TIME) do
    beat = heartbeat_msg
    next unless beat

    @ws.send(beat)
    debug "Heartbeat #{beat}"
  end
end
stop_heartbeat() click to toggle source
# File lib/intrinio-realtime.rb, line 298
# Cancels the heartbeat timer when one has been scheduled; otherwise does
# nothing and returns nil.
def stop_heartbeat
  return unless @heartbeat_timer

  EM.cancel_timer(@heartbeat_timer)
end
stop_self_heal() click to toggle source
# File lib/intrinio-realtime.rb, line 317
# Ends any pending reconnect attempt: cancels the self-heal timer when one
# is scheduled and resets the backoff schedule to a fresh copy of
# SELF_HEAL_BACKOFFS.
def stop_self_heal
  pending = @selfheal_timer
  EM.cancel_timer(pending) if pending

  @selfheal_backoffs = Array.new(SELF_HEAL_BACKOFFS)
end
try_self_heal() click to toggle source
# File lib/intrinio-realtime.rb, line 302
# Schedules a reconnect attempt after the next backoff delay.
#
# Does nothing once the client is closing. Otherwise it takes the first
# entry of @selfheal_backoffs as the delay, drops that entry unless it is the
# last one (so the final delay repeats), cancels any running heartbeat and
# self-heal timers, and arms a one-shot timer that calls #connect.
def try_self_heal
  return if @closing
  debug "Attempting to self-heal"

  delay = @selfheal_backoffs.first
  @selfheal_backoffs.shift if @selfheal_backoffs.count > 1

  EM.cancel_timer(@heartbeat_timer) if @heartbeat_timer
  EM.cancel_timer(@selfheal_timer) if @selfheal_timer

  # The division by 1000 converts the backoff to the timer's unit
  # (presumably milliseconds to seconds - confirm against
  # SELF_HEAL_BACKOFFS). Float division keeps Integer entries below 1000
  # from truncating to a zero delay.
  @selfheal_timer = EM.add_timer(delay / 1000.0) do
    connect
  end
end
valid_api_key?(api_key) click to toggle source
# File lib/intrinio-realtime.rb, line 428
# Returns true only when api_key is a non-empty String; false otherwise.
def valid_api_key?(api_key)
  api_key.is_a?(String) && !api_key.empty?
end