class NchanTools::Subscriber::LongPollClient
Attributes
timeout[RW]
Public Class Methods
aliases()
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 715
# Alternate names for this client class.
#
# @return [Array<Symbol>] a one-element list containing :longpoll
def self.aliases
  %i[longpoll]
end
new(subscr, opt={})
click to toggle source
Calls superclass method
NchanTools::Subscriber::Client::new
# File lib/nchan_tools/pubsub.rb, line 979
# Sets up the long-poll client state from the subscriber and the option hash.
#
# Options read here: :last_modified, :etag, :timeout (defaults to 10),
# :connect_timeout, :concurrency (or :clients, defaults to 1), :gzip,
# :retry_delay, :nomsg, :extra_headers, :verbose and :http2 (or :h2).
#
# @param subscr the owning subscriber; its #url is copied into @url
# @param opt [Hash] client options (also forwarded to the superclass)
def initialize(subscr, opt={})
  super
  @last_modified = opt[:last_modified]
  @etag = opt[:etag]
  # The default must be applied BEFORE the conversion: nil.to_i is 0 and 0 is
  # truthy in Ruby, so `opt[:timeout].to_i || 10` could never produce 10.
  @timeout = (opt[:timeout] || 10).to_i
  @connect_timeout = opt[:connect_timeout]
  @subscriber = subscr
  @url = subscr.url
  @concurrency = opt[:concurrency] || opt[:clients] || 1
  @gzip = opt[:gzip]
  @retry_delay = opt[:retry_delay]
  @nomsg = opt[:nomsg]
  @bundles = {}
  @body_buf = ""
  @extra_headers = opt[:extra_headers]
  @verbose = opt[:verbose]
  @http2 = opt[:http2] || opt[:h2]
end
Public Instance Methods
close(bundle)
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1120
# Releases one connection slot and, when a bundle is given, shuts that bundle down.
#
# A non-nil bundle is marked done, its socket is closed if still open, and it
# is removed from @bundles. The @connected counter is decremented in every
# case; once it is zero or below, @cooked is signalled with true.
#
# @param bundle the bundle to close, or nil to only release a slot
def close(bundle)
  if bundle
    bundle.done = true
    still_open = !bundle.sock.closed?
    bundle.sock.close if still_open
    @bundles.delete(bundle)
  end

  @connected -= 1
  @cooked.signal(true) if @connected <= 0
end
error(*args)
click to toggle source
Calls superclass method
NchanTools::Subscriber::Client#error
# File lib/nchan_tools/pubsub.rb, line 719
# Labels the error source before delegating to the superclass implementation.
#
# When @error_what is not yet set it becomes a one-element list naming the
# request kind ("HTTP/2 Request" or "HTTP Request", depending on @http2).
# All arguments are passed through to the superclass unchanged.
def error(*args)
  unless @error_what
    protocol = @http2 ? "HTTP/2" : "HTTP"
    @error_what = ["#{protocol} Request"]
  end
  super
end
listen(bundle)
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1108
# Keeps reading from the bundle until it reports completion or the peer hangs up.
#
# Reading stops when bundle.read returns exactly false. An EOFError from the
# read is reported to the subscriber as a "Server Closed Connection" failure,
# after which the bundle is closed.
#
# @param bundle the bundle to read from
# @return [false] in both cases
def listen(bundle)
  loop do
    finished =
      begin
        bundle.read == false
      rescue EOFError
        @subscriber.on_failure error(0, "Server Closed Connection"), bundle
        close bundle
        true
      end
    return false if finished
  end
end
new_bundle(uri, opt={})
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1051
# Creates and wires up a request bundle for +uri+.
#
# The bundle class is HTTP2Bundle when @http2 is set, HTTPBundle otherwise.
# Request headers start from opt[:headers] (if any), then @extra_headers are
# merged in, then "Accept-Encoding" is set when @gzip is enabled. The bundle
# gets an error handler and the @verbose flag, and is passed to setup_bundle.
#
# @param uri the target passed to the bundle constructor
# @param opt [Hash] bundle options; neither this hash nor its :headers hash is modified
# @return the newly created bundle
def new_bundle(uri, opt={})
  # Build the headers on copies: the caller's hashes may be shared, reused for
  # several bundles, or frozen.
  headers = (opt[:headers] || {}).dup
  headers.merge!(@extra_headers) if @extra_headers
  headers["Accept-Encoding"] = "gzip, deflate" if @gzip
  opt = opt.merge(headers: headers)

  b = (@http2 ? HTTP2Bundle : HTTPBundle).new(uri, opt)
  b.on_error do |msg, err|
    handle_bundle_error b, msg, err
  end
  b.verbose = @verbose
  setup_bundle b
  b
