class Pubnub

Constants

MSG_TOO_LARGE_RESPONSE
ORIGIN_HOST
SUCCESS_RESPONSE
TIMEOUT_BAD_JSON_RESPONSE
TIMEOUT_BAD_RESPONSE_CODE
TIMEOUT_GENERAL_ERROR
TIMEOUT_NON_SUBSCRIBE
TIMEOUT_SUBSCRIBE

Attributes

channel[RW]
cipher_key[RW]
origin[RW]
publish_key[RW]
secret_key[RW]
session_uuid[RW]
ssl[RW]
subscribe_key[RW]

Public Class Methods

new(*args) click to toggle source

ORIGIN_HOST = 'test.pubnub.com'

# File lib/pubnub.rb, line 55
# Creates a client from either five positional values or one options hash.
#
# Positional form: exactly five arguments, in this order: publish_key,
# subscribe_key, secret_key, cipher_key, ssl. The four keys are coerced with
# to_s; ssl is stored exactly as passed.
#
# Hash form: a single Hash (or Hash subclass) with the keys :publish_key,
# :subscribe_key, :secret_key, :cipher_key, :ssl and :logger. The hash is
# wrapped in HashWithIndifferentAccess before lookup. Blank keys are stored
# as nil, :ssl is reduced to true/false and :logger is stored as given.
#
# Raises InitError for any other argument shape. Once the keys are stored a
# session UUID is generated and verify_init is run.
def initialize(*args)

  if args.size == 5 # five positional parameters

    @publish_key = args[0].to_s
    @subscribe_key = args[1].to_s
    @secret_key = args[2].to_s
    @cipher_key = args[3].to_s
    @ssl = args[4]

  elsif args.size == 1 && args[0].is_a?(Hash) # a single options hash; is_a? also admits Hash subclasses

    options_hash = HashWithIndifferentAccess.new(args[0])
    @publish_key = options_hash[:publish_key].blank? ? nil : options_hash[:publish_key].to_s
    @subscribe_key = options_hash[:subscribe_key].blank? ? nil : options_hash[:subscribe_key].to_s
    @secret_key = options_hash[:secret_key].blank? ? nil : options_hash[:secret_key].to_s
    @cipher_key = options_hash[:cipher_key].blank? ? nil : options_hash[:cipher_key].to_s
    @ssl = !options_hash[:ssl].blank?
    @logger = options_hash[:logger]

  else
    raise(InitError, "Initialize with either a hash of options, or exactly 5 named parameters.")
  end

  @session_uuid = uuid
  verify_init
end

Public Instance Methods

check_for_em(request) click to toggle source
# File lib/pubnub.rb, line 269
# Dispatches a prepared request through EventMachine.
#
# When a reactor is already running the request is issued inside it and
# _request is told so (second argument true). Otherwise a reactor is started
# with EM.run and the request is issued from inside that block.
def check_for_em(request)
  return _request(request, true) if EM.reactor_running?

  EM.run { _request(request) }
end
detailed_history(options = nil) click to toggle source
# File lib/pubnub.rb, line 177
# Builds and dispatches a :detailed_history request.
#
# options - a Hash (or Hash subclass) that must contain truthy :channel,
#           :callback and :count entries. :start, :end and :reverse are
#           copied onto the request as given (nil when absent).
#
# Raises ArgumentError when options is not a Hash or a required entry is
# missing. Returns whatever check_for_em returns.
def detailed_history(options = nil)
  usage_error = "detailed_history() requires :channel, :callback, and :count options."

  # is_a? rather than an exact class comparison, so Hash subclasses such as
  # HashWithIndifferentAccess are accepted as well.
  raise(ArgumentError, usage_error) unless options.is_a?(Hash)

  options = HashWithIndifferentAccess.new(options)

  unless options[:count] && options[:channel] && options[:callback]
    raise(ArgumentError, usage_error)
  end

  detailed_history_request = PubnubRequest.new(:operation => :detailed_history)

  #TODO: refactor into initializer code on request instantiation

  detailed_history_request.ssl = @ssl
  detailed_history_request.set_channel(options)
  detailed_history_request.set_callback(options)
  detailed_history_request.set_cipher_key(options, self.cipher_key)

  detailed_history_request.set_subscribe_key(options, self.subscribe_key)

  detailed_history_request.history_count = options[:count]
  detailed_history_request.history_start = options[:start]
  detailed_history_request.history_end = options[:end]
  detailed_history_request.history_reverse = options[:reverse]

  detailed_history_request.format_url!
  check_for_em detailed_history_request
