| | |
| | |
| | |
| |
|
| | from __future__ import annotations |
| |
|
| | import abc |
| | import datetime |
| | import typing |
| |
|
| | from cryptography import utils, x509 |
| | from cryptography.hazmat.bindings._rust import ocsp |
| | from cryptography.hazmat.primitives import hashes, serialization |
| | from cryptography.hazmat.primitives.asymmetric.types import ( |
| | CertificateIssuerPrivateKeyTypes, |
| | ) |
| | from cryptography.x509.base import ( |
| | _EARLIEST_UTC_TIME, |
| | _convert_to_naive_utc_time, |
| | _reject_duplicate_extension, |
| | ) |
| |
|
| |
|
| | class OCSPResponderEncoding(utils.Enum): |
| | HASH = "By Hash" |
| | NAME = "By Name" |
| |
|
| |
|
| | class OCSPResponseStatus(utils.Enum): |
| | SUCCESSFUL = 0 |
| | MALFORMED_REQUEST = 1 |
| | INTERNAL_ERROR = 2 |
| | TRY_LATER = 3 |
| | SIG_REQUIRED = 5 |
| | UNAUTHORIZED = 6 |
| |
|
| |
|
| | _ALLOWED_HASHES = ( |
| | hashes.SHA1, |
| | hashes.SHA224, |
| | hashes.SHA256, |
| | hashes.SHA384, |
| | hashes.SHA512, |
| | ) |
| |
|
| |
|
| | def _verify_algorithm(algorithm: hashes.HashAlgorithm) -> None: |
| | if not isinstance(algorithm, _ALLOWED_HASHES): |
| | raise ValueError( |
| | "Algorithm must be SHA1, SHA224, SHA256, SHA384, or SHA512" |
| | ) |
| |
|
| |
|
| | class OCSPCertStatus(utils.Enum): |
| | GOOD = 0 |
| | REVOKED = 1 |
| | UNKNOWN = 2 |
| |
|
| |
|
| | class _SingleResponse: |
| | def __init__( |
| | self, |
| | cert: x509.Certificate, |
| | issuer: x509.Certificate, |
| | algorithm: hashes.HashAlgorithm, |
| | cert_status: OCSPCertStatus, |
| | this_update: datetime.datetime, |
| | next_update: datetime.datetime | None, |
| | revocation_time: datetime.datetime | None, |
| | revocation_reason: x509.ReasonFlags | None, |
| | ): |
| | if not isinstance(cert, x509.Certificate) or not isinstance( |
| | issuer, x509.Certificate |
| | ): |
| | raise TypeError("cert and issuer must be a Certificate") |
| |
|
| | _verify_algorithm(algorithm) |
| | if not isinstance(this_update, datetime.datetime): |
| | raise TypeError("this_update must be a datetime object") |
| | if next_update is not None and not isinstance( |
| | next_update, datetime.datetime |
| | ): |
| | raise TypeError("next_update must be a datetime object or None") |
| |
|
| | self._cert = cert |
| | self._issuer = issuer |
| | self._algorithm = algorithm |
| | self._this_update = this_update |
| | self._next_update = next_update |
| |
|
| | if not isinstance(cert_status, OCSPCertStatus): |
| | raise TypeError( |
| | "cert_status must be an item from the OCSPCertStatus enum" |
| | ) |
| | if cert_status is not OCSPCertStatus.REVOKED: |
| | if revocation_time is not None: |
| | raise ValueError( |
| | "revocation_time can only be provided if the certificate " |
| | "is revoked" |
| | ) |
| | if revocation_reason is not None: |
| | raise ValueError( |
| | "revocation_reason can only be provided if the certificate" |
| | " is revoked" |
| | ) |
| | else: |
| | if not isinstance(revocation_time, datetime.datetime): |
| | raise TypeError("revocation_time must be a datetime object") |
| |
|
| | revocation_time = _convert_to_naive_utc_time(revocation_time) |
| | if revocation_time < _EARLIEST_UTC_TIME: |
| | raise ValueError( |
| | "The revocation_time must be on or after" |
| | " 1950 January 1." |
| | ) |
| |
|
| | if revocation_reason is not None and not isinstance( |
| | revocation_reason, x509.ReasonFlags |
| | ): |
| | raise TypeError( |
| | "revocation_reason must be an item from the ReasonFlags " |
| | "enum or None" |
| | ) |
| |
|
| | self._cert_status = cert_status |
| | self._revocation_time = revocation_time |
| | self._revocation_reason = revocation_reason |
| |
|
| |
|
| | class OCSPRequest(metaclass=abc.ABCMeta): |
| | @property |
| | @abc.abstractmethod |
| | def issuer_key_hash(self) -> bytes: |
| | """ |
| | The hash of the issuer public key |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def issuer_name_hash(self) -> bytes: |
| | """ |
| | The hash of the issuer name |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def hash_algorithm(self) -> hashes.HashAlgorithm: |
| | """ |
| | The hash algorithm used in the issuer name and key hashes |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def serial_number(self) -> int: |
| | """ |
| | The serial number of the cert whose status is being checked |
| | """ |
| |
|
| | @abc.abstractmethod |
| | def public_bytes(self, encoding: serialization.Encoding) -> bytes: |
| | """ |
| | Serializes the request to DER |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def extensions(self) -> x509.Extensions: |
| | """ |
| | The list of request extensions. Not single request extensions. |
| | """ |
| |
|
| |
|
| | class OCSPSingleResponse(metaclass=abc.ABCMeta): |
| | @property |
| | @abc.abstractmethod |
| | def certificate_status(self) -> OCSPCertStatus: |
| | """ |
| | The status of the certificate (an element from the OCSPCertStatus enum) |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def revocation_time(self) -> datetime.datetime | None: |
| | """ |
| | The date of when the certificate was revoked or None if not |
| | revoked. |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def revocation_time_utc(self) -> datetime.datetime | None: |
| | """ |
| | The date of when the certificate was revoked or None if not |
| | revoked. Represented as a non-naive UTC datetime. |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def revocation_reason(self) -> x509.ReasonFlags | None: |
| | """ |
| | The reason the certificate was revoked or None if not specified or |
| | not revoked. |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def this_update(self) -> datetime.datetime: |
| | """ |
| | The most recent time at which the status being indicated is known by |
| | the responder to have been correct |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def this_update_utc(self) -> datetime.datetime: |
| | """ |
| | The most recent time at which the status being indicated is known by |
| | the responder to have been correct. Represented as a non-naive UTC |
| | datetime. |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def next_update(self) -> datetime.datetime | None: |
| | """ |
| | The time when newer information will be available |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def next_update_utc(self) -> datetime.datetime | None: |
| | """ |
| | The time when newer information will be available. Represented as a |
| | non-naive UTC datetime. |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def issuer_key_hash(self) -> bytes: |
| | """ |
| | The hash of the issuer public key |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def issuer_name_hash(self) -> bytes: |
| | """ |
| | The hash of the issuer name |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def hash_algorithm(self) -> hashes.HashAlgorithm: |
| | """ |
| | The hash algorithm used in the issuer name and key hashes |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def serial_number(self) -> int: |
| | """ |
| | The serial number of the cert whose status is being checked |
| | """ |
| |
|
| |
|
| | class OCSPResponse(metaclass=abc.ABCMeta): |
| | @property |
| | @abc.abstractmethod |
| | def responses(self) -> typing.Iterator[OCSPSingleResponse]: |
| | """ |
| | An iterator over the individual SINGLERESP structures in the |
| | response |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def response_status(self) -> OCSPResponseStatus: |
| | """ |
| | The status of the response. This is a value from the OCSPResponseStatus |
| | enumeration |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def signature_algorithm_oid(self) -> x509.ObjectIdentifier: |
| | """ |
| | The ObjectIdentifier of the signature algorithm |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def signature_hash_algorithm( |
| | self, |
| | ) -> hashes.HashAlgorithm | None: |
| | """ |
| | Returns a HashAlgorithm corresponding to the type of the digest signed |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def signature(self) -> bytes: |
| | """ |
| | The signature bytes |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def tbs_response_bytes(self) -> bytes: |
| | """ |
| | The tbsResponseData bytes |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def certificates(self) -> list[x509.Certificate]: |
| | """ |
| | A list of certificates used to help build a chain to verify the OCSP |
| | response. This situation occurs when the OCSP responder uses a delegate |
| | certificate. |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def responder_key_hash(self) -> bytes | None: |
| | """ |
| | The responder's key hash or None |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def responder_name(self) -> x509.Name | None: |
| | """ |
| | The responder's Name or None |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def produced_at(self) -> datetime.datetime: |
| | """ |
| | The time the response was produced |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def produced_at_utc(self) -> datetime.datetime: |
| | """ |
| | The time the response was produced. Represented as a non-naive UTC |
| | datetime. |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def certificate_status(self) -> OCSPCertStatus: |
| | """ |
| | The status of the certificate (an element from the OCSPCertStatus enum) |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def revocation_time(self) -> datetime.datetime | None: |
| | """ |
| | The date of when the certificate was revoked or None if not |
| | revoked. |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def revocation_time_utc(self) -> datetime.datetime | None: |
| | """ |
| | The date of when the certificate was revoked or None if not |
| | revoked. Represented as a non-naive UTC datetime. |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def revocation_reason(self) -> x509.ReasonFlags | None: |
| | """ |
| | The reason the certificate was revoked or None if not specified or |
| | not revoked. |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def this_update(self) -> datetime.datetime: |
| | """ |
| | The most recent time at which the status being indicated is known by |
| | the responder to have been correct |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def this_update_utc(self) -> datetime.datetime: |
| | """ |
| | The most recent time at which the status being indicated is known by |
| | the responder to have been correct. Represented as a non-naive UTC |
| | datetime. |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def next_update(self) -> datetime.datetime | None: |
| | """ |
| | The time when newer information will be available |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def next_update_utc(self) -> datetime.datetime | None: |
| | """ |
| | The time when newer information will be available. Represented as a |
| | non-naive UTC datetime. |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def issuer_key_hash(self) -> bytes: |
| | """ |
| | The hash of the issuer public key |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def issuer_name_hash(self) -> bytes: |
| | """ |
| | The hash of the issuer name |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def hash_algorithm(self) -> hashes.HashAlgorithm: |
| | """ |
| | The hash algorithm used in the issuer name and key hashes |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def serial_number(self) -> int: |
| | """ |
| | The serial number of the cert whose status is being checked |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def extensions(self) -> x509.Extensions: |
| | """ |
| | The list of response extensions. Not single response extensions. |
| | """ |
| |
|
| | @property |
| | @abc.abstractmethod |
| | def single_extensions(self) -> x509.Extensions: |
| | """ |
| | The list of single response extensions. Not response extensions. |
| | """ |
| |
|
| | @abc.abstractmethod |
| | def public_bytes(self, encoding: serialization.Encoding) -> bytes: |
| | """ |
| | Serializes the response to DER |
| | """ |
| |
|
| |
|
| | OCSPRequest.register(ocsp.OCSPRequest) |
| | OCSPResponse.register(ocsp.OCSPResponse) |
| | OCSPSingleResponse.register(ocsp.OCSPSingleResponse) |
| |
|
| |
|
| | class OCSPRequestBuilder: |
| | def __init__( |
| | self, |
| | request: tuple[ |
| | x509.Certificate, x509.Certificate, hashes.HashAlgorithm |
| | ] |
| | | None = None, |
| | request_hash: tuple[bytes, bytes, int, hashes.HashAlgorithm] |
| | | None = None, |
| | extensions: list[x509.Extension[x509.ExtensionType]] = [], |
| | ) -> None: |
| | self._request = request |
| | self._request_hash = request_hash |
| | self._extensions = extensions |
| |
|
| | def add_certificate( |
| | self, |
| | cert: x509.Certificate, |
| | issuer: x509.Certificate, |
| | algorithm: hashes.HashAlgorithm, |
| | ) -> OCSPRequestBuilder: |
| | if self._request is not None or self._request_hash is not None: |
| | raise ValueError("Only one certificate can be added to a request") |
| |
|
| | _verify_algorithm(algorithm) |
| | if not isinstance(cert, x509.Certificate) or not isinstance( |
| | issuer, x509.Certificate |
| | ): |
| | raise TypeError("cert and issuer must be a Certificate") |
| |
|
| | return OCSPRequestBuilder( |
| | (cert, issuer, algorithm), self._request_hash, self._extensions |
| | ) |
| |
|
| | def add_certificate_by_hash( |
| | self, |
| | issuer_name_hash: bytes, |
| | issuer_key_hash: bytes, |
| | serial_number: int, |
| | algorithm: hashes.HashAlgorithm, |
| | ) -> OCSPRequestBuilder: |
| | if self._request is not None or self._request_hash is not None: |
| | raise ValueError("Only one certificate can be added to a request") |
| |
|
| | if not isinstance(serial_number, int): |
| | raise TypeError("serial_number must be an integer") |
| |
|
| | _verify_algorithm(algorithm) |
| | utils._check_bytes("issuer_name_hash", issuer_name_hash) |
| | utils._check_bytes("issuer_key_hash", issuer_key_hash) |
| | if algorithm.digest_size != len( |
| | issuer_name_hash |
| | ) or algorithm.digest_size != len(issuer_key_hash): |
| | raise ValueError( |
| | "issuer_name_hash and issuer_key_hash must be the same length " |
| | "as the digest size of the algorithm" |
| | ) |
| |
|
| | return OCSPRequestBuilder( |
| | self._request, |
| | (issuer_name_hash, issuer_key_hash, serial_number, algorithm), |
| | self._extensions, |
| | ) |
| |
|
| | def add_extension( |
| | self, extval: x509.ExtensionType, critical: bool |
| | ) -> OCSPRequestBuilder: |
| | if not isinstance(extval, x509.ExtensionType): |
| | raise TypeError("extension must be an ExtensionType") |
| |
|
| | extension = x509.Extension(extval.oid, critical, extval) |
| | _reject_duplicate_extension(extension, self._extensions) |
| |
|
| | return OCSPRequestBuilder( |
| | self._request, self._request_hash, [*self._extensions, extension] |
| | ) |
| |
|
| | def build(self) -> OCSPRequest: |
| | if self._request is None and self._request_hash is None: |
| | raise ValueError("You must add a certificate before building") |
| |
|
| | return ocsp.create_ocsp_request(self) |
| |
|
| |
|
| | class OCSPResponseBuilder: |
| | def __init__( |
| | self, |
| | response: _SingleResponse | None = None, |
| | responder_id: tuple[x509.Certificate, OCSPResponderEncoding] |
| | | None = None, |
| | certs: list[x509.Certificate] | None = None, |
| | extensions: list[x509.Extension[x509.ExtensionType]] = [], |
| | ): |
| | self._response = response |
| | self._responder_id = responder_id |
| | self._certs = certs |
| | self._extensions = extensions |
| |
|
| | def add_response( |
| | self, |
| | cert: x509.Certificate, |
| | issuer: x509.Certificate, |
| | algorithm: hashes.HashAlgorithm, |
| | cert_status: OCSPCertStatus, |
| | this_update: datetime.datetime, |
| | next_update: datetime.datetime | None, |
| | revocation_time: datetime.datetime | None, |
| | revocation_reason: x509.ReasonFlags | None, |
| | ) -> OCSPResponseBuilder: |
| | if self._response is not None: |
| | raise ValueError("Only one response per OCSPResponse.") |
| |
|
| | singleresp = _SingleResponse( |
| | cert, |
| | issuer, |
| | algorithm, |
| | cert_status, |
| | this_update, |
| | next_update, |
| | revocation_time, |
| | revocation_reason, |
| | ) |
| | return OCSPResponseBuilder( |
| | singleresp, |
| | self._responder_id, |
| | self._certs, |
| | self._extensions, |
| | ) |
| |
|
| | def responder_id( |
| | self, encoding: OCSPResponderEncoding, responder_cert: x509.Certificate |
| | ) -> OCSPResponseBuilder: |
| | if self._responder_id is not None: |
| | raise ValueError("responder_id can only be set once") |
| | if not isinstance(responder_cert, x509.Certificate): |
| | raise TypeError("responder_cert must be a Certificate") |
| | if not isinstance(encoding, OCSPResponderEncoding): |
| | raise TypeError( |
| | "encoding must be an element from OCSPResponderEncoding" |
| | ) |
| |
|
| | return OCSPResponseBuilder( |
| | self._response, |
| | (responder_cert, encoding), |
| | self._certs, |
| | self._extensions, |
| | ) |
| |
|
| | def certificates( |
| | self, certs: typing.Iterable[x509.Certificate] |
| | ) -> OCSPResponseBuilder: |
| | if self._certs is not None: |
| | raise ValueError("certificates may only be set once") |
| | certs = list(certs) |
| | if len(certs) == 0: |
| | raise ValueError("certs must not be an empty list") |
| | if not all(isinstance(x, x509.Certificate) for x in certs): |
| | raise TypeError("certs must be a list of Certificates") |
| | return OCSPResponseBuilder( |
| | self._response, |
| | self._responder_id, |
| | certs, |
| | self._extensions, |
| | ) |
| |
|
| | def add_extension( |
| | self, extval: x509.ExtensionType, critical: bool |
| | ) -> OCSPResponseBuilder: |
| | if not isinstance(extval, x509.ExtensionType): |
| | raise TypeError("extension must be an ExtensionType") |
| |
|
| | extension = x509.Extension(extval.oid, critical, extval) |
| | _reject_duplicate_extension(extension, self._extensions) |
| |
|
| | return OCSPResponseBuilder( |
| | self._response, |
| | self._responder_id, |
| | self._certs, |
| | [*self._extensions, extension], |
| | ) |
| |
|
| | def sign( |
| | self, |
| | private_key: CertificateIssuerPrivateKeyTypes, |
| | algorithm: hashes.HashAlgorithm | None, |
| | ) -> OCSPResponse: |
| | if self._response is None: |
| | raise ValueError("You must add a response before signing") |
| | if self._responder_id is None: |
| | raise ValueError("You must add a responder_id before signing") |
| |
|
| | return ocsp.create_ocsp_response( |
| | OCSPResponseStatus.SUCCESSFUL, self, private_key, algorithm |
| | ) |
| |
|
| | @classmethod |
| | def build_unsuccessful( |
| | cls, response_status: OCSPResponseStatus |
| | ) -> OCSPResponse: |
| | if not isinstance(response_status, OCSPResponseStatus): |
| | raise TypeError( |
| | "response_status must be an item from OCSPResponseStatus" |
| | ) |
| | if response_status is OCSPResponseStatus.SUCCESSFUL: |
| | raise ValueError("response_status cannot be SUCCESSFUL") |
| |
|
| | return ocsp.create_ocsp_response(response_status, None, None, None) |
| |
|
| |
|
| | load_der_ocsp_request = ocsp.load_der_ocsp_request |
| | load_der_ocsp_response = ocsp.load_der_ocsp_response |
| |
|