class Pubnub::Client

Constants

DEFAULT_CONNECT_CALLBACK
DEFAULT_ERROR_CALLBACK

Attributes

callback[RW]
channel[RW]
cipher_key[RW]
close_connection[RW]
error[RW]
history_count[RW]
history_end[RW]
history_limit[RW]
history_reverse[RW]
history_start[RW]
host[RW]
jsonp[RW]
last_timetoken[RW]
message[RW]
operation[RW]
origin[RW]
port[RW]
publish_key[RW]
query[RW]
response[RW]
secret_key[RW]
session_uuid[RW]
ssl[RW]
subscribe_key[RW]
timetoken[RW]
url[RW]
uuid[RW]

Public Class Methods

new(options = {}) click to toggle source
# File lib/pubnub/client.rb, line 38
# Builds a client and copies its configuration out of +options+.
#
# Every key is optional. Keys read here: :logger, :callback, :error_callback,
# :connect_callback, :cipher_key, :publish_key, :subscribe_key, :channel,
# :message, :ssl, :secret_key, :timetoken, :uuid, :session_uuid, :count,
# :start, :end, :reverse, :port, :url, :origin, :query, :http_sync,
# :max_retries, :non_subscribe_timeout, :reconnect_max_attempts,
# :reconnect_retry_interval and :reconnect_response_timeout.
#
# A key whose value is nil or false falls back to the default shown on the
# corresponding line below.
def initialize(options = {})
  # The logger is kept in the global $log; without :logger a file logger on
  # 'pubnub.log' is created.
  $log = options[:logger] || Logger.new('pubnub.log', 0, 100 * 1024 * 1024)

  @subscriptions = []

  @subscription_request = nil
  @retry            = true
  @retry_count      = 0
  @callback         = options[:callback]
  @error_callback   = options[:error_callback] || DEFAULT_ERROR_CALLBACK
  @connect_callback = options[:connect_callback] || DEFAULT_CONNECT_CALLBACK
  @cipher_key       = options[:cipher_key]
  @publish_key      = options[:publish_key] || DEFAULT_PUBLISH_KEY
  @subscribe_key    = options[:subscribe_key] || DEFAULT_SUBSCRIBE_KEY
  @channel          = options[:channel] || DEFAULT_CHANNEL
  @message          = options[:message]
  @ssl              = options[:ssl]
  @secret_key       = options[:secret_key]
  @timetoken        = options[:timetoken]
  # :uuid wins over :session_uuid; a fresh UUID is generated only when both are absent.
  @session_uuid     = options[:uuid] || options[:session_uuid] || UUID.new.generate

  @history_count    = options[:count]
  @history_start    = options[:start]
  @history_end      = options[:end]
  @history_reverse  = options[:reverse]

  @port             = options[:port]
  @url              = options[:url]
  @origin           = options[:origin] || DEFAULT_ORIGIN
  @query            = options[:query]

  @http_sync        = options[:http_sync]

  @max_retries                = options[:max_retries] || MAX_RETRIES
  @non_subscribe_timeout      = options[:non_subscribe_timeout] || 5
  @reconnect_max_attempts     = options[:reconnect_max_attempts] || 60
  @reconnect_retry_interval   = options[:reconnect_retry_interval] || 5
  @reconnect_response_timeout = options[:reconnect_response_timeout] || 5

  # Separate synchronous connections for subscribe/presence and for everything else.
  @sync_connection_sub = Pubnub::PubNubHTTParty.new
  @sync_connection     = Pubnub::PubNubHTTParty.new

  @pause_subscribe = false
end

Public Instance Methods

active_subscriptions() click to toggle source
# File lib/pubnub/client.rb, line 167
# Returns the value of @subscription_request: the request object that
# start_request stores for subscribe/presence operations, or nil while no
# such request has been stored (initialize sets it to nil).
def active_subscriptions
  @subscription_request
