class Rpush::Daemon::Apnsp8::Delivery

Constants

RETRYABLE_CODES

Public Class Methods

new(app, http2_client, token_provider, batch)
# File lib/rpush/daemon/apnsp8/delivery.rb, line 11
# Builds a delivery for one batch of notifications.
#
# @param app the app record; its bundle_id becomes the apns-topic header
# @param http2_client the HTTP/2 client used to prepare and send requests
# @param token_provider object whose #token is sent as the bearer token
# @param batch the batch whose notifications are delivered and marked
def initialize(app, http2_client, token_provider, batch)
  @app, @client, @batch = app, http2_client, batch
  @token_provider = token_provider
  # The first request of a delivery is sent without waiting for a free stream.
  @first_push = true
end

Public Instance Methods

perform()
# File lib/rpush/daemon/apnsp8/delivery.rb, line 19
# Delivers every notification in the batch over the HTTP/2 client.
#
# Each notification is queued as an asynchronous request and the client is
# then joined. ECONNREFUSED, SocketError and StreamLimitExceeded mark the
# whole batch retryable in 10 seconds; any other StandardError marks the
# batch failed. The error is re-raised in both cases, and the batch is
# always flagged as fully processed on the way out.
def perform
  @client.on(:error) do |client_error|
    mark_batch_retryable(Time.now + 10.seconds, client_error)
  end

  @batch.each_notification { |notification| prepare_async_post(notification) }

  # Send all preprocessed requests at once
  @client.join
rescue Errno::ECONNREFUSED, SocketError, HTTP2::Error::StreamLimitExceeded => retryable_error
  # TODO: restart connection when StreamLimitExceeded
  mark_batch_retryable(Time.now + 10.seconds, retryable_error)
  raise
rescue StandardError => fatal_error
  mark_batch_failed(fatal_error)
  raise
ensure
  @batch.all_processed
end

Protected Instance Methods

build_request(notification)
# File lib/rpush/daemon/apnsp8/delivery.rb, line 122
# Assembles the parts of the request for one notification.
#
# @return [Hash] with :path (device-token URL path), :headers and :body
def build_request(notification)
  device_path = "/3/device/#{notification.device_token}"
  request_headers = prepare_headers(notification)
  request_body = prepare_body(notification)

  { path: device_path, headers: request_headers, body: request_body }
end
delayed_push_async(http_request)
# File lib/rpush/daemon/apnsp8/delivery.rb, line 72
# Polls streams_available? every millisecond until it is true, then sends
# the prepared request asynchronously through the client.
def delayed_push_async(http_request)
  sleep(0.001) until streams_available?
  @client.call_async(http_request)
end
failed_message_to_log(notification, response)
# File lib/rpush/daemon/apnsp8/delivery.rb, line 159
# Logs an error line with the notification id, the response code and the
# failure reason taken from the response hash.
def failed_message_to_log(notification, response)
  id = notification.id
  status = response[:code]
  reason = response[:failure_reason]
  log_error("Notification #{id} failed, #{status}/#{reason}")
end
handle_response(notification, response)
# File lib/rpush/daemon/apnsp8/delivery.rb, line 92
# Routes a finished request to the matching outcome.
#
# A 200 marks the notification delivered, a code listed in RETRYABLE_CODES
# goes to service_unavailable, and anything else is reflected, marked as
# failed on the batch and logged.
def handle_response(notification, response)
  status = response[:code]

  case status
  when 200 then ok(notification)
  when *RETRYABLE_CODES then service_unavailable(notification, response)
  else
    reason = response[:failure_reason]
    reflect(:notification_id_failed, @app, notification.id, status, reason)
    @batch.mark_failed(notification, status, reason)
    failed_message_to_log(notification, response)
  end
end
notification_data(notification)
# File lib/rpush/daemon/apnsp8/delivery.rb, line 149
# Returns the notification's data, or an empty hash when it has none.
def notification_data(notification)
  payload = notification.data
  return {} unless payload

  payload
end
ok(notification)
# File lib/rpush/daemon/apnsp8/delivery.rb, line 109
# Logs the successful send and marks the notification delivered on the batch.
def ok(notification)
  id, token = notification.id, notification.device_token
  log_info("#{id} sent to #{token}")
  @batch.mark_delivered(notification)
