class NchanTools::Subscriber::WebSocketClient
Attributes
etag[RW]
last_modified[RW]
timeout[RW]
ws[RW]
Public Class Methods
aliases()
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 412
# Returns the alias symbols for this client class.
def self.aliases
  %i[websocket ws]
end
new(subscr, opt={})
click to toggle source
Calls superclass method
NchanTools::Subscriber::Client::new
# File lib/nchan_tools/pubsub.rb, line 514
# Sets up a websocket client for the subscriber +subscr+.
#
# The superclass initializer is invoked with the same arguments. The target
# URL is read from subscr.url, and a leading "http:", "https:", "h2:" or
# "h2s:" scheme is rewritten to "ws:" / "wss:".
#
# Options read from +opt+:
# :last_modified, :etag::  stored as-is
# :timeout::               converted with to_i; defaults to 10 when absent
# :connect_timeout::       stored as-is
# :subprotocol::           websocket subprotocol name
# :permessage_deflate::    when truthy, @permessage_deflate is set to true
# :permessage_deflate_max_window_bits, :permessage_deflate_server_max_window_bits::
#                          stored as-is
# :concurrency / :clients:: number of connections (first one present wins), default 1
# :retry_delay, :nomsg, :http2, :extra_headers:: stored as-is
def initialize(subscr, opt={})
  super

  @last_modified = opt[:last_modified]
  @etag = opt[:etag]
  # Apply the default BEFORE converting: nil.to_i is 0 and 0 is truthy in
  # Ruby, so the former `opt[:timeout].to_i || 10` could never produce 10.
  @timeout = (opt[:timeout] || 10).to_i
  @connect_timeout = opt[:connect_timeout]

  @subscriber = subscr
  @subprotocol = opt[:subprotocol]

  @url = subscr.url
  @url = @url.gsub(/^h(ttp|2)(s)?:/, "ws\\2:")

  if opt[:permessage_deflate]
    @permessage_deflate = true
  end
  @permessage_deflate_max_window_bits = opt[:permessage_deflate_max_window_bits]
  @permessage_deflate_server_max_window_bits = opt[:permessage_deflate_server_max_window_bits]

  @concurrency = (opt[:concurrency] || opt[:clients] || 1).to_i
  @retry_delay = opt[:retry_delay]

  # Maps each live connection bundle to true.
  @ws = {}
  @connected = 0

  @nomsg = opt[:nomsg]
  @http2 = opt[:http2]
  @extra_headers = opt[:extra_headers]
end
Public Instance Methods
close(bundle)
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 697
# Closes one connection and updates the connection bookkeeping.
#
# When +bundle+ is given it is removed from @ws and its socket is closed if
# still open. @connected is decremented in every case (also for a nil
# bundle). Once @connected drops to zero or below, this polls every 0.1s
# until @ws is empty and then signals @cooked with true.
def close(bundle)
  if bundle
    @ws.delete(bundle)
    socket = bundle.sock
    socket.close unless socket.closed?
  end

  @connected -= 1
  return unless @connected <= 0

  sleep 0.1 until @ws.empty?
  @cooked.signal true
end
listen(bundle)
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 656
# Read loop for one connection: keeps calling bundle.read for as long as
# +bundle+ is still registered in @ws.
#
# Returns nil after an EOF or connection reset (the bundle is closed
# without reporting a failure), false after any other IOError (which is
# reported through @subscriber.on_failure), and nil when the loop ends
# because the bundle was removed from @ws.
def listen(bundle)
  while @ws[bundle]
    begin
      bundle.read
    rescue EOFError
      # EOFError is a subclass of IOError, so it has to be rescued first;
      # listed after IOError this branch could never run.
      bundle.sock.close
      close bundle
      return
    rescue Errno::ECONNRESET
      close bundle
      return
    rescue IOError => e
      @subscriber.on_failure error(0, "Connection closed: #{e}"), bundle
      close bundle
      return false
    end
  end
end
provides_msgid?()
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 509
# True when the configured subprotocol is "ws+meta.nchan".
def provides_msgid?
  @subprotocol == "ws+meta.nchan"