end
here_now(options = {}, &block) click to toggle source
# File lib/pubnub/client.rb, line 149
# Issues a 'here_now' request.
#
# options - Hash of request options. It is modified in place: a given block
#           is stored under :callback before the client defaults are merged in.
# block   - optional callback; takes precedence over options[:callback].
#
# Returns whatever start_request returns. verify_operation raises when the
# merged options are not acceptable for 'here_now'.
def here_now(options = {}, &block)
  options[:callback] = block if block
  request_options = merge_options(options, 'here_now')
  verify_operation('here_now', request_options)
  start_request(request_options)
end
history(options = {}, &block) click to toggle source
# File lib/pubnub/client.rb, line 120
# Issues a 'history' request.
#
# options - Hash of request options. It is modified in place: a given block
#           is stored under :callback. After merging with the client defaults
#           and validation, the history arguments are copied into the
#           request's :params hash:
#             :count   - always copied
#             :start   - copied unless nil
#             :end     - copied unless nil
#             :reverse - sent as the string 'true' when truthy
# block   - optional callback; takes precedence over options[:callback].
#
# Returns whatever start_request returns.
def history(options = {}, &block)
  options[:callback] = block if block
  request_options = merge_options(options, 'history')
  verify_operation('history', request_options)

  params = request_options[:params]
  params[:count]   = request_options[:count]
  params[:start]   = request_options[:start] unless request_options[:start].nil?
  params[:end]     = request_options[:end] unless request_options[:end].nil?
  params[:reverse] = 'true' if request_options[:reverse]

  start_request(request_options)
end
leave(options = {}, &block) click to toggle source
# File lib/pubnub/client.rb, line 131
# Leaves (unsubscribes from) a channel.
#
# options - Hash of request options. It is modified in place: a given block
#           is stored under :callback before the client defaults are merged in.
# block   - optional callback; takes precedence over options[:callback].
#
# Returns false without sending anything when options[:channel] is not among
# the currently subscribed channels. Otherwise removes the channel from the
# subscription list and returns whatever start_request returns for the
# 'leave' request.
#
# When the removed channel was the last one, the subscribe state is reset:
# the timetoken goes back to 0, the subscribe connection is closed and the
# "waiting for response" flag is cleared.
def leave(options = {}, &block)
  options[:callback] = block if block_given?
  options = merge_options(options, 'leave')
  verify_operation('leave', options)
  return false unless get_channels_for_subscription.include? options[:channel]
  remove_from_subscription options[:channel]
  if @subscriptions.empty?
    @timetoken = 0
    # Both objects can still be nil here: the subscribe connection is only
    # created once the periodic subscribe timer sends its first request, so
    # leaving right after subscribing must not blow up on nil.
    @subscription_request.timetoken = 0 if @subscription_request
    @subscribe_connection.close if @subscribe_connection
    @wait_for_response = false
  end
  start_request options
end
Also aliased as: unsubscribe
presence(options = {}, &block) click to toggle source
# File lib/pubnub/client.rb, line 113
# Issues a 'presence' request.
#
# options - Hash of request options. It is modified in place: a given block
#           is stored under :callback before the client defaults are merged in.
# block   - optional callback; takes precedence over options[:callback].
#
# Returns whatever start_request returns. verify_operation raises when the
# merged options are not acceptable for 'presence'.
def presence(options = {}, &block)
  options[:callback] = block if block
  request_options = merge_options(options, 'presence')
  verify_operation('presence', request_options)
  start_request(request_options)
end
publish(options = {}, &block) click to toggle source
# File lib/pubnub/client.rb, line 96
# Issues a 'publish' request.
#
# options - Hash of request options. It is modified in place: a given block
#           is stored under :callback before the client defaults are merged in.
# block   - optional callback; takes precedence over options[:callback].
#
# Returns whatever start_request returns. verify_operation raises when the
# merged options are not acceptable for 'publish'.
def publish(options = {}, &block)
  options[:callback] = block if block
  request_options = merge_options(options, 'publish')
  verify_operation('publish', request_options)
  start_request(request_options)
