class Aws::CognitoSrp

Client for AWS Cognito Identity Provider using Secure Remote Password (SRP).

Borrowed from: gist.github.com/jviney/5fd0fab96cd70d5d46853f052be4744c

This code is a direct translation of the Python version found here: github.com/capless/warrant/blob/ff2e4793d8479e770f2461ef7cbc0c15ee784395/warrant/aws_srp.py

Example usage:

aws_srp = Aws::CognitoSrp.new(
  username: "username",
  password: "password",
  pool_id: "pool-id",
  client_id: "client-id",
  aws_client: Aws::CognitoIdentityProvider::Client.new(region: "us-west-2")
)

aws_srp.authenticate

Constants

G_HEX
INFO_BITS
NEW_PASSWORD_REQUIRED
N_HEX
PASSWORD_VERIFIER
REFRESH_TOKEN
USER_SRP_AUTH
VERSION

Public Class Methods

new(username:, password:, pool_id:, client_id:, aws_client:) click to toggle source
# File lib/aws/cognito_srp.rb, line 88
# Stores the credentials and client handle, then precomputes the SRP values
# that the rest of the class reads from instance variables.
#
# @param username [String] sent as USERNAME when authentication starts
# @param password [String] used when answering the password challenge
# @param pool_id [String] the part after the first "_" is used when hashing
#   and signing
# @param client_id [String] passed as client_id on every aws_client call
# @param aws_client [#initiate_auth, #respond_to_auth_challenge] object the
#   requests are sent through
def initialize(username:, password:, pool_id:, client_id:, aws_client:)
  @username = username
  @password = password
  @pool_id = pool_id
  @client_id = client_id
  @aws_client = aws_client

  # N and g are parsed from their hex constants.
  @big_n = hex_to_long(N_HEX)
  @g = hex_to_long(G_HEX)

  # k is the hash of the hex text "00" + N_HEX + "0" + G_HEX, read as an integer.
  k_digest_hex = hex_hash(["00", N_HEX, "0", G_HEX].join)
  @k = hex_to_long(k_digest_hex)

  # The private value must exist before the public value derived from it.
  @small_a_value = generate_random_small_a
  @large_a_value = calculate_a
end

Public Instance Methods

authenticate() click to toggle source
# File lib/aws/cognito_srp.rb, line 102
# Runs the SRP sign-in exchange through @aws_client.
#
# Sends USERNAME and SRP_A with initiate_auth, answers the resulting
# PASSWORD_VERIFIER challenge with the hash built by process_challenge, and
# returns the authentication_result of that second response.
#
# @return [Object] authentication_result of the respond_to_auth_challenge reply
# @raise [UnexpectedChallenge] if the first reply is not a PASSWORD_VERIFIER
#   challenge
# @raise [NewPasswordRequired] if the second reply is a NEW_PASSWORD_REQUIRED
#   challenge
def authenticate
  srp_start = @aws_client.initiate_auth(
    client_id: @client_id,
    auth_flow: USER_SRP_AUTH,
    auth_parameters: { USERNAME: @username, SRP_A: long_to_hex(@large_a_value) }
  )

  if srp_start.challenge_name != PASSWORD_VERIFIER
    raise UnexpectedChallenge, "Expected Cognito to respond with a #{PASSWORD_VERIFIER} challenge, got #{srp_start.challenge_name} instead"
  end

  verifier_answers = process_challenge(srp_start.challenge_parameters)

  verifier_reply = @aws_client.respond_to_auth_challenge(
    client_id: @client_id,
    challenge_name: PASSWORD_VERIFIER,
    challenge_responses: verifier_answers
  )

  # Only NEW_PASSWORD_REQUIRED is rejected here; any other reply falls through
  # to the return below.
  if verifier_reply.challenge_name == NEW_PASSWORD_REQUIRED
    raise NewPasswordRequired, "Cognito responded to password verifier with a #{NEW_PASSWORD_REQUIRED} challenge"
  end

  verifier_reply.authentication_result
end
refresh(refresh_token)
Alias for: refresh_tokens
refresh_tokens(refresh_token) click to toggle source
# File lib/aws/cognito_srp.rb, line 131
# Requests a new authentication result using a refresh token.
#
# @param refresh_token [String] sent as the REFRESH_TOKEN auth parameter
# @return [Object] authentication_result of the initiate_auth reply
def refresh_tokens(refresh_token)
  token_params = { REFRESH_TOKEN: refresh_token }

  @aws_client
    .initiate_auth(client_id: @client_id, auth_flow: REFRESH_TOKEN, auth_parameters: token_params)
    .authentication_result
end
Also aliased as: refresh

Private Instance Methods

bytes_to_hex(bytes) click to toggle source
# File lib/aws/cognito_srp.rb, line 206
# Encodes a byte string as hex text, two digits per byte, high nibble first.
#
# @param bytes [String] raw bytes
# @return [String] hex representation of bytes
def bytes_to_hex(bytes)
  hex_fields = bytes.unpack('H*')
  hex_fields.first