end
here_now(options = nil) click to toggle source
# File lib/pubnub.rb, line 152
# Builds and dispatches a :here_now request.
#
# options - a Hash (or Hash subclass) that must contain truthy :channel and
#           :callback entries.
#
# Raises ArgumentError when options is not a Hash or a required entry is
# missing. Returns whatever check_for_em returns.
def here_now(options = nil)
  usage_error = "here_now() requires :channel and :callback options."

  # is_a? rather than an exact class comparison, so Hash subclasses such as
  # HashWithIndifferentAccess are accepted as well.
  raise(ArgumentError, usage_error) unless options.is_a?(Hash)

  options = HashWithIndifferentAccess.new(options)

  unless options[:channel] && options[:callback]
    raise(ArgumentError, usage_error)
  end

  here_now_request = PubnubRequest.new(:operation => :here_now)

  here_now_request.ssl = @ssl
  here_now_request.set_channel(options)
  here_now_request.set_callback(options)

  here_now_request.set_subscribe_key(options, self.subscribe_key)

  here_now_request.format_url!
  check_for_em here_now_request
end
history(options = nil) click to toggle source
# File lib/pubnub.rb, line 213
# Builds and dispatches a :history request.
#
# options - a Hash (or Hash subclass) that must contain truthy :channel,
#           :callback and :limit entries.
#
# Raises ArgumentError when options is not a Hash or a required entry is
# missing. Returns whatever check_for_em returns.
def history(options = nil)
  usage_error = "history() requires :channel, :callback, and :limit options."

  # is_a? rather than an exact class comparison, so Hash subclasses such as
  # HashWithIndifferentAccess are accepted as well.
  raise(ArgumentError, usage_error) unless options.is_a?(Hash)

  options = HashWithIndifferentAccess.new(options)

  unless options[:limit] && options[:channel] && options[:callback]
    raise(ArgumentError, usage_error)
  end

  history_request = PubnubRequest.new(:operation => :history)

  #TODO: refactor into initializer code on request instantiation

  history_request.ssl = @ssl
  history_request.set_channel(options)
  history_request.set_callback(options)
  history_request.set_cipher_key(options, self.cipher_key)

  history_request.set_subscribe_key(options, self.subscribe_key)
  history_request.history_limit = options[:limit]

  history_request.format_url!
  check_for_em history_request
end
my_callback(x, quiet = false) click to toggle source
# File lib/pubnub.rb, line 256
# Sample callback: prints its argument, or silently returns an empty string.
#
# With quiet left at false (the default) nothing is printed and "" is
# returned. With any other value the line is written with puts, whose
# return value (nil) is returned.
# NOTE(review): the flag reads as inverted (output only when quiet is not
# false) - confirm whether callers depend on this before changing it.
def my_callback(x, quiet = false)
  return "" if quiet == false

  puts("mycallback says: #{x}")