end
subscribe(options = {}, &block) click to toggle source
# File lib/pubnub/client.rb, line 104
# Issues a 'subscribe' request.
#
# options - Hash of request options. It is modified in place: a given block
#           is stored under :callback before the client defaults are merged in.
# block   - optional callback; takes precedence over options[:callback].
#
# When options[:channel] is already among the subscribed channels, the
# client-level error callback is invoked and an error is logged, but the
# request is started anyway.
#
# Returns whatever start_request returns.
def subscribe(options = {}, &block)
  options[:callback] = block if block
  request_options = merge_options(options, 'subscribe')
  verify_operation('subscribe', request_options)

  duplicate_notice = 'YOU ARE ALREADY SUBSCRIBED TO THAT CHANNEL'
  @error_callback.call(duplicate_notice) if get_channels_for_subscription.include?(request_options[:channel])
  $log.error(duplicate_notice) if get_channels_for_subscription.include?(request_options[:channel])

  start_request(request_options)
end
subscription_running?() click to toggle source
# File lib/pubnub/client.rb, line 163
# Returns the value of @subscription_running. start_request assigns it the
# result of EM.add_periodic_timer when the subscribe loop is installed, so
# the value is truthy/falsy rather than strictly true/false.
def subscription_running?
  @subscription_running
end
time(options = {}, &block) click to toggle source
# File lib/pubnub/client.rb, line 156
# Issues a 'time' request.
#
# options - Hash of request options. It is modified in place: a given block
#           is stored under :callback before the client defaults are merged in.
# block   - optional callback; takes precedence over options[:callback].
#
# Returns whatever start_request returns. verify_operation raises when the
# merged options are not acceptable for 'time'.
def time(options = {}, &block)
  options[:callback] = block if block
  request_options = merge_options(options, 'time')
  verify_operation('time', request_options)
  start_request(request_options)
end
unsubscribe(options = {}, &block)
Alias for: leave

Private Instance Methods

compile_channel_parameter(channel, channels) click to toggle source
# File lib/pubnub/client.rb, line 517
# Normalises the :channel / :channels request arguments into one value.
#
# channel  - value given as :channel (String, Symbol, Array or nil).
# channels - value given as :channels (same kinds); used when present.
#
# Returns a String for Symbol input, a comma-joined String for Array input
# (each element converted with to_s), and any other value unchanged.
# The caller's array is left untouched.
#
# Raises ArgumentError when both arguments are given.
def compile_channel_parameter(channel, channels)
  raise(ArgumentError, 'Can\'t handle both :channel and :channels parameters given.') if channel && channels

  channel = channels if channels
  case channel
  when Symbol then channel.to_s
  # Non-destructive map: the array belongs to the caller's options hash.
  when Array  then channel.map { |name| name.to_s }.join(',')
  else channel
  end
end
fire_subscriptions_callback_for(envelope) click to toggle source
# File lib/pubnub/client.rb, line 207
# Hands +envelope+ to every registered subscription, in registration order,
# by calling fire_callback_for on each of them.
#
# Returns the subscriptions array.
def fire_subscriptions_callback_for(envelope)
  @subscriptions.each { |subscription| subscription.fire_callback_for(envelope) }
end
get_channels_for_subscription() click to toggle source
# File lib/pubnub/client.rb, line 201
# Returns a new array holding the channel of every registered subscription,
# in registration order.
def get_channels_for_subscription
  @subscriptions.map(&:channel)
end
is_update?(timetoken) click to toggle source
# File lib/pubnub/client.rb, line 470
# Checks whether +timetoken+ is newer than the stored one and, if it is,
# stores it.
#
# Both values are compared after to_i conversion.
#
# Returns the newly stored timetoken (as passed in) when it is greater than
# the current one, false otherwise.
def is_update?(timetoken)
  return false unless @timetoken.to_i < timetoken.to_i

  @timetoken = timetoken
