class XRBP::WebSocket::Client
Managed socket connection lifecycle and read/write operations
@private
Attributes
options[R]
url[R]
Public Class Methods
new(url, options={})
click to toggle source
# File lib/xrbp/websocket/client.rb, line 14
# Remember the endpoint and options for later use. No connection is made
# here: the new client starts out not handshaked and flagged as both
# closed and completed.
#
# @param url endpoint, stored in @url (exposed via the url reader)
# @param options [Hash] stored in @options (exposed via the options reader)
def initialize(url, options={})
  @url, @options = url, options

  @handshaked = false
  @closed = @completed = true
end
Public Instance Methods
add_work(&bl)
click to toggle source
Add job to internal thread pool.
# File lib/xrbp/websocket/client.rb, line 37
# Hand the given block to the internal thread pool via pool.post.
#
# @return whatever pool.post returns
def add_work(&bl)
  pool.post(&bl)
end
async_close(err=nil)
click to toggle source
Allow close to be run via a separate thread so as not to block the caller
# File lib/xrbp/websocket/client.rb, line 57
# Run close on a freshly spawned thread so the caller is not blocked
# while the connection is torn down.
#
# @param err optional error forwarded to close
# @return [Thread] the thread executing close
def async_close(err=nil)
  Thread.new(err) { |reason| close(reason) }
end
close(err=nil)
click to toggle source
# File lib/xrbp/websocket/client.rb, line 61
# Tear the connection down: flag the client closed, send a close frame to
# the peer (best effort), emit :close, close the socket, drain the thread
# pool and finally emit :completed.
#
# Does nothing (returns nil) when the client is already closed.
#
# @param err optional error passed along with the :close signal
# @return [self, nil] self after a teardown, nil if already closed
def close(err=nil)
  return if closed?

  # Capture this before the state is reset below; a close frame only makes
  # sense on a connection that finished its handshake.
  was_handshaked = handshaked?

  # XXX set closed true first in case callbacks need to check this
  @closed = true

  terminate!

  # send_data refuses to write once closed? is true, so going through it
  # here would silently drop the close frame. Write the frame directly.
  if was_handshaked && !socket.pipe_broken
    begin
      socket.write_nonblock(data_frame(nil, :close).to_s)
    rescue StandardError
      # Best effort only: failing to deliver the close frame must not
      # prevent the rest of the teardown from running.
    end
  end

  @handshake = nil
  @handshaked = false

  emit_signal :close, err

  socket.close
  @socket = nil

  pool.shutdown
  pool.wait_for_termination
  @pool = nil

  @completed = true
  # The pool is gone at this point, so this is emitted directly rather
  # than through emit_signal.
  emit :completed

  self
end
closed?()
click to toggle source
# File lib/xrbp/websocket/client.rb, line 47
# Whether the client is currently flagged as closed.
#
# @return [Boolean] strict true/false derived from @closed
def closed?
  @closed ? true : false
end
completed?()
click to toggle source
# File lib/xrbp/websocket/client.rb, line 51
# Whether the client is currently flagged as completed.
#
# @return [Boolean] strict true/false derived from @completed
def completed?
  @completed ? true : false
end
connect()
click to toggle source
# File lib/xrbp/websocket/client.rb, line 23
# Open the connection: emit :connecting, clear the closed/completed flags,
# connect the socket, run the handshake and start the read loop.
#
# @return [self]
def connect
  # Emitted before the flags change, so closed? is still true while the
  # signal is being queued.
  emit_signal :connecting

  @closed = @completed = false

  socket.connect
  handshake!
  start_read

  self
end
open?()
click to toggle source
# File lib/xrbp/websocket/client.rb, line 43
# Whether the handshake has finished and the client is not closed.
#
# @return truthy when the connection is usable
def open?
  handshake.finished? && !closed?
end
send_data(data, opt={:type => :text})
click to toggle source
# File lib/xrbp/websocket/client.rb, line 128
# Wrap data in an outgoing frame and write it to the socket without
# blocking. Nothing is sent unless the handshake is done and the client
# is not closed.
#
# A broken pipe or SSL error triggers an asynchronous close carrying the
# error.
#
# @param data payload for the frame
# @param opt [Hash] :type selects the frame type (default :text)
def send_data(data, opt={:type => :text})
  return unless handshaked? && !closed?

  outgoing = data_frame(data, opt[:type])
  socket.write_nonblock(outgoing.to_s)