end
calculate_a() click to toggle source
# File lib/aws/cognito_srp.rb, line 151
# Computes the public value A = g^a mod N from @g, @small_a_value and @big_n.
#
# @return [Integer] the computed A
# @raise [ValueError] if A is a multiple of N
def calculate_a
  public_a = @g.pow(@small_a_value, @big_n)
  return public_a unless (public_a % @big_n).zero?

  raise ValueError, "Safety check for A failed"
end
calculate_u(big_a, big_b) click to toggle source
# File lib/aws/cognito_srp.rb, line 241
# Computes u: the hash of the padded hex of A followed by the padded hex of
# B, read back as an integer.
#
# @param big_a [Integer, String] client public value (anything pad_hex accepts)
# @param big_b [Integer, String] server public value (anything pad_hex accepts)
# @return [Integer] the hash interpreted as a base-16 number
def calculate_u(big_a, big_b)
  padded_pair = [big_a, big_b].map { |value| pad_hex(value) }.join
  hex_to_long(hex_hash(padded_pair))
end
compute_hkdf(ikm, salt) click to toggle source
# File lib/aws/cognito_srp.rb, line 234
# Derives a 16-byte key from ikm and salt with two HMAC-SHA256 passes.
#
# The first pass keys the HMAC with salt over ikm. The second pass keys it
# with that result over INFO_BITS followed by a single 0x01 byte. The first
# 16 bytes of the second digest are returned.
#
# @param ikm [String] input keying material (raw bytes)
# @param salt [String] HMAC key for the first pass (raw bytes)
# @return [String] 16 raw bytes
def compute_hkdf(ikm, salt)
  sha256_hmac = lambda do |key, data|
    ::OpenSSL::HMAC.digest(::OpenSSL::Digest::SHA256.new, key, data)
  end

  pseudo_random_key = sha256_hmac.call(salt, ikm)
  info_with_counter = INFO_BITS + 1.chr.force_encoding('utf-8')

  sha256_hmac.call(pseudo_random_key, info_with_counter).slice(0, 16)
end
generate_random_small_a() click to toggle source
# File lib/aws/cognito_srp.rb, line 146
# Picks the private value a: a 128-byte random integer reduced modulo @big_n.
#
# @return [Integer] value in 0...@big_n
def generate_random_small_a
  get_random(128) % @big_n
end
get_password_authentication_key(username, password, server_b_value, salt) click to toggle source
# File lib/aws/cognito_srp.rb, line 157
# Derives the key used to sign the password-verifier response.
#
# Steps, as performed below:
#   u = calculate_u(A, B)
#   x = H(pad(salt) | H("<pool suffix><username>:<password>"))
#   S = (B - k * g^x) ^ (a + u * x) mod N
#   key = compute_hkdf(bytes(pad(S)), bytes(pad(u)))
# where "<pool suffix>" is the part of @pool_id after the first "_".
#
# @param username [String] user id to hash together with the password
# @param password [String] the user's password
# @param server_b_value [Integer] server public value B
# @param salt [String, Integer] salt (hex text or integer; see pad_hex)
# @return [String] the bytes returned by compute_hkdf
# @raise [ValueError] if B is a multiple of N, or if u is zero
def get_password_authentication_key(username, password, server_b_value, salt)
  # An SRP client must abort when B mod N is zero; checked before any other
  # work so a bad B never reaches the key derivation.
  raise ValueError, "B cannot be zero" if (server_b_value % @big_n).zero?

  u_value = calculate_u(@large_a_value, server_b_value)

  raise ValueError, "U cannot be zero" if u_value == 0

  username_password = "#{@pool_id.split("_")[1]}#{username}:#{password}"
  username_password_hash = hash_sha256(username_password)

  x_value = hex_to_long(hex_hash(pad_hex(salt) + username_password_hash))
  g_mod_pow_xn = @g.pow(x_value, @big_n)
  # May be negative; the modular pow below still yields a value in 0...N.
  int_value2 = server_b_value - @k * g_mod_pow_xn
  s_value = int_value2.pow(@small_a_value + u_value * x_value, @big_n)
  compute_hkdf(hex_to_bytes(pad_hex(s_value)), hex_to_bytes(pad_hex(long_to_hex(u_value))))
end
get_random(nbytes) click to toggle source
# File lib/aws/cognito_srp.rb, line 218
# Returns a random non-negative integer built from nbytes random bytes.
#
# Uses SecureRandom.random_bytes, the documented entry point, rather than
# the undocumented gen_random helper.
#
# @param nbytes [Integer] number of random bytes to draw
# @return [Integer] value in 0...(256**nbytes)
def get_random(nbytes)
  hex_to_long(bytes_to_hex(::SecureRandom.random_bytes(nbytes)))