end
presence(options) click to toggle source
# File lib/pubnub.rb, line 135
# Subscribes in presence mode by delegating to subscribe with
# :operation set to "presence".
#
# options - a Hash (or Hash subclass) that must contain truthy :channel and
#           :callback entries.
#
# Raises ArgumentError when options is not a Hash or a required entry is
# missing. Returns whatever subscribe returns.
def presence(options)
  usage_error = "presence() requires :channel and :callback options."

  # is_a? rather than an exact class comparison, so Hash subclasses such as
  # HashWithIndifferentAccess are accepted as well.
  raise(ArgumentError, usage_error) unless options.is_a?(Hash)

  options = HashWithIndifferentAccess.new(options)

  unless options[:channel] && options[:callback]
    raise(ArgumentError, usage_error)
  end

  subscribe(options.merge(:operation => "presence"))
end
publish(options) click to toggle source
# File lib/pubnub.rb, line 89
# Builds and dispatches a :publish request.
#
# options is wrapped in HashWithIndifferentAccess and handed to each of the
# request's set_* methods. The cipher, publish, subscribe and secret setters
# additionally receive this client's corresponding key, and set_message
# receives the client's cipher key.
#
# Returns whatever check_for_em returns.
def publish(options)
  opts = HashWithIndifferentAccess.new(options)

  #TODO: refactor into initializer code on request instantiation
  request = PubnubRequest.new(:operation => :publish).tap do |r|
    r.ssl = @ssl
    r.set_origin(opts)
    r.set_channel(opts)
    r.set_callback(opts)
    r.set_cipher_key(opts, cipher_key)
    r.set_message(opts, cipher_key)
    r.set_publish_key(opts, publish_key)
    r.set_subscribe_key(opts, subscribe_key)
    r.set_secret_key(opts, secret_key)
    r.format_url!
  end

  check_for_em(request)
end
subscribe(options) click to toggle source
# File lib/pubnub.rb, line 111
# Builds and dispatches a subscribe (or presence) request.
#
# options is wrapped in HashWithIndifferentAccess. Any non-nil :operation
# entry makes this a :presence request; otherwise it is :subscribe. The
# request is created with this client's session UUID. A present
# :override_timetoken is forwarded to format_url!; otherwise nil is passed.
#
# Returns whatever check_for_em returns.
def subscribe(options)
  opts = HashWithIndifferentAccess.new(options)

  operation = :subscribe
  operation = :presence unless opts[:operation].nil?

  #TODO: refactor into initializer code on request instantiation
  request = PubnubRequest.new(:operation => operation, :session_uuid => @session_uuid)

  request.ssl = @ssl
  request.set_origin(opts)
  request.set_channel(opts)
  request.set_callback(opts)
  # The cipher key is skipped when the request reports its operation as "presence".
  request.set_cipher_key(opts, cipher_key) if request.operation != "presence"
  request.set_subscribe_key(opts, subscribe_key)

  timetoken = opts[:override_timetoken]
  timetoken = nil unless timetoken.present?
  request.format_url!(timetoken)

  check_for_em(request)
end
time(options) click to toggle source
# File lib/pubnub.rb, line 245
# Builds and dispatches a :time request.
#
# options is wrapped in HashWithIndifferentAccess and must carry a
# non-blank 'callback' entry; otherwise PubNubRuntimeError is raised.
#
# Returns whatever check_for_em returns.
def time(options)
  opts = HashWithIndifferentAccess.new(options)
  if opts['callback'].blank?
    raise(PubNubRuntimeError, "You must supply a callback.")
  end

  request = PubnubRequest.new(:operation => :time)
  request.set_callback(opts)
  request.format_url!

  check_for_em(request)
end
uuid() click to toggle source
# File lib/pubnub.rb, line 265
# Returns a freshly generated identifier from a new UUID generator.
def uuid
  generator = UUID.new
  generator.generate
end
verify_init() click to toggle source
# File lib/pubnub.rb, line 83
# Validates the stored keys after construction.
#
# Only subscribe_key is checked here; publish_key and cipher_key are both
# optional. Raises InitError when @subscribe_key is blank, otherwise
# returns nil.
def verify_init
  return unless @subscribe_key.blank?

  raise(InitError, "subscribe_key is a mandatory parameter.")
end

Private Instance Methods

