class NchanTools::Subscriber::WebSocketClient

Attributes

etag[RW]
last_modified[RW]
timeout[RW]
ws[RW]

Public Class Methods

aliases() click to toggle source
# File lib/nchan_tools/pubsub.rb, line 412
def self.aliases
  [:websocket, :ws]
end
new(subscr, opt={}) click to toggle source
Calls superclass method NchanTools::Subscriber::Client::new
# File lib/nchan_tools/pubsub.rb, line 514
def initialize(subscr, opt={})
  super
  @last_modified, @etag, @timeout = opt[:last_modified], opt[:etag], opt[:timeout].to_i || 10
  @connect_timeout = opt[:connect_timeout]
  @subscriber=subscr
  @subprotocol = opt[:subprotocol]
  @url=subscr.url
  @url = @url.gsub(/^h(ttp|2)(s)?:/, "ws\\2:")
  
  if opt[:permessage_deflate]
    @permessage_deflate = true
  end
  @permessage_deflate_max_window_bits = opt[:permessage_deflate_max_window_bits]
  @permessage_deflate_server_max_window_bits = opt[:permessage_deflate_server_max_window_bits]
  
  @concurrency=(opt[:concurrency] || opt[:clients] || 1).to_i
  @retry_delay=opt[:retry_delay]
  @ws = {}
  @connected=0
  @nomsg = opt[:nomsg]
  @http2 = opt[:http2]
  @extra_headers = opt[:extra_headers]
end

Public Instance Methods

close(bundle) click to toggle source
# File lib/nchan_tools/pubsub.rb, line 697
def close(bundle)
  if bundle then
    @ws.delete bundle
    bundle.sock.close unless bundle.sock.closed?
  end
  @connected -= 1
  if @connected <= 0 then
    until @ws.count == 0 do
      sleep 0.1
    end
    @cooked.signal true
  end
end
listen(bundle) click to toggle source
# File lib/nchan_tools/pubsub.rb, line 656
def listen(bundle)
  while @ws[bundle]
    begin
      bundle.read
    rescue IOError => e
      @subscriber.on_failure error(0, "Connection closed: #{e}"), bundle
      close bundle
      return false
    rescue EOFError
      bundle.sock.close
      close bundle
      return
    rescue Errno::ECONNRESET
      close bundle
      return
    end
  end
end
provides_msgid?() click to toggle source
# File lib/nchan_tools/pubsub.rb, line 509
def provides_msgid?
  @subprotocol == "ws+meta.nchan"
end
run(was_success = nil) click to toggle source
# File lib/nchan_tools/pubsub.rb, line 546
def run(was_success = nil)
  uri = URI.parse_possibly_unix_socket(@url)
  uri.port ||= (uri.scheme == "ws" || uri.scheme == "unix" ? 80 : 443)
  @cooked=Celluloid::Condition.new
  @connected = @concurrency
  if @http2
    @subscriber.on_failure error(0, "Refusing to try websocket over HTTP/2")
    @connected = 0
    @notready = 0
    @cooked_ready.signal false
    @cooked.signal true
    return
  end
  raise ArgumentError, "invalid websocket scheme #{uri.scheme} in #{@url}" unless uri.scheme == "unix" || uri.scheme.match(/^wss?$/)
  @notready=@concurrency
  if @timeout
    @timer = after(@timeout) do
      stop "Timeout"
    end
  end
  @concurrency.times do |i|
    begin
      sock = ParserBundle.new(uri).open_socket.sock
    rescue SystemCallError => e
      @subscriber.on_failure error(0, e.to_s)
      close nil
      return
    end
    
    if uri.scheme == "unix"
      hs_url="http://#{uri.host.match "[^/]+$"}#{uri.path}#{uri.query && "?#{uri.query}"}"
    else
      hs_url=@url
    end        
    
    bundle = WebSocketBundle.new hs_url, sock, id: i, permessage_deflate: @permessage_deflate, subprotocol: @subprotocol, logger: @logger, permessage_deflate_max_window_bits: @permessage_deflate_max_window_bits, permessage_deflate_server_max_window_bits: @permessage_deflate_server_max_window_bits, extra_headers: @extra_headers
    
    bundle.ws.on :open do |ev|
      bundle.connected = true
      @notready-=1
      @cooked_ready.signal true if @notready == 0
    end
    
    bundle.ws.on :ping do |ev|
      @subscriber.on(:ping).call ev, bundle
    end
    
    bundle.ws.on :pong do |ev|
      @subscriber.on(:pong).call ev, bundle
    end
    
    bundle.ws.on :error do |ev|
      http_error_match = ev.message.match(/Unexpected response code: (\d+)/)
      @subscriber.on_failure error(http_error_match ? http_error_match[1] : 0, ev.message, bundle)
      close bundle
    end
    
    bundle.ws.on :close do |ev|
      @subscriber.on_failure error(ev.code, ev.reason, bundle)
      bundle.connected = false
      close bundle
    end
    
    bundle.ws.on :message do |ev|
      @timer.reset if @timer
      
      data = ev.data
      if Array === data #binary String
        data = data.map(&:chr).join
        data.force_encoding "ASCII-8BIT"
        bundle.last_message_frame_type=:binary
      else
        bundle.last_message_frame_type=:text
      end
      
      if bundle.ws.protocol == "ws+meta.nchan"
        @meta_regex ||= /^id: (?<id>\d+:[^n]+)\n(content-type: (?<content_type>[^\n]+)\n)?\n(?<data>.*)/m
        match = @meta_regex.match data
        if not match
          @subscriber.on_failure error(0, "Invalid ws+meta.nchan message received")
          close bundle
        else
          if @nomsg 
            msg = match[:data]
          else
            msg= Message.new match[:data]
            msg.content_type = match[:content_type]
            msg.id = match[:id]
          end
        end
      else
        msg= @nomsg ? data : Message.new(data)
      end
      
      bundle.last_message_time=Time.now.to_f
      if @subscriber.on_message(msg, bundle) == false
        close bundle
      end
      
    end
    
    @ws[bundle]=true
    
    #handhsake
    bundle.send_handshake
    
    async.listen bundle
  end
end
send_binary(data) click to toggle source
# File lib/nchan_tools/pubsub.rb, line 693
def send_binary(data)
  ws_client.send_binary data
end
send_close(reason=nil, code=1000) click to toggle source
# File lib/nchan_tools/pubsub.rb, line 687
def send_close(reason=nil, code=1000)
  ws_client.send_close reason, code
end
send_data(data) click to toggle source
# File lib/nchan_tools/pubsub.rb, line 690
def send_data(data)
  ws_client.send_data data
end
send_ping(data=nil) click to toggle source
# File lib/nchan_tools/pubsub.rb, line 684
def send_ping(data=nil)
  ws_client.send_ping data
end
stop(msg = "Stopped", src_bundle = nil) click to toggle source
Calls superclass method NchanTools::Subscriber::Client#stop
# File lib/nchan_tools/pubsub.rb, line 538
def stop(msg = "Stopped", src_bundle = nil)
  super msg, (@ws.first && @ws.first.first)
  @ws.each do |b, v|
    close b
  end
  @timer.cancel if @timer
end

Private Instance Methods

ws_client() click to toggle source
# File lib/nchan_tools/pubsub.rb, line 675
def ws_client
  if @ws.first
    @ws.first.first
  else
    raise SubscriberError, "Websocket client connection gone"
  end
end