end
is_valid_json?(response) click to toggle source
# File lib/pubnub/client.rb, line 479
# Tells whether +response+ can be parsed by JSON.parse.
#
# Returns true when parsing succeeds, false when it raises any StandardError
# (parse errors as well as non-string input).
def is_valid_json?(response)
  JSON.parse(response)
  true
rescue
  false
end
merge_options(options = {}, operation = '') click to toggle source
# File lib/pubnub/client.rb, line 177
# Combines per-call +options+ with the client's stored configuration.
#
# options   - Hash of per-call options. When :channel or :channels is given,
#             options[:channel] is overwritten in place with the value
#             produced by compile_channel_parameter.
# operation - name of the operation, stored under :operation.
#
# Returns a new Hash: the client defaults below, overridden by every key
# present in +options+.
def merge_options(options = {}, operation = '')
  if options[:channel] || options[:channels]
    options[:channel] = compile_channel_parameter(options[:channel], options[:channels])
  end

  client_defaults = {
    :ssl            => @ssl,
    :cipher_key     => @cipher_key,
    :publish_key    => @publish_key,
    :subscribe_key  => @subscribe_key,
    :secret_key     => @secret_key,
    :origin         => @origin,
    :operation      => operation,
    :params         => { :uuid => @session_uuid },
    :timetoken      => @timetoken,
    :error_callback => @error_callback
  }

  client_defaults.merge(options)
end
remove_from_subscription(channel) click to toggle source
# File lib/pubnub/client.rb, line 173
# Drops every subscription whose channel, compared as a string, equals
# +channel+.
#
# Returns the (modified) subscriptions array.
def remove_from_subscription(channel)
  unwanted = channel.to_s
  @subscriptions.delete_if { |subscription| subscription.channel.to_s == unwanted }
end
send_request(request) click to toggle source
# File lib/pubnub/client.rb, line 449
# Sends +request+ as an asynchronous GET through EM::HttpRequest.
#
# 'subscribe' and 'presence' requests share one connection
# (@subscribe_connection) created with 370 second connect/inactivity
# timeouts; all other operations share @connection. Each connection is
# created on first use from request.origin and then reused.
#
# When the subscribe connection is first created, an extra GET to '/time/0'
# is issued; once it succeeds the connect callback is invoked via EM.defer.
#
# Returns the object returned by the connection's get call for +request+.
def send_request(request)
  unless %w(subscribe presence).include?(request.operation)
    @connection ||= EM::HttpRequest.new(request.origin)
    return @connection.get(:path => request.path, :query => request.query, :keepalive => true)
  end

  unless @subscribe_connection
    @subscribe_connection = EM::HttpRequest.new(request.origin, :connect_timeout => 370, :inactivity_timeout => 370)
    handshake = @subscribe_connection.get(:path => '/time/0', :keepalive => true, :query => request.query)
    handshake.callback do
      EM.defer { @connect_callback.call 'ASYNC SUBSCRIBE CONNECTION' }
    end
  end

  @subscribe_connection.get(:path => request.path, :query => request.query, :keepalive => true)
end
start_em_if_not_running() click to toggle source
# File lib/pubnub/client.rb, line 193
# Makes sure an EventMachine reactor is running before returning.
#
# When no reactor is running, EM.run is started in a new thread. The method
# then blocks until EM.reactor_running? reports true.
#
# Returns nil.
def start_em_if_not_running
  Thread.new { EM.run } unless EM.reactor_running?

  # Yield to other threads while waiting instead of spinning on an empty
  # loop body, so the reactor thread gets scheduled promptly.
  Thread.pass until EM.reactor_running?