_request(request, is_reactor_running = false) click to toggle source
# File lib/pubnub.rb, line 281
# Issues the HTTP GET for a prepared request and wires up its handlers.
#
# request            - the request object; its url, operation and callback
#                      are read here.
# is_reactor_running - true when the caller did not start the reactor
#                      itself; in that case EM.stop is never called here.
#
# The URL is (re)formatted first, then the work is handed to EM.schedule.
def _request(request, is_reactor_running = false)
  request.format_url!
  EM.schedule {
    begin

      # subscribe/presence requests use TIMEOUT_SUBSCRIBE as the inactivity
      # timeout; every other operation uses TIMEOUT_NON_SUBSCRIBE.
      operation_timeout = %w(subscribe presence).include?(request.operation) ? TIMEOUT_SUBSCRIBE : TIMEOUT_NON_SUBSCRIBE
      conn = EM::HttpRequest.new(request.url, :inactivity_timeout => operation_timeout) #client times out in 310s unless the server returns or timeout first
      req = conn.get()

      # Failure handler: log and hand over to the retry logic.
      req.errback{
        logAndRetryGeneralError(is_reactor_running, req, request)
      }

      # Completion handler.
      req.callback {

        if %w(subscribe presence).include?(request.operation)
          # Only a "subscribe" request with an unparseable body is retried as
          # bad JSON; a presence request goes to processGoodResponse either way.
          if (checkForBadJSON(req) == true && request.operation == "subscribe")
            logAndRetryBadJSON(is_reactor_running, req, request)
          else
            processGoodResponse(is_reactor_running, req, request)
          end
        else
          if req.response_header.http_status.to_i != SUCCESS_RESPONSE

            # Non-success status on a one-shot request: pass the parsed body
            # to the user callback. If anything in that attempt raises, the
            # callback is called with [0, "Bad server response: <status>"].
            # Either way the reactor is stopped unless the caller owns it.
            begin
              server_response = Yajl.load(req.response)
              request.callback.call(server_response)
            rescue => e
              request.callback.call([0, "Bad server response: #{req.response_header.http_status.to_i}"])
            ensure
              EM.stop unless is_reactor_running
            end

            else
              processGoodResponse(is_reactor_running, req, request)
            end
        end

      }

    # The error is printed with puts; the [0, message] array is only the
    # value of this block.
    rescue EventMachine::ConnectionError, RuntimeError => e # RuntimeError for catching "EventMachine not initialized"
      error_message = "Network Error: #{e.message}"
      puts(error_message)
      [0, error_message]
    end
  }
end
checkForBadJSON(req) click to toggle source
# File lib/pubnub.rb, line 329
# Reports whether the response body fails to parse as JSON.
#
# Returns true when JSON.parse(req.response) raises any StandardError,
# false when it parses.
def checkForBadJSON(req)
  JSON.parse(req.response)
  false
rescue StandardError
  true
end
init_default_logger() click to toggle source
# File lib/pubnub.rb, line 407
# Builds the fallback logger used when no :logger option was supplied.
#
# It writes to pubnubError.log in the system temp directory at DEBUG level.
# The 10 and 10_000_000 arguments are Logger's shift_age and shift_size.
def init_default_logger
  log_path = "#{Dir.tmpdir}/pubnubError.log"
  Logger.new(log_path, 10, 10_000_000).tap { |log| log.level = Logger::DEBUG }
end
logAndRetryBadJSON(is_reactor_running, req, request) click to toggle source
# File lib/pubnub.rb, line 366
# Logs the unparseable response body with a timestamp, then hands the
# request to retryRequest with TIMEOUT_BAD_JSON_RESPONSE as the delay.
def logAndRetryBadJSON(is_reactor_running, req, request)
  body = req.response.to_s
  logError("#{Time.now}: Retrying from bad JSON: #{body}", request.url)

  retryRequest(is_reactor_running, req, request, TIMEOUT_BAD_JSON_RESPONSE)
