# Copyright (c) 2023 Apple Inc. Licensed under MIT License.
from typing import List, Optional, Dict
from base64 import b64decode
from enum import IntEnum
import time
import datetime
import asn1
import jwt
import requests
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric.ec import ECDSA
from cryptography.hazmat.primitives.hashes import SHA1, SHA256
from cryptography.x509 import ocsp, oid
from OpenSSL import crypto
from appstoreserverlibrary.models.AppTransaction import AppTransaction
from appstoreserverlibrary.models.LibraryUtility import _get_cattrs_converter
from .models.Environment import Environment
from .models.ResponseBodyV2DecodedPayload import ResponseBodyV2DecodedPayload
from .models.JWSTransactionDecodedPayload import JWSTransactionDecodedPayload
from .models.JWSRenewalInfoDecodedPayload import JWSRenewalInfoDecodedPayload
class SignedDataVerifier:
    """
    A class providing utility methods for verifying and decoding App Store signed data.
    """

    def __init__(
        self,
        root_certificates: List[bytes],
        enable_online_checks: bool,
        environment: Environment,
        bundle_id: str,
        app_apple_id: Optional[int] = None,
    ):
        """
        :param root_certificates: The root certificates that signed data is verified against
        :param enable_online_checks: Whether certificate chain verification performs online (OCSP) checks
        :param environment: The environment the decoded data is expected to come from
        :param bundle_id: The bundle identifier the decoded data is expected to carry
        :param app_apple_id: The app Apple ID the decoded data is expected to carry, required in Production
        :raises ValueError: If the environment is Production and app_apple_id is None
        """
        # Fail fast, before any state is set up
        if environment == Environment.PRODUCTION and app_apple_id is None:
            raise ValueError("appAppleId is required when the environment is Production")
        self._chain_verifier = _ChainVerifier(root_certificates)
        self._environment = environment
        self._bundle_id = bundle_id
        self._app_apple_id = app_apple_id
        self._enable_online_checks = enable_online_checks

    def verify_and_decode_renewal_info(self, signed_renewal_info: str) -> JWSRenewalInfoDecodedPayload:
        """
        Verifies and decodes a signedRenewalInfo obtained from the App Store Server API, an App Store Server Notification, or from a device
        See https://developer.apple.com/documentation/appstoreserverapi/jwsrenewalinfo

        :param signed_renewal_info: The signedRenewalInfo field
        :return: The decoded renewal info after verification
        :throws VerificationException: Thrown if the data could not be verified
        """
        decoded_renewal_info = _get_cattrs_converter(JWSRenewalInfoDecodedPayload).structure(
            self._decode_signed_object(signed_renewal_info), JWSRenewalInfoDecodedPayload
        )
        if decoded_renewal_info.environment != self._environment:
            raise VerificationException(VerificationStatus.INVALID_ENVIRONMENT)
        return decoded_renewal_info

    def verify_and_decode_signed_transaction(self, signed_transaction: str) -> JWSTransactionDecodedPayload:
        """
        Verifies and decodes a signedTransaction obtained from the App Store Server API, an App Store Server Notification, or from a device
        See https://developer.apple.com/documentation/appstoreserverapi/jwstransaction

        :param signed_transaction: The signedTransaction field
        :return: The decoded transaction info after verification
        :throws VerificationException: Thrown if the data could not be verified
        """
        decoded_transaction_info = _get_cattrs_converter(JWSTransactionDecodedPayload).structure(
            self._decode_signed_object(signed_transaction), JWSTransactionDecodedPayload
        )
        if decoded_transaction_info.bundleId != self._bundle_id:
            raise VerificationException(VerificationStatus.INVALID_APP_IDENTIFIER)
        if decoded_transaction_info.environment != self._environment:
            raise VerificationException(VerificationStatus.INVALID_ENVIRONMENT)
        return decoded_transaction_info

    def verify_and_decode_notification(self, signed_payload: str) -> ResponseBodyV2DecodedPayload:
        """
        Verifies and decodes an App Store Server Notification signedPayload
        See https://developer.apple.com/documentation/appstoreservernotifications/signedpayload

        :param signed_payload: The payload received by your server
        :return: The decoded payload after verification
        :throws VerificationException: Thrown if the data could not be verified
        """
        decoded_dict = self._decode_signed_object(signed_payload)
        decoded_signed_notification = _get_cattrs_converter(ResponseBodyV2DecodedPayload).structure(
            decoded_dict, ResponseBodyV2DecodedPayload
        )
        # If none of the three sections is present these stay None and verification fails below
        bundle_id = None
        app_apple_id = None
        environment = None
        if decoded_signed_notification.data:
            bundle_id = decoded_signed_notification.data.bundleId
            app_apple_id = decoded_signed_notification.data.appAppleId
            environment = decoded_signed_notification.data.environment
        elif decoded_signed_notification.summary:
            bundle_id = decoded_signed_notification.summary.bundleId
            app_apple_id = decoded_signed_notification.summary.appAppleId
            environment = decoded_signed_notification.summary.environment
        elif decoded_signed_notification.externalPurchaseToken:
            token = decoded_signed_notification.externalPurchaseToken
            bundle_id = token.bundleId
            app_apple_id = token.appAppleId
            # The token has no environment field here; it is derived from the externalPurchaseId prefix
            if token.externalPurchaseId and token.externalPurchaseId.startswith("SANDBOX"):
                environment = Environment.SANDBOX
            else:
                environment = Environment.PRODUCTION
        self._verify_notification(bundle_id, app_apple_id, environment)
        return decoded_signed_notification

    def _verify_notification(self, bundle_id: Optional[str], app_apple_id: Optional[int], environment: Optional[Environment]):
        """
        Checks the given identifiers against the values this verifier was configured with.
        The app Apple ID is only compared when the configured environment is Production.

        :throws VerificationException: INVALID_APP_IDENTIFIER or INVALID_ENVIRONMENT on a mismatch
        """
        if bundle_id != self._bundle_id or (self._environment == Environment.PRODUCTION and app_apple_id != self._app_apple_id):
            raise VerificationException(VerificationStatus.INVALID_APP_IDENTIFIER)
        if environment != self._environment:
            raise VerificationException(VerificationStatus.INVALID_ENVIRONMENT)

    def verify_and_decode_app_transaction(self, signed_app_transaction: str) -> AppTransaction:
        """
        Verifies and decodes a signed AppTransaction
        See https://developer.apple.com/documentation/storekit/apptransaction

        :param signed_app_transaction: The signed AppTransaction
        :return: The decoded AppTransaction after validation
        :throws VerificationException: Thrown if the data could not be verified
        """
        decoded_dict = self._decode_signed_object(signed_app_transaction)
        decoded_app_transaction = _get_cattrs_converter(AppTransaction).structure(decoded_dict, AppTransaction)
        # The receiptType field is what gets compared against the configured environment.
        # The checks (and their order) are the same as for notifications.
        self._verify_notification(
            decoded_app_transaction.bundleId,
            decoded_app_transaction.appAppleId,
            decoded_app_transaction.receiptType,
        )
        return decoded_app_transaction

    def _decode_signed_object(self, signed_obj: str) -> dict:
        """
        Decodes a JWS and, outside the Xcode and LocalTesting environments, verifies its
        x5c certificate chain and its ES256 signature.

        :param signed_obj: The compact JWS string
        :return: The decoded payload claims
        :throws VerificationException: Any failure other than a VerificationException is
            reported as VERIFICATION_FAILURE with the original exception chained as the cause
        """
        try:
            decoded_jwt = jwt.decode(signed_obj, options={"verify_signature": False})
            if self._environment == Environment.XCODE or self._environment == Environment.LOCAL_TESTING:
                # Data is not signed by the App Store, and verification should be skipped
                # The environment MUST be checked in the public method calling this
                return decoded_jwt
            unverified_headers: dict = jwt.get_unverified_header(signed_obj)
            x5c_header: List[str] = unverified_headers.get("x5c")
            if not x5c_header:
                raise Exception("x5c claim was empty")
            algorithm_header: str = unverified_headers.get("alg")
            if algorithm_header != "ES256":
                raise Exception("Algorithm was not ES256")
            signed_date = decoded_jwt.get('signedDate')
            if signed_date is None:
                signed_date = decoded_jwt.get('receiptCreationDate')
            # With online checks (or no date in the payload) the chain is validated as of now;
            # otherwise as of the signing date, which is converted from milliseconds to seconds.
            if self._enable_online_checks or signed_date is None:
                effective_date = time.time()
            else:
                effective_date = int(signed_date) // 1000
            signing_key = self._chain_verifier.verify_chain(x5c_header, self._enable_online_checks, effective_date)
            return jwt.decode(signed_obj, signing_key, algorithms=["ES256"])
        except VerificationException:
            raise
        except Exception as e:
            raise VerificationException(VerificationStatus.VERIFICATION_FAILURE) from e