end
start_request(options) click to toggle source
# File lib/pubnub/client.rb, line 213
# Builds a Pubnub::Request from +options+ and executes it, either
# asynchronously through EventMachine (default) or synchronously when
# options[:http_sync] is truthy.
#
# Asynchronous mode:
#   * 'subscribe' / 'presence': registers a Subscription per channel and makes
#     sure a single periodic timer is polling for all subscribed channels.
#   * any other operation: sends the request on the next reactor tick and
#     feeds the resulting envelopes to the request callback (or to an error
#     callback for non-200 responses).
#
# Synchronous mode:
#   sends the request through one of the two sync connections, retries on
#   failure, and either invokes request.callback for each envelope or, when no
#   callback was given, returns the envelope(s).
def start_request(options)
  request = Pubnub::Request.new(options)
  unless options[:http_sync]
    start_em_if_not_running

    if %w(subscribe presence).include? request.operation
      # Register one Subscription per comma-separated channel, skipping
      # channels that are already subscribed.
      options[:channel].split(',').each do |channel|
        @subscriptions << Subscription.new(:channel => channel, :callback => options[:callback], :error_callback => options[:error_callback]) unless get_channels_for_subscription.include? channel
      end

      # Only the first subscribe/presence request is kept; later calls just
      # extend its channel list.
      @subscription_request = request unless @subscription_request

      # The channel list changed while the polling timer is active: drop the
      # current connection and restart from timetoken 0.
      # NOTE(review): @subscribe_connection is only assigned in send_request;
      # confirm it cannot still be nil when this branch runs.
      if @subscription_request.channel != get_channels_for_subscription.join(',') && @subscription_running
        @subscribe_connection.close
        @timetoken = 0
        @subscription_request.timetoken = 0
        @wait_for_response = false
      end

      @subscription_request.channel = get_channels_for_subscription.join(',')

      # Install the polling timer once (see the trailing `unless` below). Each
      # tick sends a new subscribe request unless one is still in flight or
      # there is nothing subscribed.
      @subscription_running = EM.add_periodic_timer(PERIODIC_TIMER) do

        unless @wait_for_response || get_channels_for_subscription.empty?
          @wait_for_response = true
          $log.debug 'SETTING CHANNELS'
          @subscription_request.channel = get_channels_for_subscription.join(',')
          $log.debug 'SENDING SUBSCRIBE REQUEST'
          http = send_request(@subscription_request)

          http.callback do
            $log.debug 'GOT SUBSCRIBE RESPONSE'
            @wait_for_response = false
            if http.response_header.status.to_i == 200
              if is_valid_json?(http.response)
                $log.debug 'GOT VALID JSON'
                @subscription_request.handle_response(http)
                $log.debug 'HANDLED RESPONSE'
                # Envelopes are delivered only when the response carried a
                # newer timetoken than the one stored on the client.
                if is_update?(@subscription_request.timetoken)
                  $log.debug 'TIMETOKEN UPDATED'
                  @subscription_request.envelopes.each do |envelope|
                    fire_subscriptions_callback_for envelope
                  end
                else
                  $log.debug 'TIMETOKEN NOT UPDATED'
                end
              end
            else
              # Non-200: report through the error callback of the request that
              # installed the timer, or else through the subscription callbacks.
              if request.error_callback
                request.error_callback.call Pubnub::Response.new(
                                                :error_init => true,
                                                :message =>  [0, "Bad server response: #{http.response_header.status.to_i}"].to_json,
                                                :response =>  [0, "Bad server response: #{http.response_header.status.to_i}"].to_json
                                            )
              else
                fire_subscriptions_callback_for Pubnub::Response.new(
                                         :error_init => true,
                                         :message =>  [0, "Bad server response: #{http.response_header.status.to_i}"].to_json,
                                         :response =>  [0, "Bad server response: #{http.response_header.status.to_i}"].to_json
                                     )
              end

            end
          end
          # NOTE(review): @wait_for_response is not cleared in this errback, so
          # within this method nothing re-enables polling after a connection
          # error — confirm whether that is intended.
          http.errback do
            $log.error 'GOT SUBSCRIBE ERROR'
            @error_callback.call [0, http.error]
          end
        end
      end unless @subscription_running
    else
      # One-shot asynchronous request (publish, history, here_now, time, leave...).
      EM.next_tick do
        $log.debug 'SENDING OTHER REQUEST'

        http = send_request(request)

        http.errback do
          @error_callback.call [0, http.error]
        end

        http.callback do
          $log.debug 'GOT OTHER RESPONSE'
          #byebug
          if http.response_header.status.to_i == 200
            if is_valid_json?(http.response)
              request.handle_response(http)
              request.envelopes.each do |envelope|
                $log.debug 'CALLING PARAMETER CALLBACK'
                request.callback.call envelope
              end
            end
          else
            # Non-200: try to parse the body anyway and route its envelopes to
            # the error callback; if that raises, report a generic
            # "Bad server response" instead.
            begin
              request.handle_response(http)
              request.envelopes.each do |envelope|
                if request.error_callback
                  request.error_callback.call envelope
                else
                  @error_callback.call envelope
                end
              end

            rescue
              if request.error_callback
                request.error_callback.call Pubnub::Response.new(
                  :error_init => true,
                  :message =>  [0, "Bad server response: #{http.response_header.status.to_i}"].to_json,
                  :response =>  [0, "Bad server response: #{http.response_header.status.to_i}"].to_json
                )
              else
                @error_callback.call Pubnub::Response.new(
                  :error_init => true,
                  :message =>  [0, "Bad server response: #{http.response_header.status.to_i}"].to_json,
                  :response =>  [0, "Bad server response: #{http.response_header.status.to_i}"].to_json
                )
              end
            end
          end
        end
      end
    end
  else
    # Synchronous mode.
    begin
      # A sync subscribe without a timetoken first fetches the server time
      # synchronously and uses it as the starting timetoken.
      if @timetoken.to_i == 0 && request.operation == 'subscribe'
        time(:http_sync => true){|envelope| @timetoken = envelope.message.to_i }
      end
      begin
        # subscribe/presence use their own connection with a 370 second
        # timeout; everything else uses @non_subscribe_timeout. The :query
        # argument is only passed when the request has a non-empty query.
        if request.query.to_s.empty?
          if %w(subscribe presence).include? request.operation
            response = @sync_connection_sub.send_request(request.origin + request.path, :timeout => 370)
          else
            response = @sync_connection.send_request(request.origin + request.path, :timeout => @non_subscribe_timeout)
          end
        else
          if %w(subscribe presence).include? request.operation
            response = @sync_connection_sub.send_request(request.origin + request.path, :query => request.query, :timeout => 370)
          else
            response = @sync_connection.send_request(request.origin + request.path, :query => request.query, :timeout => @non_subscribe_timeout)
          end
        end
      rescue
        # Sending failed: notify the client-level error callback, then retry
        # by recursion until @max_retries is exceeded, at which point an error
        # Response is returned and the counter is reset.
        msg = 'ERROR SENDING REQUEST'
        @error_callback.call  Pubnub::Response.new(
                                  :error_init => true,
                                  :message =>  [0, msg].to_s,
                                  :response =>  [0, msg].to_s
                              )
        @retries = 0 unless @retries
        @retries += 1
        if @retries <= @max_retries
          return start_request options
        else
          msg = "ERROR SENDING REQUEST AFTER #{@retries} RETRIES"
          @retries = 0
          return Pubnub::Response.new(
                      :error_init => true,
                      :message =>  [0, msg].to_s,
                      :response =>  [0, msg].to_s
                  )
        end
      end

      if @sync_connection_sub.first_run?
        @connect_callback.call 'SYNC CONNECTION ESTABLISHED'
      end
      if response.response.code.to_i == 200
        if is_valid_json?(response.body)
          request.handle_response(response)
          @timetoken = request.timetoken

          # NOTE(review): remove_from_subscription is defined in this file as an
          # instance method of the client; verify that Subscription also
          # provides a class-level method of that name.
          if request.operation == 'leave'
            Subscription.remove_from_subscription request.channel
          end

          # With a callback every envelope is passed to it; without one the
          # envelopes are returned (only the first for single-result operations).
          if !request.callback.nil?
            request.envelopes.each do |envelope|
              request.callback.call envelope
            end
          else
            if %w(publish leave here_now time).include? request.operation
              return request.envelopes[0]
            else
              return request.envelopes
            end
          end
        end
      else
        # Non-200: handle the body the same way as a success; if that raises,
        # report a generic "Bad server response" through an error callback.
        begin
          request.handle_response(response)
          if !request.callback.nil?
            request.envelopes.each do |envelope|
              request.callback.call envelope
            end
          else
            if %w(publish leave here_now time).include? request.operation
              return request.envelopes[0]
            else
              return request.envelopes
            end
          end
        rescue
          if request.error_callback
            request.error_callback.call Pubnub::Response.new(
                                            :error_init => true,
                                            :message =>  [0, "Bad server response: #{response.response.code.to_i}"].to_json,
                                            :response =>  [0, "Bad server response: #{response.response.code.to_i}"].to_json
                                        )
          else
            @error_callback.call Pubnub::Response.new(
                                     :error_init => true,
                                     :message =>  [0, "Bad server response: #{response.response.code.to_i}"].to_json,
                                     :response =>  [0, "Bad server response: #{response.response.code.to_i}"].to_json
                                 )
          end
        end

        # Retry the whole request while the non-200 counter is below
        # @max_retries.
        # NOTE(review): @sync_retries is never reset within this method.
        if @sync_retries
          @sync_retries += 1
        else
          @sync_retries = 1
        end

        if @sync_retries < @max_retries
          start_request options
        end
      end
    rescue Timeout::Error
      if request.error_callback
        request.error_callback.call [0, 'TIMEOUT']
      else
        @error_callback.call [0, 'TIMEOUT']
      end
    end
  end
