class NchanTools::Subscriber::LongPollClient

Attributes

timeout[RW]

Public Class Methods

aliases() click to toggle source
# File lib/nchan_tools/pubsub.rb, line 715
# Short names under which this client type is known.
# Returns a freshly built array containing the single symbol :longpoll.
def self.aliases
  %i[longpoll]
end
new(subscr, opt={}) click to toggle source
Calls superclass method NchanTools::Subscriber::Client::new
# File lib/nchan_tools/pubsub.rb, line 979
# Builds a long-poll client for +subscr+.
#
# subscr - the owning subscriber; its #url is the address that will be polled.
# opt    - options hash. Keys read here: :last_modified, :etag, :timeout,
#          :connect_timeout, :concurrency (or :clients), :gzip, :retry_delay,
#          :nomsg, :extra_headers, :verbose, :http2 (or :h2).
#
# All arguments are forwarded unchanged to the superclass initializer.
def initialize(subscr, opt={})
  super
  @last_modified = opt[:last_modified]
  @etag = opt[:etag]
  # Apply the default BEFORE converting: nil.to_i is 0 and 0 is truthy in Ruby,
  # so the previous `opt[:timeout].to_i || 10` could never fall back to 10.
  @timeout = (opt[:timeout] || 10).to_i
  @connect_timeout = opt[:connect_timeout]
  @subscriber = subscr
  @url = subscr.url
  # :clients is accepted as an alternative spelling of :concurrency.
  @concurrency = opt[:concurrency] || opt[:clients] || 1
  @gzip = opt[:gzip]
  @retry_delay = opt[:retry_delay]
  @nomsg = opt[:nomsg]
  @bundles = {}
  @body_buf = ""
  @extra_headers = opt[:extra_headers]
  @verbose = opt[:verbose]
  # :h2 is accepted as an alternative spelling of :http2.
  @http2 = opt[:http2] || opt[:h2]
end

Public Instance Methods

close(bundle) click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1120
# Retires one connection.
#
# When +bundle+ is given it is flagged as done, its socket is closed if it is
# still open, and it is removed from @bundles. In every case (including a nil
# bundle) the @connected counter is decremented, and once it drops to zero or
# below @cooked is signalled with +true+.
def close(bundle)
  if bundle
    bundle.done = true
    socket = bundle.sock
    socket.close unless socket.closed?
    @bundles.delete(bundle)
  end

  @connected -= 1
  @cooked.signal(true) if @connected <= 0
end
error(*args) click to toggle source
Calls superclass method NchanTools::Subscriber::Client#error
# File lib/nchan_tools/pubsub.rb, line 719
# Labels errors from this client as coming from an "HTTP Request" (or
# "HTTP/2 Request" when @http2 is set) unless @error_what already holds a
# value, then defers to the superclass implementation with the same arguments.
def error(*args)
  unless @error_what
    protocol = @http2 ? "HTTP/2" : "HTTP"
    @error_what = ["#{protocol} Request"]
  end
  super
end
listen(bundle) click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1108
# Keeps reading from +bundle+ until bundle.read returns false or the read
# raises EOFError. On EOFError the subscriber's on_failure is invoked with a
# "Server Closed Connection" error and the bundle is closed.
#
# Always returns false.
def listen(bundle)
  loop do
    finished = begin
      bundle.read == false
    rescue EOFError
      @subscriber.on_failure error(0, "Server Closed Connection"), bundle
      close bundle
      true
    end
    return false if finished
  end
end
new_bundle(uri, opt={}) click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1051
# Creates and wires up a connection bundle for +uri+.
#
# opt[:headers] is created if absent and then updated in place: @extra_headers
# are merged in when present, and an "Accept-Encoding" header is set when @gzip
# is truthy. The bundle class is HTTP2Bundle when @http2 is set, HTTPBundle
# otherwise. The new bundle gets an error handler that forwards to
# handle_bundle_error, inherits @verbose, and is passed through setup_bundle.
#
# Returns the new bundle.
def new_bundle(uri, opt={})
  headers = (opt[:headers] ||= {})
  headers.merge!(@extra_headers) if @extra_headers
  headers["Accept-Encoding"] = "gzip, deflate" if @gzip

  bundle_class = @http2 ? HTTP2Bundle : HTTPBundle
  bundle_class.new(uri, opt).tap do |bundle|
    bundle.on_error { |msg, err| handle_bundle_error bundle, msg, err }
    bundle.verbose = @verbose
    setup_bundle bundle
  end