class _ChainVerifier:
    """
    Verifies an x5c certificate chain against the configured root certificates and returns
    the leaf certificate's public key, optionally checking OCSP status and caching the result.
    """

    MAXIMUM_CACHE_SIZE = 32  # There are unlikely to be more than a couple keys at once
    CACHE_TIME_LIMIT = 15 * 60  # 15 minutes
    OCSP_REQUEST_TIMEOUT = 30  # Seconds; keeps an unresponsive OCSP responder from blocking forever

    def __init__(self, root_certificates: List[bytes], enable_strict_checks=True):
        """
        :param root_certificates: DER-encoded trusted root certificates
        :param enable_strict_checks: Whether to set the X509_STRICT flag on the trust store
        """
        self.enable_strict_checks = enable_strict_checks
        self.root_certificates = root_certificates
        # Maps the certificate chain (as a tuple) to (PEM public key, expiration in epoch seconds)
        self.verified_certificates_cache: Dict[tuple[str, ...], tuple[str, float]] = {}

    def verify_chain(self, certificates: List[str], perform_online_checks: bool, effective_date: int) -> str:
        """
        Verifies the chain and returns the leaf public key, using the cache when online checks are enabled.

        :param certificates: The base64-encoded DER certificates, leaf first
        :param perform_online_checks: Whether to perform OCSP checks (and use the cache)
        :param effective_date: The time, in epoch seconds, at which the chain must be valid
        :return: The PEM-encoded public key of the leaf certificate
        :throws VerificationException: Thrown if the chain could not be verified
        """
        if perform_online_checks and len(certificates) > 0:
            cached_public_key = self.get_cached_public_key(certificates)
            if cached_public_key is not None:
                return cached_public_key
        verified_public_key = self._verify_chain_without_caching(
            certificates=certificates, perform_online_checks=perform_online_checks, effective_date=effective_date
        )
        if perform_online_checks:
            self.put_verified_public_key(certificates, verified_public_key)
        return verified_public_key

    def _verify_chain_without_caching(self, certificates: List[str], perform_online_checks: bool, effective_date: int) -> str:
        """
        Performs the full chain verification: trust path to a configured root, required
        marker extensions on the leaf and intermediate, and (optionally) OCSP status.

        :return: The PEM-encoded public key of the leaf certificate
        :throws VerificationException: Thrown if the chain could not be verified
        """
        if len(self.root_certificates) == 0:
            raise VerificationException(VerificationStatus.INVALID_CERTIFICATE)
        if len(certificates) != 3:
            raise VerificationException(VerificationStatus.INVALID_CHAIN_LENGTH)
        trusted_store = crypto.X509Store()
        try:
            for trusted_cert_bytes in self.root_certificates:
                trusted_cert = crypto.load_certificate(crypto.FILETYPE_ASN1, trusted_cert_bytes)
                trusted_store.add_cert(trusted_cert)
            if self.enable_strict_checks:
                trusted_store.set_flags(crypto.X509StoreFlags.X509_STRICT)
            leaf_cert = crypto.load_certificate(crypto.FILETYPE_ASN1, b64decode(certificates[0], validate=True))
            intermediate_cert = crypto.load_certificate(crypto.FILETYPE_ASN1, b64decode(certificates[1], validate=True))
            # The third supplied certificate is never loaded; the root comes from the trust store
            verification_context = crypto.X509StoreContext(trusted_store, leaf_cert, [intermediate_cert])
        except Exception as e:
            raise VerificationException(VerificationStatus.INVALID_CERTIFICATE) from e
        trusted_store.set_time(datetime.datetime.fromtimestamp(effective_date, tz=datetime.timezone.utc))
        try:
            verification_context.verify_certificate()
            trusted_chain = verification_context.get_verified_chain()
        except Exception as e:
            raise VerificationException(VerificationStatus.VERIFICATION_FAILURE) from e
        # Both certificates must carry their respective marker extension
        self.check_oid(trusted_chain[0].to_cryptography(), "1.2.840.113635.100.6.11.1")
        self.check_oid(trusted_chain[1].to_cryptography(), "1.2.840.113635.100.6.2.1")
        if perform_online_checks:
            self.check_ocsp_status(trusted_chain[1], trusted_chain[2], trusted_chain[2])
            self.check_ocsp_status(trusted_chain[0], trusted_chain[1], trusted_chain[2])
        return (
            leaf_cert.to_cryptography()
            .public_key()
            .public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo)
            .decode()
        )

    def check_oid(self, cert: x509.Certificate, oid: str):
        """
        Requires that the certificate has an extension with the given dotted OID.

        :throws VerificationException: VERIFICATION_FAILURE if the extension lookup fails
        """
        try:
            cert.extensions.get_extension_for_oid(x509.ObjectIdentifier(oid))
        except Exception as e:
            raise VerificationException(VerificationStatus.VERIFICATION_FAILURE) from e

    @staticmethod
    def _subject_public_key_sha1(cert: crypto.X509) -> bytes:
        """Returns the SHA-1 digest of the subjectPublicKey value inside the certificate's SubjectPublicKeyInfo."""
        subject_public_key_info = (
            cert.get_pubkey()
            .to_cryptography_key()
            .public_bytes(
                encoding=serialization.Encoding.DER,
                format=serialization.PublicFormat.SubjectPublicKeyInfo,
            )
        )
        decoder = asn1.Decoder()
        decoder.start(subject_public_key_info)
        decoder.enter()  # Step into the outer SubjectPublicKeyInfo SEQUENCE
        decoder.read()  # Skip the first element (the algorithm identifier)
        _, value = decoder.read()  # The second element is the public key itself
        digest = hashes.Hash(SHA1())
        digest.update(value)
        return digest.finalize()

    def _find_ocsp_signing_cert(self, ocsp_resp: ocsp.OCSPResponse, candidates: List[crypto.X509]) -> Optional[crypto.X509]:
        """Returns the first candidate matching the response's responder key hash or responder name, or None."""
        for candidate in candidates:
            if ocsp_resp.responder_key_hash:
                if self._subject_public_key_sha1(candidate) == ocsp_resp.responder_key_hash:
                    return candidate
            elif ocsp_resp.responder_name:
                # responder_name is an x509.Name, so compare it with the cryptography subject Name
                if candidate.to_cryptography().subject == ocsp_resp.responder_name:
                    return candidate
        return None

    def check_ocsp_status(self, cert: crypto.X509, issuer: crypto.X509, root: crypto.X509):
        """
        Queries the OCSP responders listed in the certificate and requires a validly signed
        response reporting the certificate as GOOD.

        :param cert: The certificate whose revocation status is checked
        :param issuer: The certificate that issued cert
        :param root: The root certificate, needed to validate a delegated OCSP signing certificate
        :throws VerificationException: VERIFICATION_FAILURE if no responder yields an acceptable response
        """
        cert_crypto = cert.to_cryptography()
        issuer_crypto = issuer.to_cryptography()
        request = ocsp.OCSPRequestBuilder().add_certificate(cert_crypto, issuer_crypto, SHA256()).build()
        request_der = request.public_bytes(serialization.Encoding.DER)
        authority_values = cert_crypto.extensions.get_extension_for_oid(
            x509.oid.ExtensionOID.AUTHORITY_INFORMATION_ACCESS
        ).value
        ocsps = [val for val in authority_values if val.access_method == x509.oid.AuthorityInformationAccessOID.OCSP]
        issuer_der = issuer_crypto.public_bytes(encoding=serialization.Encoding.DER)
        for o in ocsps:
            r = requests.post(
                o.access_location.value,
                headers={"Content-Type": "application/ocsp-request"},
                data=request_der,
                timeout=self.OCSP_REQUEST_TIMEOUT,
            )
            if r.status_code != 200:
                continue
            ocsp_resp = ocsp.load_der_ocsp_response(r.content)
            if ocsp_resp.response_status != ocsp.OCSPResponseStatus.SUCCESSFUL:
                continue

            # Find signing cert among the issuer and the certificates shipped with the response
            certs = [issuer]
            certs.extend(crypto.X509.from_cryptography(ocsp_cert) for ocsp_cert in ocsp_resp.certificates)
            signing_cert = self._find_ocsp_signing_cert(ocsp_resp, certs)
            if signing_cert is None:
                raise VerificationException(VerificationStatus.VERIFICATION_FAILURE)
            signing_cert_crypto = signing_cert.to_cryptography()

            # The issuer itself is trusted; any other signer must chain to the issuer and
            # be authorized for OCSP signing.
            if signing_cert_crypto.public_bytes(encoding=serialization.Encoding.DER) != issuer_der:
                trusted_store = crypto.X509Store()
                trusted_store.add_cert(issuer)
                trusted_store.add_cert(root)  # Apparently a full chain is always needed
                verification_context = crypto.X509StoreContext(trusted_store, signing_cert, [])
                verification_context.verify_certificate()
                extended_key_usage = signing_cert_crypto.extensions.get_extension_for_class(x509.ExtendedKeyUsage).value
                if oid.ExtendedKeyUsageOID.OCSP_SIGNING not in extended_key_usage:
                    raise VerificationException(VerificationStatus.VERIFICATION_FAILURE)

            # Confirm response is signed by signing_certificate
            signing_cert_crypto.public_key().verify(
                ocsp_resp.signature, ocsp_resp.tbs_response_bytes, ECDSA(ocsp_resp.signature_hash_algorithm)
            )

            # Get the CertId
            for single_response in ocsp_resp.responses:
                # Get the cert ID with the provided hashing algorithm (using the request builder wrapper).
                # A separate name is used so the request sent to later responders is not replaced.
                expected = (
                    ocsp.OCSPRequestBuilder()
                    .add_certificate(cert_crypto, issuer_crypto, single_response.hash_algorithm)
                    .build()
                )
                if (
                    single_response.certificate_status == ocsp.OCSPCertStatus.GOOD
                    and single_response.serial_number == expected.serial_number
                    and single_response.issuer_key_hash == expected.issuer_key_hash
                    and single_response.issuer_name_hash == expected.issuer_name_hash
                ):
                    # Success
                    return
        raise VerificationException(VerificationStatus.VERIFICATION_FAILURE)

    def get_cached_public_key(self, certificates: List[str]) -> Optional[str]:
        """Returns the cached public key for this chain, or None if it is absent or expired."""
        verified_public_key = self.verified_certificates_cache.get(tuple(certificates))
        if verified_public_key is None:
            return None
        if verified_public_key[1] <= time.time():
            return None
        return verified_public_key[0]

    def put_verified_public_key(self, certificates: List[str], verified_public_key: str):
        """
        Caches the public key for this chain for CACHE_TIME_LIMIT seconds.
        When the cache grows past MAXIMUM_CACHE_SIZE, expired entries are purged;
        unexpired entries are never evicted, so the size is a soft limit.
        """
        now = time.time()
        self.verified_certificates_cache[tuple(certificates)] = (verified_public_key, now + _ChainVerifier.CACHE_TIME_LIMIT)
        if len(self.verified_certificates_cache) > _ChainVerifier.MAXIMUM_CACHE_SIZE:
            expired_keys = [k for k, v in self.verified_certificates_cache.items() if v[1] <= now]
            for k in expired_keys:
                del self.verified_certificates_cache[k]
class VerificationStatus(IntEnum):
    """
    Status codes carried by VerificationException to identify why verification failed.
    """

    OK = 0
    VERIFICATION_FAILURE = 1
    INVALID_APP_IDENTIFIER = 2
    INVALID_CERTIFICATE = 3
    INVALID_CHAIN_LENGTH = 4
    INVALID_CHAIN = 5
    INVALID_ENVIRONMENT = 6
class VerificationException(Exception):
    """
    Raised when signed data could not be verified.

    The status attribute holds the VerificationStatus describing the failure.
    """

    def __init__(self, status: VerificationStatus):
        """
        :param status: The reason verification failed; its name is included in the exception message
        """
        super().__init__(f"Verification failed with status {status.name}")
        self.status = status