end
verify_operation(operation, options) click to toggle source
# File lib/pubnub/client.rb, line 489
# Validates the options for +operation+ before a request is built.
#
# operation - operation name ('publish', 'subscribe', 'presence', 'time',
#             'history', 'here_now' or 'leave'); any other name skips the
#             per-operation check.
# options   - merged request options.
#
# Requirements per operation:
#   * every operation needs one of :callback, :block_given or :http_sync;
#   * every operation except 'time' needs :channel or :channels;
#   * 'publish' additionally needs :message, 'history' needs :count.
#
# Raises ArgumentError when a requirement is missing, and RuntimeError when
# :callback or :error_callback is given but does not respond to call.
# Returns nil otherwise.
def verify_operation(operation, options)
  channel_given = options[:channel] || options[:channels]
  handler_given = options[:callback] || options[:block_given] || options[:http_sync]

  complaint =
    case operation
    when 'publish'
      unless channel_given && handler_given && options[:message]
        'publish() requires :channel, :message parameters and, if async, callback parameter or block given.'
      end
    when 'subscribe', 'presence', 'here_now', 'leave'
      unless channel_given && handler_given
        "#{operation}() requires :channel parameters and, if async, callback parameter or block given."
      end
    when 'time'
      'time() require, if async, callback parameter or block given.' unless handler_given
    when 'history'
      unless channel_given && handler_given && options[:count]
        'history() requires :channel, :count parameters and, if async, callback parameter or block given.'
      end
    end
  raise(ArgumentError, complaint) if complaint

  callback = options[:callback]
  raise('callback is invalid.') unless callback.nil? || callback.respond_to?('call')

  error_callback = options[:error_callback]
  raise('error_callback is invalid.') unless error_callback.nil? || error_callback.respond_to?('call')
end