class NchanTools::Publisher

Attributes

accept[RW]
channel_info[RW]
channel_info_type[RW]
extra_headers[RW]
messages[RW]
nofail[RW]
response[RW]
response_body[RW]
response_code[RW]
url[RW]
verbose[RW]
ws[RW]

Public Class Methods

new(url, opt={}) click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1662
def initialize(url, opt={})
  @url= url
  unless opt[:nostore]
    @messages = MessageStore.new :noid => true
    @messages.name = "pub"
  end
  @timeout = opt[:timeout]
  @accept = opt[:accept]
  @verbose = opt[:verbose]
  @on_response = opt[:on_response]
  @http2 = opt[:http2]
  
  @ws_wait_until_response = true
  
  if opt[:ws] || opt[:websocket]
    @ws = Subscriber.new url, 1, timeout: 100000, client: :websocket, permessage_deflate: opt[:permessage_deflate]
    @ws_sent_msg = []
    @ws.on_message do |msg|
      sent = @ws_sent_msg.shift
      if @messages && sent
        @messages << sent[:msg]
      end
      
      self.response=Typhoeus::Response.new
      self.response_code=200 #fake it
      self.response_body=msg
      
      sent[:response] = self.response
      sent[:condition].signal true if sent[:condition]
      
      @on_response.call(self.response_code, self.response_body) if @on_response
    end
    @ws.on_failure do |err|
      raise PublisherError, err
    end
    
    @ws.run
    @ws.wait :ready
  end
end

Public Instance Methods

delete() click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1894
def delete
  submit nil, :DELETE
end
get(accept_header=nil) click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1889
def get(accept_header=nil)
  self.accept=accept_header
  submit nil, :GET
  self.accept=nil
end
on_complete(&block) click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1771
def on_complete(&block)
  raise ArgumentError, "block must be given" unless block
  @on_complete = block
end
on_response(&block) click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1766
def on_response(&block)
  @on_response = block if block_given?
  @on_response
end
parse_channel_info(data, content_type=nil) click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1714
def parse_channel_info(data, content_type=nil)
  info = {}
  case content_type
  when "text/plain"
    mm = data.match(/^queued messages: (.*)\r$/)
    info[:messages] = mm[1].to_i if mm
    mm = data.match(/^last requested: (.*) sec\. ago\r$/)
    info[:last_requested] = mm[1].to_i if mm
    mm = data.match(/^active subscribers: (.*)\r$/)
    info[:subscribers] = mm[1].to_i if mm
    mm = data.match(/^last message id: (.*)$/)
    info[:last_message_id] = mm[1] if mm
    return info, :plain
  when "text/json", "application/json"
    begin
      info_json=JSON.parse data
    rescue JSON::ParserError => e
      return nil
    end
    info[:messages] = info_json["messages"].to_i
    info[:last_requested] = info_json["requested"].to_i
    info[:subscribers] = info_json["subscribers"].to_i
    info[:last_message_id] = info_json["last_message_id"]
    return info, :json
  when "application/xml", "text/xml"
    ix = Oga.parse_xml(data, :strict => true)
    info[:messages] = ix.at_xpath('//messages').text.to_i
    info[:last_requested] = ix.at_xpath('//requested').text.to_i
    info[:subscribers] = ix.at_xpath('//subscribers').text.to_i
    info[:last_message_id] = ix.at_xpath('//last_message_id').text
    return info, :xml
  when "application/yaml", "text/yaml"
    begin
      yam=YAML.load data
    rescue
      return nil
    end
    info[:messages] = yam["messages"].to_i
    info[:last_requested] = yam["requested"].to_i
    info[:subscribers] = yam["subscribers"].to_i
    info[:last_message_id] = yam["last_message_id"]
    return info, :yaml
  when nil
    ["text/plain", "text/json", "text/xml", "text/yaml"].each do |try_content_type|
      ret, type = parse_channel_info data, try_content_type
      return ret, type if ret
    end
  else
    raise PublisherError, "Unexpected content-type #{content_type}"
  end
end
post(body, content_type=nil, es_event=nil, &block) click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1897
def post(body, content_type=nil, es_event=nil, &block)
  submit body, :POST, content_type, es_event, &block
