class Rpush::Daemon::Apnsp8::Delivery
Constants
- CLIENT_JOIN_TIMEOUT
- DEFAULT_MAX_CONCURRENT_STREAMS
- RETRYABLE_CODES
Public Class Methods
new(app, http2_client, token_provider, batch)
click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 13 def initialize(app, http2_client, token_provider, batch) @app = app @client = http2_client @batch = batch @first_push = true @token_provider = token_provider end
Public Instance Methods
perform()
click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 21 def perform @batch.each_notification do |notification| prepare_async_post(notification) end # Send all preprocessed requests at once @client.join(timeout: CLIENT_JOIN_TIMEOUT) rescue NetHttp2::AsyncRequestTimeout => error mark_batch_retryable(Time.now + 10.seconds, error) @client.close raise rescue Errno::ECONNREFUSED, SocketError, HTTP2::Error::StreamLimitExceeded => error # TODO restart connection when StreamLimitExceeded mark_batch_retryable(Time.now + 10.seconds, error) raise rescue StandardError => error mark_batch_failed(error) raise ensure @batch.all_processed end
Protected Instance Methods
build_request(notification)
click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 129 def build_request(notification) { path: "/3/device/#{notification.device_token}", headers: prepare_headers(notification), body: prepare_body(notification) } end
delayed_push_async(http_request)
click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 75 def delayed_push_async(http_request) until streams_available? do sleep 0.001 end @client.call_async(http_request) end
failed_message_to_log(notification, response)
click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 166 def failed_message_to_log(notification, response) log_error("Notification #{notification.id} failed, "\ "#{response[:code]}/#{response[:failure_reason]}") end
handle_response(notification, response)
click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 99 def handle_response(notification, response) code = response[:code] case code when 200 ok(notification) when *RETRYABLE_CODES service_unavailable(notification, response) else reflect(:notification_id_failed, @app, notification.id, code, response[:failure_reason]) @batch.mark_failed(notification, response[:code], response[:failure_reason]) failed_message_to_log(notification, response) end end
notification_data(notification)
click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 156 def notification_data(notification) notification.data || {} end
ok(notification)
click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 116 def ok(notification) log_info("#{notification.id} sent to #{notification.device_token}") @batch.mark_delivered(notification) end
prepare_async_post(notification)
click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 46 def prepare_async_post(notification) response = {} request = build_request(notification) http_request = @client.prepare_request(:post, request[:path], body: request[:body], headers: request[:headers] ) http_request.on(:headers) do |hdrs| response[:code] = hdrs[':status'].to_i end http_request.on(:body_chunk) do |body_chunk| next unless body_chunk.present? response[:failure_reason] = JSON.parse(body_chunk)['reason'] end http_request.on(:close) { handle_response(notification, response) } if @first_push @first_push = false @client.call_async(http_request) else delayed_push_async(http_request) end end
prepare_body(notification)
click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 137 def prepare_body(notification) hash = notification.as_json.except(HTTP2_HEADERS_KEY) JSON.dump(hash).force_encoding(Encoding::BINARY) end
prepare_headers(notification)
click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 142 def prepare_headers(notification) jwt_token = @token_provider.token headers = {} headers['content-type'] = 'application/json' headers['apns-expiration'] = '0' headers['apns-priority'] = '10' headers['apns-topic'] = @app.bundle_id headers['authorization'] = "bearer #{jwt_token}" headers.merge notification_data(notification)[HTTP2_HEADERS_KEY] || {} end
remote_max_concurrent_streams()
click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 86 def remote_max_concurrent_streams # 0x7fffffff is the default value from http-2 gem (2^31) if @client.remote_settings[:settings_max_concurrent_streams] == 0x7fffffff # Ideally we'd fall back to `#local_settings` here, but `NetHttp2::Client` # doesn't expose that attr from the `HTTP2::Client` it wraps. Instead, we # chose a hard-coded value matching the default local setting from the # `HTTP2::Client` class DEFAULT_MAX_CONCURRENT_STREAMS else @client.remote_settings[:settings_max_concurrent_streams] end end
retry_message_to_log(notification)
click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 160 def retry_message_to_log(notification) log_warn("Notification #{notification.id} will be retried after "\ "#{notification.deliver_after.strftime('%Y-%m-%d %H:%M:%S')} "\ "(retry #{notification.retries}).") end
streams_available?()
click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 82 def streams_available? remote_max_concurrent_streams - @client.stream_count > 0 end