class Rpush::Daemon::Apnsp8::Delivery

Constants

CLIENT_JOIN_TIMEOUT
DEFAULT_MAX_CONCURRENT_STREAMS
RETRYABLE_CODES

Public Class Methods

new(app, http2_client, token_provider, batch) click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 13
# Sets up a delivery for a single batch of notifications.
#
# @param app app whose +bundle_id+ is used as the +apns-topic+ header
# @param http2_client client used to prepare and send the HTTP/2 requests
# @param token_provider object whose +token+ fills the authorization header
# @param batch batch of notifications to deliver
def initialize(app, http2_client, token_provider, batch)
  @app, @client, @batch = app, http2_client, batch
  # The very first request is sent without waiting for a free stream.
  @first_push = true
  @token_provider = token_provider
end

Public Instance Methods

perform() click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 21
# Delivers the whole batch: queues one async POST per notification, then
# joins the client so all of them are flushed and answered.
#
# Error handling (every branch re-raises after marking the batch):
# * request timeout           -> batch marked retryable, client closed
# * connection / stream limit -> batch marked retryable
# * anything else             -> batch marked failed
#
# +@batch.all_processed+ always runs, whether or not an error occurred.
def perform
  @batch.each_notification { |notification| prepare_async_post(notification) }

  # Flush every prepared request in one go and wait for completion.
  @client.join(timeout: CLIENT_JOIN_TIMEOUT)
rescue NetHttp2::AsyncRequestTimeout => e
  mark_batch_retryable(Time.now + 10.seconds, e)
  @client.close
  raise
rescue Errno::ECONNREFUSED, SocketError, HTTP2::Error::StreamLimitExceeded => e
  # TODO: restart the connection when StreamLimitExceeded is raised
  mark_batch_retryable(Time.now + 10.seconds, e)
  raise
rescue StandardError => e
  mark_batch_failed(e)
  raise
ensure
  @batch.all_processed
end

Protected Instance Methods

build_request(notification) click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 129
# Assembles the pieces of the APNs request for one notification.
#
# @param notification notification providing the device token, headers and body
# @return [Hash] hash with +:path+, +:headers+ and +:body+ keys
def build_request(notification)
  path = "/3/device/#{notification.device_token}"
  headers = prepare_headers(notification)
  body = prepare_body(notification)

  { path: path, headers: headers, body: body }
end
delayed_push_async(http_request) click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 75
# Sends the request asynchronously once a stream slot is free.
#
# Polls +streams_available?+ every millisecond until it returns true; there
# is no upper bound on how long this waits.
#
# @param http_request prepared request to hand to the client
# @return whatever +@client.call_async+ returns
def delayed_push_async(http_request)
  sleep 0.001 until streams_available?
  @client.call_async(http_request)
end
failed_message_to_log(notification, response) click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 166
# Logs an error line describing a failed notification.
#
# @param notification notification whose +id+ is reported
# @param response [Hash] response data; +:code+ and +:failure_reason+ are logged
def failed_message_to_log(notification, response)
  message = format('Notification %<id>s failed, %<code>s/%<reason>s',
                   id: notification.id,
                   code: response[:code],
                   reason: response[:failure_reason])
  log_error(message)
end
handle_response(notification, response) click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 99
# Dispatches on the response status collected for a notification.
#
# * 200                       -> delivered
# * any of +RETRYABLE_CODES+  -> scheduled for retry
# * anything else             -> reflected, marked failed on the batch, logged
#
# @param notification notification the response belongs to
# @param response [Hash] response data with +:code+ and +:failure_reason+
def handle_response(notification, response)
  status = response[:code]

  case status
  when 200 then ok(notification)
  when *RETRYABLE_CODES then service_unavailable(notification, response)
  else
    reason = response[:failure_reason]
    reflect(:notification_id_failed, @app, notification.id, status, reason)
    @batch.mark_failed(notification, status, reason)
    failed_message_to_log(notification, response)
  end
end
notification_data(notification) click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 156
# Returns the notification's +data+, or an empty hash when it is nil/false.
#
# @param notification object responding to +data+
def notification_data(notification)
  data = notification.data
  return {} unless data

  data
end
ok(notification) click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 116
# Records a successful delivery: logs it, then marks the notification as
# delivered on the batch.
#
# @param notification the delivered notification
def ok(notification)
  message = "#{notification.id} sent to #{notification.device_token}"
  log_info(message)
  @batch.mark_delivered(notification)