end
hash_sha256(buf) click to toggle source
# File lib/aws/cognito_srp.rb, line 194
# Returns the SHA-256 digest of buf as lowercase hex text.
#
# @param buf [String] data to hash
# @return [String] hex digest
def hash_sha256(buf)
  hasher = ::Digest::SHA256.new
  hasher.update(buf)
  hasher.hexdigest
end
hex_hash(hex_string) click to toggle source
# File lib/aws/cognito_srp.rb, line 198
# Hashes the bytes that hex_string encodes (not the hex text itself).
#
# @param hex_string [String] hex text
# @return [String] hex SHA-256 digest of the decoded bytes
def hex_hash(hex_string)
  decoded = hex_to_bytes(hex_string)
  hash_sha256(decoded)
end
hex_to_bytes(hex_string) click to toggle source
# File lib/aws/cognito_srp.rb, line 202
# Decodes hex text into the raw bytes it represents (high nibble first).
#
# @param hex_string [String] hex text
# @return [String] decoded bytes
def hex_to_bytes(hex_string)
  fields = [hex_string]
  fields.pack('H*')
end
hex_to_long(hex_string) click to toggle source
# File lib/aws/cognito_srp.rb, line 210
# Parses hex text as a base-16 integer.
#
# @param hex_string [String] hex text
# @return [Integer] parsed value (0 when no leading hex digits are present)
def hex_to_long(hex_string)
  hex_string.hex
end
long_to_hex(long_num) click to toggle source
# File lib/aws/cognito_srp.rb, line 214
# Formats an integer as lowercase base-16 text without padding or prefix.
#
# @param long_num [Integer] number to format
# @return [String] hex text
def long_to_hex(long_num)
  radix = 16
  long_num.to_s(radix)
end
pad_hex(long_int) click to toggle source
# File lib/aws/cognito_srp.rb, line 222
# Normalises a hex value so it has an even number of digits and its first
# digit is below 8.
#
# - odd length: one "0" is prepended
# - even length starting with 8-9 / a-f / A-F: "00" is prepended
# - otherwise (including the empty string) the text is returned unchanged
#
# @param long_int [Integer, String] an integer (converted with long_to_hex)
#   or hex text that is used as given
# @return [String] padded hex text
def pad_hex(long_int)
  hex = long_int.is_a?(::String) ? long_int : long_to_hex(long_int)

  return "0#{hex}" if hex.size.odd?

  # hex[0] is nil for an empty string; guard it so "" passes through instead
  # of raising TypeError from include?(nil).
  leading = hex[0]
  return "00#{hex}" if leading && '89ABCDEFabcdef'.include?(leading)

  hex
end
process_challenge(challenge_parameters) click to toggle source
# File lib/aws/cognito_srp.rb, line 172
# Builds the challenge_responses hash for a password-verifier challenge.
#
# The signature is an HMAC-SHA256, keyed with the result of
# get_password_authentication_key, over the concatenation of the pool suffix
# (the part of @pool_id after the first "_"), the user id, the decoded
# secret block and the timestamp. It is returned Base64-encoded.
#
# @param challenge_parameters [#fetch] must provide the string keys
#   "USER_ID_FOR_SRP", "SALT", "SRP_B" and "SECRET_BLOCK"; fetch raises if one
#   is missing (KeyError for a Hash)
# @return [Hash{Symbol => String}] TIMESTAMP, USERNAME,
#   PASSWORD_CLAIM_SECRET_BLOCK and PASSWORD_CLAIM_SIGNATURE
def process_challenge(challenge_parameters)
  user_id_for_srp = challenge_parameters.fetch("USER_ID_FOR_SRP")
  salt_hex = challenge_parameters.fetch("SALT")
  srp_b_hex = challenge_parameters.fetch("SRP_B")
  secret_block_b64 = challenge_parameters.fetch("SECRET_BLOCK")

  # %-d gives the day of month without zero padding.
  timestamp = ::Time.now.utc.strftime("%a %b %-d %H:%M:%S %Z %Y")

  hkdf = get_password_authentication_key(user_id_for_srp, @password, srp_b_hex.to_i(16), salt_hex)
  secret_block_bytes = ::Base64.strict_decode64(secret_block_b64)

  # Assemble the message from binary copies. The decoded secret block is a
  # binary string, and String#+ raises Encoding::CompatibilityError when it
  # is appended to a UTF-8 string that already contains non-ASCII characters
  # (e.g. a user id with accented letters). The bytes signed are unchanged.
  pool_name = @pool_id.split("_")[1]
  msg = [pool_name, user_id_for_srp, secret_block_bytes, timestamp].map(&:b).join

  hmac_digest = ::OpenSSL::HMAC.digest(::OpenSSL::Digest::SHA256.new, hkdf, msg)
  signature_string = ::Base64.strict_encode64(hmac_digest).force_encoding('utf-8')

  {
    TIMESTAMP: timestamp,
    USERNAME: user_id_for_srp,
    PASSWORD_CLAIM_SECRET_BLOCK: secret_block_b64,
    PASSWORD_CLAIM_SIGNATURE: signature_string
  }
end