rescue Errno::EPIPE, OpenSSL::SSL::SSLError => e
  async_close(e)
end
Private Instance Methods
data_frame(data, type)
click to toggle source
# File lib/xrbp/websocket/client.rb, line 120
# Build an outgoing client frame for the given payload and type, using the
# protocol version reported by the handshake.
#
# @param data frame payload
# @param type frame type
# @return [::WebSocket::Frame::Outgoing::Client]
def data_frame(data, type)
  ::WebSocket::Frame::Outgoing::Client.new(
    data: data,
    type: type,
    version: handshake.version
  )
end
emit_signal(*args)
click to toggle source
# File lib/xrbp/websocket/client.rb, line 171
# Emit a signal from a thread-pool job instead of on the calling thread.
#
# @param args forwarded unchanged to emit
def emit_signal(*args)
  # TODO add args to queue, and in add_work task, pull 1 item off queue
  #      & emit it (to enforce signal order)
  add_work { emit(*args) }
rescue Concurrent::RejectedExecutionError
  # XXX: handle race condition where connection is closed
  #      between calling emit_signal and pool.post (handle
  #      error, otherwise a mutex would be needed)
  raise unless closed?
end
handshake()
click to toggle source
# File lib/xrbp/websocket/client.rb, line 100
# Handshake object for this connection, created on first use from the url
# and the :headers option and then memoized in @handshake.
#
# @return [::WebSocket::Handshake::Client]
def handshake
  @handshake ||= ::WebSocket::Handshake::Client.new(
    url: url,
    headers: options[:headers]
  )
end
handshake!()
click to toggle source
# File lib/xrbp/websocket/client.rb, line 109
# Send the handshake request, then keep feeding socket data into the
# handshake until it reports finished or the client gets closed.
def handshake!
  socket.write handshake.to_s

  while !handshaked? && !closed? # && !connection.force_quit?
    socket.read_next handshake
    @handshaked = handshake.finished?
  end
end
handshaked?()
click to toggle source
# File lib/xrbp/websocket/client.rb, line 105
# Whether the handshake has been recorded as finished.
#
# @return [Boolean] strict true/false derived from @handshaked
def handshaked?
  @handshaked ? true : false
end
pool()
click to toggle source
# File lib/xrbp/websocket/client.rb, line 94
# Thread pool used for background jobs; a Concurrent::CachedThreadPool is
# created on first use and memoized in @pool.
def pool
  return @pool if @pool

  @pool = Concurrent::CachedThreadPool.new
end
socket()
click to toggle source
# File lib/xrbp/websocket/client.rb, line 90
# Socket wrapper for this client; created on first use (with this client
# as its argument) and memoized in @socket.
def socket
  return @socket if @socket

  @socket = Socket.new(self)
end
start_read()
click to toggle source
# File lib/xrbp/websocket/client.rb, line 142
# Queue the read loop on the thread pool. The job emits :open, then keeps
# reading from the socket and emits a :message signal for every complete
# message until the client is terminated, closed, or the socket hits EOF.
#
# Every read error is reported through an :error signal. An EOFError
# additionally ends the loop and triggers an asynchronous close carrying
# that error.
def start_read
  add_work do
    frame = ::WebSocket::Frame::Incoming::Client.new
    emit_signal :open

    cl = trm = eof = false

    # EOF must end the loop: otherwise it can only exit once terminate? or
    # closed? is truthy, which makes the close below unreachable and keeps
    # re-reading a dead socket.
    until eof || (trm = terminate?) || (cl = closed?)
      begin
        socket.read_next(frame)

        if (msg = frame.next)
          emit_signal :message, msg
          # start a fresh frame for the next message
          frame = ::WebSocket::Frame::Incoming::Client.new
        end
      rescue EOFError => e
        emit_signal :error, e
        eof = e
      rescue => e
        emit_signal :error, e
      end
    end

    # Only close on our own initiative: if the loop ended because of
    # terminate? / closed?, somebody else is already shutting down.
    async_close(eof) if eof && !cl && !trm
  end
end