end
put(body, content_type=nil, es_event=nil, &block) click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1900
def put(body, content_type=nil, es_event=nil, &block)
  submit body, :PUT, content_type, es_event, &block
end
reset() click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1904
def reset
  @messages.clear    
end
submit(body, method=:POST, content_type= :'text/plain', eventsource_event=nil, &block) click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1797
def submit(body, method=:POST, content_type= :'text/plain', eventsource_event=nil, &block)
  self.response=nil
  self.response_code=nil
  self.response_body=nil

  if Enumerable===body
    i=0
    body.each{|b| i+=1; submit(b, method, content_type, &block)}
    return i
  end
  
  return submit_ws body, content_type, &block if @ws
  
  headers = {:'Content-Type' => content_type, :'Accept' => accept}
  headers[:'X-Eventsource-Event'] = eventsource_event if eventsource_event
  headers.merge! @extra_headers if @extra_headers
  
  post = Typhoeus::Request.new(
    @url,
    headers: headers,
    method: method,
    body: body,
    timeout: @timeout || PUBLISH_TIMEOUT,
    connecttimeout: @timeout || PUBLISH_TIMEOUT,
    verbose: @verbose,
    http_version: @http2 ? :httpv2_0 : :none
  )
  if body && @messages
    msg=Message.new body
    msg.content_type=content_type
    msg.eventsource_event=eventsource_event
  end
  if @on_complete
    post.on_complete @on_complete
  else
    post.on_complete do |response|
      self.response=response
      self.response_code=response.code
      self.response_body=response.body
      if response.success?
        #puts "published message #{msg.to_s[0..15]}"
        @channel_info, @channel_info_type = parse_channel_info response.body, response.headers["Content-Type"]
        if @messages && msg
          msg.id = @channel_info[:last_message_id] if @channel_info
          @messages << msg
        end
        
      elsif response.timed_out?
        # aw hell no
        #puts "publisher err: timeout"
        
        pub_url=URI.parse_possibly_unix_socket(response.request.url)
        pub_url = "#{pub_url.path}#{pub_url.query ? "?#{pub_url.query}" : nil}"
        raise PublisherError, "Publisher #{response.request.options[:method]} to #{pub_url} timed out."
      elsif response.code == 0
        # Could not get an http response, something's wrong.
        #puts "publisher err: #{response.return_message}"
        errmsg="No HTTP response: #{response.return_message}"
        unless self.nofail then
          raise PublisherError, errmsg
        end
      else
        # Received a non-successful http response.
        #puts "publisher err: #{response.code.to_s}"
        errmsg="HTTP request failed: #{response.code.to_s}"
        unless self.nofail then
          raise PublisherError, errmsg
        end
      end
      block.call(self.response_code, self.response_body) if block
      on_response.call(self.response_code, self.response_body) if on_response
    end
  end
  #puts "publishing to #{@url}"
  begin
    post.run
  rescue Exception => e
    last=nil, i=0
    e.backtrace.select! do |bt|
      if bt.match(/(gems\/(typhoeus|ethon)|pubsub\.rb)/)
        last=i
        false
      else
        i+=1
        true
      end 
    end
    e.backtrace.insert last, "..."
    raise PublisherError, e
  end
end
terminate() click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1793
def terminate
  @ws.terminate if @ws
end
with_url(alt_url) { || ... } click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1703
def with_url(alt_url)
  prev_url=@url
  @url=alt_url
  if block_given?
    yield
    @url=prev_url
  else
    self
  end
end

Private Instance Methods

submit_ws(body, content_type, &block) click to toggle source
# File lib/nchan_tools/pubsub.rb, line 1776
def submit_ws(body, content_type, &block)
  sent = {condition: Celluloid::Condition.new}
  sent[:msg] = body && @messages ? Message.new(body) : body
  @ws_sent_msg << sent
  if content_type == "application/octet-stream"
    @ws.client.send_binary(body)
  else
    @ws.client.send_data(body)
  end
  if @ws_wait_until_response
    while not sent[:response] do
      Celluloid.sleep 0.1
    end
  end
  sent[:msg]
end