end
prepare_async_post(notification)
# File lib/rpush/daemon/apnsp8/delivery.rb, line 42
# Prepares an asynchronous POST for one notification and hands it to the
# client.
#
# The response code is taken from the stream's :headers event and the
# failure reason from the buffered response body; both are passed to
# handle_response when the stream closes. The first request of this
# delivery is sent immediately, later ones go through delayed_push_async.
def prepare_async_post(notification)
  response = {}
  raw_body = String.new

  request = build_request(notification)
  # The request is deliberately not logged: its headers carry the bearer
  # token and its body carries the whole notification payload.
  http_request = @client.prepare_request(:post, request[:path],
    body:    request[:body],
    headers: request[:headers]
  )

  http_request.on(:headers) do |hdrs|
    response[:code] = hdrs[':status'].to_i
  end

  # A body may arrive split across several chunks, so buffer the bytes and
  # parse once on :close instead of parsing each fragment on its own.
  http_request.on(:body_chunk) { |body_chunk| raw_body << body_chunk.to_s.b }

  http_request.on(:close) do
    unless raw_body.strip.empty?
      response[:failure_reason] =
        begin
          JSON.parse(raw_body)['reason']
        rescue JSON::ParserError
          # Keep the raw text so the notification is still handled and the
          # log shows what the server actually sent.
          raw_body
        end
    end

    handle_response(notification, response)
  end

  if @first_push
    @first_push = false
    @client.call_async(http_request)
  else
    delayed_push_async(http_request)
  end
end
prepare_body(notification)
# File lib/rpush/daemon/apnsp8/delivery.rb, line 130
# Serializes the notification to a JSON string tagged as binary, leaving out
# the HTTP2_HEADERS_KEY entry of its JSON representation.
def prepare_body(notification)
  payload = notification.as_json.except(HTTP2_HEADERS_KEY)
  json = JSON.dump(payload)
  json.force_encoding(Encoding::BINARY)
end
prepare_headers(notification)
# File lib/rpush/daemon/apnsp8/delivery.rb, line 135
# Builds the request headers for one notification.
#
# Starts from fixed defaults plus the app's bundle id and the provider's
# bearer token, then merges in any headers stored under HTTP2_HEADERS_KEY
# in the notification data; those stored values win on conflict.
def prepare_headers(notification)
  bearer = @token_provider.token

  defaults = {
    'content-type'    => 'application/json',
    'apns-expiration' => '0',
    'apns-priority'   => '10',
    'apns-topic'      => @app.bundle_id,
    'authorization'   => "bearer #{bearer}"
  }

  overrides = notification_data(notification)[HTTP2_HEADERS_KEY] || {}
  defaults.merge(overrides)
end
remote_max_concurrent_streams()
# File lib/rpush/daemon/apnsp8/delivery.rb, line 83
# Returns the remote's settings_max_concurrent_streams value, or 0 while it
# still equals 0x7fffffff.
def remote_max_concurrent_streams
  limit = @client.remote_settings[:settings_max_concurrent_streams]
  # 0x7fffffff (2^31 - 1) is the default value from the http-2 gem;
  # presumably the remote has not advertised its own limit yet.
  limit == 0x7fffffff ? 0 : limit
end
retry_message_to_log(notification)
# File lib/rpush/daemon/apnsp8/delivery.rb, line 153
# Logs a warning with the notification id, its deliver_after time and its
# current retry count.
def retry_message_to_log(notification)
  id = notification.id
  retry_at = notification.deliver_after.strftime('%Y-%m-%d %H:%M:%S')
  attempt = notification.retries
  log_warn("Notification #{id} will be retried after #{retry_at} (retry #{attempt}).")
end
service_unavailable(notification, response)
# File lib/rpush/daemon/apnsp8/delivery.rb, line 114
# Schedules the notification for a retry 10 seconds from now and logs both
# the failure and the upcoming retry.
def service_unavailable(notification, response)
  retry_at = Time.now + 10.seconds
  @batch.mark_retryable(notification, retry_at)

  # Logging comes last: the retry message prints the notification's retry
  # time, which has to be initialized before it can be displayed.
  failed_message_to_log(notification, response)
  retry_message_to_log(notification)
end
streams_available?()
# File lib/rpush/daemon/apnsp8/delivery.rb, line 79
# True when the client's current stream count is below the remote's
# concurrent-stream limit.
def streams_available?
  remote_max_concurrent_streams > @client.stream_count
end