end
prepare_async_post(notification) click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 46
# Prepares an async POST for one notification, wires up its response
# callbacks and hands it to the client.
#
# The status code is captured from the response headers. Body chunks are
# buffered and parsed as JSON only once the stream closes, so a body that
# arrives split over several chunks is still decoded correctly; the parsed
# +reason+ is stored as +:failure_reason+ before +handle_response+ runs.
#
# The first request of a delivery is sent immediately; later ones wait for
# a free stream via +delayed_push_async+.
#
# @param notification the notification to send
# @raise [JSON::ParserError] from the close callback if the buffered body is
#   not valid JSON
def prepare_async_post(notification)
  response = {}
  # String.new is unfrozen and binary, so raw chunks can always be appended.
  body_buffer = String.new

  request = build_request(notification)
  http_request = @client.prepare_request(
    :post,
    request[:path],
    body: request[:body],
    headers: request[:headers]
  )

  http_request.on(:headers) do |hdrs|
    response[:code] = hdrs[':status'].to_i
  end

  # Only collect here: a single chunk is not guaranteed to be a whole document.
  http_request.on(:body_chunk) do |body_chunk|
    body_buffer << body_chunk.to_s
  end

  http_request.on(:close) do
    # Blank bodies (e.g. on success) leave :failure_reason unset.
    response[:failure_reason] = JSON.parse(body_buffer)['reason'] unless body_buffer.strip.empty?
    handle_response(notification, response)
  end

  if @first_push
    @first_push = false
    @client.call_async(http_request)
  else
    delayed_push_async(http_request)
  end
end
prepare_body(notification) click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 137
# Serialises the notification into the request body.
#
# The entry stored under +HTTP2_HEADERS_KEY+ is dropped from the payload,
# and the resulting JSON string is tagged as binary.
#
# @param notification object responding to +as_json+
# @return [String] binary-encoded JSON
def prepare_body(notification)
  payload = notification.as_json.except(HTTP2_HEADERS_KEY)
  json = JSON.dump(payload)
  json.force_encoding(Encoding::BINARY)
end
prepare_headers(notification) click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 142
# Builds the HTTP/2 request headers for a notification.
#
# Starts from fixed defaults plus the app's bundle id and a bearer token
# from the token provider, then merges any headers stored in the
# notification data under +HTTP2_HEADERS_KEY+ (those take precedence).
#
# @param notification the notification being sent
# @return [Hash] header name => value
def prepare_headers(notification)
  jwt_token = @token_provider.token

  defaults = {
    'content-type' => 'application/json',
    'apns-expiration' => '0',
    'apns-priority' => '10',
    'apns-topic' => @app.bundle_id,
    'authorization' => "bearer #{jwt_token}"
  }
  overrides = notification_data(notification)[HTTP2_HEADERS_KEY] || {}

  defaults.merge(overrides)
end
remote_max_concurrent_streams() click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 86
# Returns the concurrent-stream limit to respect for this connection.
#
# 0x7fffffff (2^31 - 1) is treated as "the remote has not advertised a
# limit" (the original note identifies it as the http-2 gem's default). In
# that case +DEFAULT_MAX_CONCURRENT_STREAMS+ is used instead. Ideally we
# would fall back to the local settings, but +NetHttp2::Client+ does not
# expose them from the +HTTP2::Client+ it wraps, so a hard-coded constant
# matching that class's default local setting stands in.
def remote_max_concurrent_streams
  advertised = @client.remote_settings[:settings_max_concurrent_streams]
  return advertised unless advertised == 0x7fffffff

  DEFAULT_MAX_CONCURRENT_STREAMS
end
retry_message_to_log(notification) click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 160
# Logs a warning stating when the notification will be retried and which
# retry attempt this is.
#
# @param notification object responding to +id+, +deliver_after+ and +retries+
def retry_message_to_log(notification)
  retry_at = notification.deliver_after.strftime('%Y-%m-%d %H:%M:%S')
  message = "Notification #{notification.id} will be retried after #{retry_at} " \
            "(retry #{notification.retries})."
  log_warn(message)
end
service_unavailable(notification, response) click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 121
# Handles a retryable response: schedules the notification for another
# attempt ten seconds from now and logs both the failure and the retry.
#
# @param notification the notification to retry
# @param response [Hash] response data passed on to the failure log
def service_unavailable(notification, response)
  retry_at = Time.now + 10.seconds
  @batch.mark_retryable(notification, retry_at)

  # Logging comes last: the retry message prints the retry time, which is
  # only set once the notification has been marked retryable.
  failed_message_to_log(notification, response)
  retry_message_to_log(notification)
end
streams_available?() click to toggle source
# File lib/rpush/daemon/apnsp8/delivery.rb, line 82
# True while the client has fewer open streams than the allowed maximum.
def streams_available?
  limit = remote_max_concurrent_streams
  open_streams = @client.stream_count
  open_streams < limit
end