end
request_code_ok(code, bundle)
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1031
# Decides what to do with a response status code.
#
# * 200: resets @timer (when one exists) and returns true.
# * 304 or 408: reports the failure, counts the bundle as finished and closes it.
# * anything else: reports the failure; if the subscriber's on_failure returns
#   exactly false the bundle is finished and closed, otherwise the request is
#   re-sent (after sleeping @retry_delay, when set).
#
# @param code the response status code
# @param bundle the bundle the response arrived on
# @return [Boolean] true only for a 200 response
def request_code_ok(code, bundle)
  if code == 200
    @timer.reset if @timer
    return true
  end

  if code == 304 || code == 408
    # These codes always end the bundle, whatever on_failure returns.
    @subscriber.on_failure error(code, "", bundle)
    give_up = true
  else
    give_up = @subscriber.on_failure(error(code, "", bundle)) == false
  end

  if give_up
    @subscriber.finished += 1
    close bundle
  else
    Celluloid.sleep @retry_delay if @retry_delay
    bundle.send_GET
  end
  false
end
run(was_success = nil)
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1004
# Starts @concurrency long-poll requests against @url.
#
# Resets the connection counters, replaces any previous @timer with a new one
# that calls stop("Timeout") after @timeout (when @timeout is set), then for
# each slot creates a bundle, sends the initial GET with the stored
# Last-Modified / Etag values and starts listening asynchronously.
#
# If creating a bundle raises a SystemCallError the failure is reported to the
# subscriber, one connection slot is released and the method returns early.
#
# @param was_success accepted but not used by this method
def run(was_success = nil)
  uri = URI.parse_possibly_unix_socket(@url)
  uri.port ||= (uri.scheme.match(/^(ws|http)$/) ? 80 : 443)
  @cooked = Celluloid::Condition.new
  @connected = @concurrency
  @notready = @concurrency
  @timer.cancel if @timer
  if @timeout
    @timer = after(@timeout) do
      stop "Timeout"
    end
  end
  @concurrency.times do |i|
    begin
      # The HTTP/2 flag is stored in @http2 by #initialize; the previously used
      # @use_http2 is never assigned, so the "(http/2)" marker never appeared.
      useragent = "pubsub.rb #{self.class.name} #{@http2 ? "(http/2)" : ""} ##{i}"
      bundle = new_bundle(uri, id: i, useragent: useragent, logger: @logger)
    rescue SystemCallError => e
      @subscriber.on_failure error(0, e.to_s)
      close nil
      return
    end
    @bundles[bundle] = true
    bundle.send_GET @last_modified, @etag
    async.listen bundle
  end
end
setup_bundle(b)
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1068
# Installs the response and error handlers on a bundle.
#
# The response handler runs once headers and body are fully parsed. It records
# Last-Modified, Etag and the request duration on the bundle, then, if
# request_code_ok accepts the status code, hands each (multipart) message to
# the subscriber. Unless the last on_message call returned exactly false, the
# next GET is sent; otherwise the bundle is counted as finished and closed.
#
# With @nomsg set, the subscriber receives a copy of the raw body instead of a
# Message object.
#
# @param b the bundle to configure
def setup_bundle(b)
  b.buffer_body!

  b.on_response do |code, headers, body|
    @subscriber.waiting -= 1
    b.last_modified = headers["Last-Modified"]
    b.etag = headers["Etag"]
    b.request_time = Time.now.to_f - b.time_requested
    next unless request_code_ok(code, b)

    last_result = nil
    Message.each_multipart_message(headers["Content-Type"], body) do |content_type, msg_body, multi|
      if @nomsg
        msg = msg_body.dup
      else
        msg = Message.new msg_body.dup
        msg.content_type = content_type
        # Only a non-multipart message takes the response-level validators.
        unless multi
          msg.last_modified = headers["Last-Modified"]
          msg.etag = headers["Etag"]
        end
      end
      last_result = @subscriber.on_message(msg, b)
    end

    if last_result == false
      @subscriber.finished += 1
      close b
    else
      @subscriber.waiting += 1
      b.send_GET
    end
  end

  b.on_error { |msg, err| handle_bundle_error b, msg, err }
end
stop(msg="Stopped", src_bundle=nil)
click to toggle source
Calls superclass method
NchanTools::Subscriber::Client#stop
# File lib/nchan_tools/pubsub.rb, line 996
# Stops the client: notifies the superclass, closes every bundle and cancels the timer.
#
# The superclass receives +msg+ together with the first bundle currently in
# @bundles (nil when there is none).
#
# @param msg [String] the reason passed on to the superclass
# @param src_bundle accepted but not used by this method
def stop(msg="Stopped", src_bundle=nil)
  first_entry = @bundles.first
  super(msg, first_entry && first_entry.first)

  @bundles.each_key { |bundle| close bundle }
  @timer.cancel if @timer
end