class NchanTools::Publisher
Attributes
accept[RW]
channel_info[RW]
channel_info_type[RW]
extra_headers[RW]
messages[RW]
nofail[RW]
response[RW]
response_body[RW]
response_code[RW]
url[RW]
verbose[RW]
ws[RW]
Public Class Methods
new(url, opt={})
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1662 def initialize(url, opt={}) @url= url unless opt[:nostore] @messages = MessageStore.new :noid => true @messages.name = "pub" end @timeout = opt[:timeout] @accept = opt[:accept] @verbose = opt[:verbose] @on_response = opt[:on_response] @http2 = opt[:http2] @ws_wait_until_response = true if opt[:ws] || opt[:websocket] @ws = Subscriber.new url, 1, timeout: 100000, client: :websocket, permessage_deflate: opt[:permessage_deflate] @ws_sent_msg = [] @ws.on_message do |msg| sent = @ws_sent_msg.shift if @messages && sent @messages << sent[:msg] end self.response=Typhoeus::Response.new self.response_code=200 #fake it self.response_body=msg sent[:response] = self.response sent[:condition].signal true if sent[:condition] @on_response.call(self.response_code, self.response_body) if @on_response end @ws.on_failure do |err| raise PublisherError, err end @ws.run @ws.wait :ready end end
Public Instance Methods
delete()
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1894 def delete submit nil, :DELETE end
get(accept_header=nil)
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1889 def get(accept_header=nil) self.accept=accept_header submit nil, :GET self.accept=nil end
on_complete(&block)
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1771 def on_complete(&block) raise ArgumentError, "block must be given" unless block @on_complete = block end
on_response(&block)
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1766 def on_response(&block) @on_response = block if block_given? @on_response end
parse_channel_info(data, content_type=nil)
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1714 def parse_channel_info(data, content_type=nil) info = {} case content_type when "text/plain" mm = data.match(/^queued messages: (.*)\r$/) info[:messages] = mm[1].to_i if mm mm = data.match(/^last requested: (.*) sec\. ago\r$/) info[:last_requested] = mm[1].to_i if mm mm = data.match(/^active subscribers: (.*)\r$/) info[:subscribers] = mm[1].to_i if mm mm = data.match(/^last message id: (.*)$/) info[:last_message_id] = mm[1] if mm return info, :plain when "text/json", "application/json" begin info_json=JSON.parse data rescue JSON::ParserError => e return nil end info[:messages] = info_json["messages"].to_i info[:last_requested] = info_json["requested"].to_i info[:subscribers] = info_json["subscribers"].to_i info[:last_message_id] = info_json["last_message_id"] return info, :json when "application/xml", "text/xml" ix = Oga.parse_xml(data, :strict => true) info[:messages] = ix.at_xpath('//messages').text.to_i info[:last_requested] = ix.at_xpath('//requested').text.to_i info[:subscribers] = ix.at_xpath('//subscribers').text.to_i info[:last_message_id] = ix.at_xpath('//last_message_id').text return info, :xml when "application/yaml", "text/yaml" begin yam=YAML.load data rescue return nil end info[:messages] = yam["messages"].to_i info[:last_requested] = yam["requested"].to_i info[:subscribers] = yam["subscribers"].to_i info[:last_message_id] = yam["last_message_id"] return info, :yaml when nil ["text/plain", "text/json", "text/xml", "text/yaml"].each do |try_content_type| ret, type = parse_channel_info data, try_content_type return ret, type if ret end else raise PublisherError, "Unexpected content-type #{content_type}" end end
post(body, content_type=nil, es_event=nil, &block)
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1897 def post(body, content_type=nil, es_event=nil, &block) submit body, :POST, content_type, es_event, &block end
put(body, content_type=nil, es_event=nil, &block)
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1900 def put(body, content_type=nil, es_event=nil, &block) submit body, :PUT, content_type, es_event, &block end
reset()
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1904 def reset @messages.clear end
submit(body, method=:POST, content_type= :'text/plain', eventsource_event=nil, &block)
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1797 def submit(body, method=:POST, content_type= :'text/plain', eventsource_event=nil, &block) self.response=nil self.response_code=nil self.response_body=nil if Enumerable===body i=0 body.each{|b| i+=1; submit(b, method, content_type, &block)} return i end return submit_ws body, content_type, &block if @ws headers = {:'Content-Type' => content_type, :'Accept' => accept} headers[:'X-Eventsource-Event'] = eventsource_event if eventsource_event headers.merge! @extra_headers if @extra_headers post = Typhoeus::Request.new( @url, headers: headers, method: method, body: body, timeout: @timeout || PUBLISH_TIMEOUT, connecttimeout: @timeout || PUBLISH_TIMEOUT, verbose: @verbose, http_version: @http2 ? :httpv2_0 : :none ) if body && @messages msg=Message.new body msg.content_type=content_type msg.eventsource_event=eventsource_event end if @on_complete post.on_complete @on_complete else post.on_complete do |response| self.response=response self.response_code=response.code self.response_body=response.body if response.success? #puts "published message #{msg.to_s[0..15]}" @channel_info, @channel_info_type = parse_channel_info response.body, response.headers["Content-Type"] if @messages && msg msg.id = @channel_info[:last_message_id] if @channel_info @messages << msg end elsif response.timed_out? # aw hell no #puts "publisher err: timeout" pub_url=URI.parse_possibly_unix_socket(response.request.url) pub_url = "#{pub_url.path}#{pub_url.query ? "?#{pub_url.query}" : nil}" raise PublisherError, "Publisher #{response.request.options[:method]} to #{pub_url} timed out." elsif response.code == 0 # Could not get an http response, something's wrong. #puts "publisher err: #{response.return_message}" errmsg="No HTTP response: #{response.return_message}" unless self.nofail then raise PublisherError, errmsg end else # Received a non-successful http response. #puts "publisher err: #{response.code.to_s}" errmsg="HTTP request failed: #{response.code.to_s}" unless self.nofail then raise PublisherError, errmsg end end block.call(self.response_code, self.response_body) if block on_response.call(self.response_code, self.response_body) if on_response end end #puts "publishing to #{@url}" begin post.run rescue Exception => e last=nil, i=0 e.backtrace.select! do |bt| if bt.match(/(gems\/(typhoeus|ethon)|pubsub\.rb)/) last=i false else i+=1 true end end e.backtrace.insert last, "..." raise PublisherError, e end end
terminate()
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1793 def terminate @ws.terminate if @ws end
with_url(alt_url) { || ... }
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1703 def with_url(alt_url) prev_url=@url @url=alt_url if block_given? yield @url=prev_url else self end end
Private Instance Methods
submit_ws(body, content_type, &block)
click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1776 def submit_ws(body, content_type, &block) sent = {condition: Celluloid::Condition.new} sent[:msg] = body && @messages ? Message.new(body) : body @ws_sent_msg << sent if content_type == "application/octet-stream" @ws.client.send_binary(body) else @ws.client.send_data(body) end if @ws_wait_until_response while not sent[:response] do Celluloid.sleep 0.1 end end sent[:msg] end