end
request_code_ok(code, bundle) click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1031
# Decides what to do with a response status code.
#
# A 200 resets @timer (when one exists) and returns true. Any other code is
# reported to the subscriber via on_failure and returns false; in addition:
#   * 304 and 408 always finish this bundle (finished counter bumped, closed);
#   * other codes finish the bundle only when on_failure returned false,
#     otherwise the request is re-sent, after sleeping @retry_delay if set.
def request_code_ok(code, bundle)
  if code == 200
    @timer.reset if @timer
    return true
  end

  always_final = (code == 304 || code == 408)
  verdict = @subscriber.on_failure(error(code, "", bundle))

  if always_final || verdict == false
    @subscriber.finished += 1
    close bundle
  else
    Celluloid.sleep @retry_delay if @retry_delay
    bundle.send_GET
  end
  false
end
run(was_success = nil) click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1004
# Opens @concurrency long-poll connections to @url and starts listening on each.
#
# was_success - accepted for interface compatibility; not read by this method.
#
# A fresh @cooked condition is created, the @connected and @notready counters
# are set to @concurrency, and any previous @timer is cancelled. When @timeout
# is set, a timer is armed that calls stop("Timeout"). If creating a bundle
# raises SystemCallError, the failure is reported to the subscriber, one
# connection slot is released via close(nil), and the method returns early.
def run(was_success = nil)
  uri = URI.parse_possibly_unix_socket(@url)
  # Fill in a port when the URL has none: 80 for ws/http schemes, 443 otherwise.
  uri.port ||= uri.scheme.match(/^(ws|http)$/) ? 80 : 443
  @cooked = Celluloid::Condition.new
  @connected = @concurrency
  @notready = @concurrency
  @timer.cancel if @timer
  if @timeout
    @timer = after(@timeout) do
      stop "Timeout"
    end
  end

  # Use @http2, the flag assigned in initialize and used to pick the bundle
  # class. The previous code read @use_http2, which is not assigned anywhere in
  # this class's visible code, so the "(http/2)" tag never appeared.
  proto_tag = @http2 ? "(http/2)" : ""

  @concurrency.times do |i|
    begin
      bundle = new_bundle(uri, id: i, useragent: "pubsub.rb #{self.class.name} #{proto_tag} ##{i}", logger: @logger)
    rescue SystemCallError => e
      @subscriber.on_failure error(0, e.to_s)
      close nil
      return
    end

    @bundles[bundle] = true
    bundle.send_GET @last_modified, @etag
    async.listen bundle
  end
end
setup_bundle(b) click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1068
# Installs the response and error callbacks on bundle +b+.
#
# The bundle is switched to body buffering. For every response the subscriber's
# waiting counter is decremented and the bundle records the Last-Modified and
# Etag headers plus the elapsed request time. If request_code_ok accepts the
# status code, each message yielded by Message.each_multipart_message is handed
# to @subscriber.on_message: as a Message object normally, or as a bare copy of
# the body when @nomsg is set. Unless the last on_message call returned false,
# the next long-poll GET is issued; otherwise the bundle is finished and closed.
def setup_bundle(b)
  b.buffer_body!

  b.on_response do |code, headers, body|
    @subscriber.waiting -= 1
    # Headers and body have been fully parsed by the time this block runs.
    b.last_modified = headers["Last-Modified"]
    b.etag = headers["Etag"]
    b.request_time = Time.now.to_f - b.time_requested
    next unless request_code_ok(code, b)

    verdict = nil
    Message.each_multipart_message(headers["Content-Type"], body) do |content_type, msg_body, multi|
      if @nomsg
        delivered = msg_body.dup
      else
        delivered = Message.new(msg_body.dup)
        delivered.content_type = content_type
        # Response-level validators are only copied onto single-part messages.
        unless multi
          delivered.last_modified = headers["Last-Modified"]
          delivered.etag = headers["Etag"]
        end
      end

      verdict = @subscriber.on_message(delivered, b)
    end

    if verdict == false
      @subscriber.finished += 1
      close b
    else
      @subscriber.waiting += 1
      b.send_GET
    end
  end

  b.on_error { |msg, err| handle_bundle_error b, msg, err }
end
stop(msg="Stopped", src_bundle=nil) click to toggle source
Calls superclass method NchanTools::Subscriber::Client#stop
# File lib/nchan_tools/pubsub.rb, line 996
# Stops the client.
#
# The superclass stop is invoked with +msg+ and the first bundle currently
# tracked in @bundles (nil when there is none); the +src_bundle+ argument is
# not consulted here. Every tracked bundle is then closed and the timeout
# timer, if any, is cancelled.
def stop(msg="Stopped", src_bundle=nil)
  first_entry = @bundles.first
  super(msg, first_entry && first_entry.first)
  @bundles.each_key { |tracked| close tracked }
  @timer.cancel if @timer
end