end
run(was_success = nil)
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 546
# Opens @concurrency websocket connections to @url and wires their events
# to @subscriber.
#
# +was_success+ is accepted but not used in this method.
#
# When @http2 is set, a failure is reported, the conditions are signalled
# and no connection is attempted. A socket-level SystemCallError while
# connecting is reported via on_failure and aborts the whole run.
#
# Raises ArgumentError when the URL scheme is neither unix, ws nor wss.
def run(was_success = nil)
  uri = URI.parse_possibly_unix_socket(@url)
  uri.port ||= (uri.scheme == "ws" || uri.scheme == "unix" ? 80 : 443)
  @cooked = Celluloid::Condition.new
  @connected = @concurrency

  if @http2
    @subscriber.on_failure error(0, "Refusing to try websocket over HTTP/2")
    @connected = 0
    @notready = 0
    @cooked_ready.signal false
    @cooked.signal true
    return
  end

  raise ArgumentError, "invalid websocket scheme #{uri.scheme} in #{@url}" unless uri.scheme == "unix" || uri.scheme.match(/^wss?$/)

  @notready = @concurrency

  if @timeout
    @timer = after(@timeout) do
      stop "Timeout"
    end
  end

  @concurrency.times do |i|
    begin
      sock = ParserBundle.new(uri).open_socket.sock
    rescue SystemCallError => e
      @subscriber.on_failure error(0, e.to_s)
      close nil
      return
    end

    # For unix sockets the handshake URL is rebuilt as an http:// URL from
    # the last path segment of the host, plus the path and optional query.
    if uri.scheme == "unix"
      hs_url = "http://#{uri.host.match "[^/]+$"}#{uri.path}#{uri.query && "?#{uri.query}"}"
    else
      hs_url = @url
    end

    bundle = WebSocketBundle.new hs_url, sock,
                                 id: i,
                                 permessage_deflate: @permessage_deflate,
                                 subprotocol: @subprotocol,
                                 logger: @logger,
                                 permessage_deflate_max_window_bits: @permessage_deflate_max_window_bits,
                                 permessage_deflate_server_max_window_bits: @permessage_deflate_server_max_window_bits,
                                 extra_headers: @extra_headers

    bundle.ws.on :open do |ev|
      bundle.connected = true
      @notready -= 1
      # Signal readiness once every connection has opened.
      @cooked_ready.signal true if @notready == 0
    end

    bundle.ws.on :ping do |ev|
      @subscriber.on(:ping).call ev, bundle
    end

    bundle.ws.on :pong do |ev|
      @subscriber.on(:pong).call ev, bundle
    end

    bundle.ws.on :error do |ev|
      # Use the HTTP status from the handshake error as error code when present.
      http_error_match = ev.message.match(/Unexpected response code: (\d+)/)
      @subscriber.on_failure error(http_error_match ? http_error_match[1] : 0, ev.message, bundle)
      close bundle
    end

    bundle.ws.on :close do |ev|
      @subscriber.on_failure error(ev.code, ev.reason, bundle)
      bundle.connected = false
      close bundle
    end

    bundle.ws.on :message do |ev|
      @timer.reset if @timer

      data = ev.data
      if Array === data # binary frame delivered as an array of byte values
        data = data.map(&:chr).join
        data.force_encoding "ASCII-8BIT"
        bundle.last_message_frame_type = :binary
      else
        bundle.last_message_frame_type = :text
      end

      if bundle.ws.protocol == "ws+meta.nchan"
        # The id runs to the end of its line. The character class must
        # exclude newlines ([^\n]); the previous [^n] excluded the letter
        # "n" instead, letting the id capture spill into the payload.
        @meta_regex ||= /^id: (?<id>\d+:[^\n]+)\n(content-type: (?<content_type>[^\n]+)\n)?\n(?<data>.*)/m
        match = @meta_regex.match data
        if not match
          @subscriber.on_failure error(0, "Invalid ws+meta.nchan message received")
          close bundle
          # NOTE(review): msg stays nil here and is still handed to
          # on_message below -- confirm this is intended.
        else
          if @nomsg
            msg = match[:data]
          else
            msg = Message.new match[:data]
            msg.content_type = match[:content_type]
            msg.id = match[:id]
          end
        end
      else
        msg = @nomsg ? data : Message.new(data)
      end

      bundle.last_message_time = Time.now.to_f
      # A subscriber callback returning exactly false closes this connection.
      if @subscriber.on_message(msg, bundle) == false
        close bundle
      end
    end

    @ws[bundle] = true

    # handshake
    bundle.send_handshake

    async.listen bundle
  end
end
send_binary(data)
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 693
# Forwards +data+ to send_binary on the connection returned by ws_client.
def send_binary(data)
  ws_client.send_binary(data)
end
send_close(reason=nil, code=1000)
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 687
# Forwards +reason+ and +code+ (default 1000) to send_close on the
# connection returned by ws_client.
def send_close(reason=nil, code=1000)
  ws_client.send_close(reason, code)
end
send_data(data)
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 690
# Forwards +data+ to send_data on the connection returned by ws_client.
def send_data(data)
  ws_client.send_data(data)
end
send_ping(data=nil)
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 684
# Forwards +data+ (default nil) to send_ping on the connection returned by
# ws_client.
def send_ping(data=nil)
  ws_client.send_ping(data)
end
stop(msg = "Stopped", src_bundle = nil)
click to toggle source
Calls superclass method
NchanTools::Subscriber::Client#stop
# File lib/nchan_tools/pubsub.rb, line 538
# Stops the client: calls the superclass stop with +msg+ and the first
# registered bundle (nil when there is none), closes every bundle in @ws,
# and cancels @timer when one is set.
#
# +src_bundle+ is accepted but not used in this method.
def stop(msg = "Stopped", src_bundle = nil)
  first_entry = @ws.first
  super msg, (first_entry && first_entry.first)

  @ws.each { |bundle, _flag| close bundle }

  @timer.cancel if @timer
end
Private Instance Methods
ws_client()
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 675
# Returns the first bundle registered in @ws.
#
# Raises SubscriberError when @ws has no entries.
def ws_client
  entry = @ws.first
  raise SubscriberError, "Websocket client connection gone" unless entry

  entry.first
end