class XRBP::WebSocket::Client

Managed socket connection lifecycle and read/write operations

@private

Attributes

options[R]
url[R]

Public Class Methods

new(url, options={}) click to toggle source
# File lib/xrbp/websocket/client.rb, line 14
def initialize(url, options={})
  @url = url
  @options = options

  @handshaked = false
  @closed = true
  @completed = true
end

Public Instance Methods

add_work(&bl) click to toggle source

Add job to internal thread pool.

# File lib/xrbp/websocket/client.rb, line 37
def add_work(&bl)
  pool.post &bl
end
async_close(err=nil) click to toggle source

Allow close to be run via seperate thread so as not to block caller

# File lib/xrbp/websocket/client.rb, line 57
def async_close(err=nil)
  Thread.new { close(err)  }
end
close(err=nil) click to toggle source
# File lib/xrbp/websocket/client.rb, line 61
def close(err=nil)
  return if closed?

  # XXX set closed true first incase callbacks need to check this
  @closed = true
  @handshake = nil
  @handshaked = false

  terminate!

  send_data nil, :type => :close unless socket.pipe_broken
  emit_signal :close, err

  socket.close if socket
  @socket = nil

  pool.shutdown
  pool.wait_for_termination
  @pool = nil

  @completed = true
  emit :completed
  self
end
closed?() click to toggle source
# File lib/xrbp/websocket/client.rb, line 47
def closed?
  !!@closed
end
completed?() click to toggle source
# File lib/xrbp/websocket/client.rb, line 51
def completed?
  !!@completed
end
connect() click to toggle source
# File lib/xrbp/websocket/client.rb, line 23
def connect
  emit_signal :connecting

  @closed = false
  @completed = false
  socket.connect
  handshake!

  start_read

  self
end
open?() click to toggle source
# File lib/xrbp/websocket/client.rb, line 43
def open?
  handshake.finished? and !closed?
end
send_data(data, opt={:type => :text}) click to toggle source
# File lib/xrbp/websocket/client.rb, line 128
def send_data(data, opt={:type => :text})
  return if !handshaked? || closed?

  begin
    frame = data_frame(data, opt[:type])
    socket.write_nonblock(frame.to_s)

  rescue Errno::EPIPE, OpenSSL::SSL::SSLError => e
    async_close(e)
  end
end

Private Instance Methods

data_frame(data, type) click to toggle source
# File lib/xrbp/websocket/client.rb, line 120
def data_frame(data, type)
  ::WebSocket::Frame::Outgoing::Client.new(:data => data,
                                           :type => type,
                                        :version => handshake.version)
end
emit_signal(*args) click to toggle source
# File lib/xrbp/websocket/client.rb, line 171
def emit_signal(*args)
  # TODO add args to queue, and in add_work task, pull 1 item off queue
  #      & emit it (to enforce signal order)
  begin
    add_work do
      emit *args
    end

    # XXX: handle race condition where connection is closed
    #      between calling emit_signal and pool.post (handle
    #      error, otherwise a mutex would be needed)
  rescue Concurrent::RejectedExecutionError => e
    raise e unless closed?
  end
end
handshake() click to toggle source
# File lib/xrbp/websocket/client.rb, line 100
def handshake
  @handshake ||= ::WebSocket::Handshake::Client.new :url => url,
                                                :headers => options[:headers]
end
handshake!() click to toggle source
# File lib/xrbp/websocket/client.rb, line 109
def handshake!
  socket.write handshake.to_s

  until handshaked? || closed? #|| connection.force_quit?
    socket.read_next handshake
    @handshaked = handshake.finished?
  end
end
handshaked?() click to toggle source
# File lib/xrbp/websocket/client.rb, line 105
def handshaked?
  !!@handshaked
end
pool() click to toggle source
# File lib/xrbp/websocket/client.rb, line 94
def pool
  @pool ||= Concurrent::CachedThreadPool.new
end
socket() click to toggle source
# File lib/xrbp/websocket/client.rb, line 90
def socket
  @socket ||= Socket.new self
end
start_read() click to toggle source
# File lib/xrbp/websocket/client.rb, line 142
def start_read
  add_work do
    frame = ::WebSocket::Frame::Incoming::Client.new
    emit_signal :open

    cl = trm = eof = false
    until (trm = terminate?) || (cl = closed?) do
      begin
        socket.read_next(frame)

        if msg = frame.next
          emit_signal :message, msg
          frame = ::WebSocket::Frame::Incoming::Client.new
        end

      rescue EOFError => e
        emit_signal :error, e
        eof = e

      rescue => e
        emit_signal :error, e
      end
    end

    # ... is this right?:
    async_close(eof) if !!eof && !cl && !trm
  end
end