end
logAndRetryBadResponseCode(is_reactor_running, req, request) click to toggle source
# File lib/pubnub.rb, line 372
# Logs the HTTP status and response body with a timestamp, then hands the
# request to retryRequest with TIMEOUT_BAD_RESPONSE_CODE as the delay.
def logAndRetryBadResponseCode(is_reactor_running, req, request)
  status = req.response_header.http_status.to_i
  body = req.response.to_s
  message = "#{Time.now}: Retrying from bad server response code: (#{status}) #{body}"
  logError(message, request.url)

  retryRequest(is_reactor_running, req, request, TIMEOUT_BAD_RESPONSE_CODE)
end
logAndRetryGeneralError(is_reactor_running, req, request) click to toggle source
# File lib/pubnub.rb, line 360
# Logs a connectivity failure for the request's URL with a timestamp, then
# hands the request to retryRequest with TIMEOUT_GENERAL_ERROR as the delay.
def logAndRetryGeneralError(is_reactor_running, req, request)
  message = "#{Time.now}: Network connectivity issue while attempting to reach #{request.url}"
  logError(message, request.url)

  retryRequest(is_reactor_running, req, request, TIMEOUT_GENERAL_ERROR)
end
logError(errMsg, url) click to toggle source
# File lib/pubnub.rb, line 378
# Writes one error record at debug level: the URL line, the message, then
# an empty line as a separator. Returns the result of the last debug call.
def logError(errMsg, url)
  log = logger
  log.debug("url: #{url}")
  log.debug(errMsg.to_s)
  log.debug("")
end
logger() click to toggle source
# File lib/pubnub.rb, line 403
# Returns the configured logger, creating the default one on first use.
def logger
  @logger = init_default_logger unless @logger
  @logger
end
processGoodResponse(is_reactor_running, req, request) click to toggle source
# File lib/pubnub.rb, line 339
# Handles a completed HTTP response for a request.
#
# Non-success status: the request goes to logAndRetryBadResponseCode, except
# for MSG_TOO_LARGE_RESPONSE, where nothing is done and nil is returned.
# NOTE(review): in that case the user callback is not called and the reactor
# is not stopped - confirm this is intended.
#
# Success status: the body is packaged onto the request and the user
# callback is called with request.response. A subscribe/presence request is
# re-issued unless the callback returned false (the first request is always
# re-issued). Anything else stops the reactor unless the caller owns it.
def processGoodResponse(is_reactor_running, req, request)
  status = req.response_header.http_status.to_i

  unless status == SUCCESS_RESPONSE
    return nil if status == MSG_TOO_LARGE_RESPONSE

    return logAndRetryBadResponseCode(is_reactor_running, req, request)
  end

  request.package_response!(req.response)
  keep_going = request.callback.call(request.response)

  long_poll = %w(subscribe presence).include?(request.operation)
  if long_poll && (keep_going != false || request.first_request?)
    _request(request, is_reactor_running)
  elsif !is_reactor_running
    EM.stop
  end
end
retryRequest(is_reactor_running, req, request, delay) click to toggle source
# File lib/pubnub.rb, line 384
# Retries or fails a request after an error.
#
# subscribe/presence requests are re-issued through _request once an
# EM::Timer of `delay` fires; the timer is returned. Any other operation is
# failed immediately: the request is flagged with set_error(true), the user
# callback receives [0, "Request to <url> failed."], the same array is
# logged at debug level, and the reactor is stopped unless the caller owns it.
def retryRequest(is_reactor_running, req, request, delay)
  long_poll = %w(subscribe presence).include?(request.operation)
  return EM::Timer.new(delay) { _request(request, is_reactor_running) } if long_poll

  failure = [0, "Request to #{request.url} failed."]

  request.set_error(true)
  request.callback.call(failure)

  logger.debug(failure)

  EM.stop unless is_reactor_running
end