aboutsummaryrefslogtreecommitdiff
path: root/src/OpenSSL/crypto.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/OpenSSL/crypto.py')
-rw-r--r--src/OpenSSL/crypto.py536
1 files changed, 333 insertions, 203 deletions
diff --git a/src/OpenSSL/crypto.py b/src/OpenSSL/crypto.py
index 12e92df..4265525 100644
--- a/src/OpenSSL/crypto.py
+++ b/src/OpenSSL/crypto.py
@@ -1,3 +1,4 @@
+import calendar
import datetime
from base64 import b16encode
@@ -7,11 +8,11 @@ from operator import __eq__, __ne__, __lt__, __le__, __gt__, __ge__
from six import (
integer_types as _integer_types,
text_type as _text_type,
- PY3 as _PY3)
+ PY2 as _PY2,
+)
-from cryptography import x509
+from cryptography import utils, x509
from cryptography.hazmat.primitives.asymmetric import dsa, rsa
-from cryptography.utils import deprecated
from OpenSSL._util import (
ffi as _ffi,
@@ -19,48 +20,49 @@ from OpenSSL._util import (
exception_from_error_queue as _exception_from_error_queue,
byte_string as _byte_string,
native as _native,
+ path_string as _path_string,
UNSPECIFIED as _UNSPECIFIED,
text_to_bytes_and_warn as _text_to_bytes_and_warn,
make_assert as _make_assert,
)
__all__ = [
- 'FILETYPE_PEM',
- 'FILETYPE_ASN1',
- 'FILETYPE_TEXT',
- 'TYPE_RSA',
- 'TYPE_DSA',
- 'Error',
- 'PKey',
- 'get_elliptic_curves',
- 'get_elliptic_curve',
- 'X509Name',
- 'X509Extension',
- 'X509Req',
- 'X509',
- 'X509StoreFlags',
- 'X509Store',
- 'X509StoreContextError',
- 'X509StoreContext',
- 'load_certificate',
- 'dump_certificate',
- 'dump_publickey',
- 'dump_privatekey',
- 'Revoked',
- 'CRL',
- 'PKCS7',
- 'PKCS12',
- 'NetscapeSPKI',
- 'load_publickey',
- 'load_privatekey',
- 'dump_certificate_request',
- 'load_certificate_request',
- 'sign',
- 'verify',
- 'dump_crl',
- 'load_crl',
- 'load_pkcs7_data',
- 'load_pkcs12'
+ "FILETYPE_PEM",
+ "FILETYPE_ASN1",
+ "FILETYPE_TEXT",
+ "TYPE_RSA",
+ "TYPE_DSA",
+ "Error",
+ "PKey",
+ "get_elliptic_curves",
+ "get_elliptic_curve",
+ "X509Name",
+ "X509Extension",
+ "X509Req",
+ "X509",
+ "X509StoreFlags",
+ "X509Store",
+ "X509StoreContextError",
+ "X509StoreContext",
+ "load_certificate",
+ "dump_certificate",
+ "dump_publickey",
+ "dump_privatekey",
+ "Revoked",
+ "CRL",
+ "PKCS7",
+ "PKCS12",
+ "NetscapeSPKI",
+ "load_publickey",
+ "load_privatekey",
+ "dump_certificate_request",
+ "load_certificate_request",
+ "sign",
+ "verify",
+ "dump_crl",
+ "load_crl",
+ "load_pkcs7_data",
+ "load_pkcs12",
]
FILETYPE_PEM = _lib.SSL_FILETYPE_PEM
@@ -94,6 +96,7 @@ def _get_backend():
triggering this side effect unless _get_backend is called.
"""
from cryptography.hazmat.backends.openssl.backend import backend
+
return backend
@@ -136,7 +139,7 @@ def _bio_to_string(bio):
"""
Copy the contents of an OpenSSL BIO object into a Python byte string.
"""
- result_buffer = _ffi.new('char**')
+ result_buffer = _ffi.new("char**")
buffer_length = _lib.BIO_get_mem_data(bio, result_buffer)
return _ffi.buffer(result_buffer[0], buffer_length)[:]
@@ -173,7 +176,7 @@ def _get_asn1_time(timestamp):
@return: The time value from C{timestamp} as a L{bytes} string in a certain
format. Or C{None} if the object contains no time value.
"""
- string_timestamp = _ffi.cast('ASN1_STRING*', timestamp)
+ string_timestamp = _ffi.cast("ASN1_STRING*", timestamp)
if _lib.ASN1_STRING_length(string_timestamp) == 0:
return None
elif (
@@ -196,7 +199,8 @@ def _get_asn1_time(timestamp):
_untested_error("ASN1_TIME_to_generalizedtime")
else:
string_timestamp = _ffi.cast(
- "ASN1_STRING*", generalized_timestamp[0])
+ "ASN1_STRING*", generalized_timestamp[0]
+ )
string_data = _lib.ASN1_STRING_data(string_timestamp)
string_result = _ffi.string(string_data)
_lib.ASN1_GENERALIZEDTIME_free(generalized_timestamp[0])
@@ -220,6 +224,7 @@ class PKey(object):
"""
A class representing an DSA or RSA public key or key pair.
"""
+
_only_public = False
_initialized = True
@@ -258,8 +263,15 @@ class PKey(object):
.. versionadded:: 16.1.0
"""
pkey = cls()
- if not isinstance(crypto_key, (rsa.RSAPublicKey, rsa.RSAPrivateKey,
- dsa.DSAPublicKey, dsa.DSAPrivateKey)):
+ if not isinstance(
+ crypto_key,
+ (
+ rsa.RSAPublicKey,
+ rsa.RSAPrivateKey,
+ dsa.DSAPublicKey,
+ dsa.DSAPrivateKey,
+ ),
+ ):
raise TypeError("Unsupported key type")
pkey._pkey = crypto_key._evp_pkey
@@ -346,7 +358,7 @@ class PKey(object):
rsa = _lib.EVP_PKEY_get1_RSA(self._pkey)
rsa = _ffi.gc(rsa, _lib.RSA_free)
result = _lib.RSA_check_key(rsa)
- if result:
+ if result == 1:
return True
_raise_current_error()
@@ -367,13 +379,6 @@ class PKey(object):
return _lib.EVP_PKEY_bits(self._pkey)
-PKeyType = deprecated(
- PKey, __name__,
- "PKeyType has been deprecated, use PKey instead",
- DeprecationWarning
-)
-
-
class _EllipticCurve(object):
"""
A representation of a supported elliptic curve.
@@ -383,10 +388,11 @@ class _EllipticCurve(object):
instances each of which represents one curve supported by the system.
@type _curves: :py:type:`NoneType` or :py:type:`set`
"""
+
_curves = None
- if _PY3:
- # This only necessary on Python 3. Morever, it is broken on Python 2.
+ if not _PY2:
+ # This only necessary on Python 3. Moreover, it is broken on Python 2.
def __ne__(self, other):
"""
Implement cooperation with the right-hand side argument of ``!=``.
@@ -409,14 +415,12 @@ class _EllipticCurve(object):
elliptic curves the underlying library supports.
"""
num_curves = lib.EC_get_builtin_curves(_ffi.NULL, 0)
- builtin_curves = _ffi.new('EC_builtin_curve[]', num_curves)
+ builtin_curves = _ffi.new("EC_builtin_curve[]", num_curves)
# The return value on this call should be num_curves again. We
# could check it to make sure but if it *isn't* then.. what could
# we do? Abort the whole process, I suppose...? -exarkun
lib.EC_get_builtin_curves(builtin_curves, num_curves)
- return set(
- cls.from_nid(lib, c.nid)
- for c in builtin_curves)
+ return set(cls.from_nid(lib, c.nid) for c in builtin_curves)
@classmethod
def _get_elliptic_curves(cls, lib):
@@ -549,14 +553,16 @@ class X509Name(object):
self._name = _ffi.gc(name, _lib.X509_NAME_free)
def __setattr__(self, name, value):
- if name.startswith('_'):
+ if name.startswith("_"):
return super(X509Name, self).__setattr__(name, value)
# Note: we really do not want str subclasses here, so we do not use
# isinstance.
if type(name) is not str:
- raise TypeError("attribute name must be string, not '%.200s'" % (
- type(value).__name__,))
+ raise TypeError(
+ "attribute name must be string, not '%.200s'"
+ % (type(value).__name__,)
+ )
nid = _lib.OBJ_txt2nid(_byte_string(name))
if nid == _lib.NID_undef:
@@ -577,10 +583,11 @@ class X509Name(object):
break
if isinstance(value, _text_type):
- value = value.encode('utf-8')
+ value = value.encode("utf-8")
add_result = _lib.X509_NAME_add_entry_by_NID(
- self._name, nid, _lib.MBSTRING_UTF8, value, -1, -1, 0)
+ self._name, nid, _lib.MBSTRING_UTF8, value, -1, -1, 0
+ )
if not add_result:
_raise_current_error()
@@ -616,9 +623,9 @@ class X509Name(object):
_openssl_assert(data_length >= 0)
try:
- result = _ffi.buffer(
- result_buffer[0], data_length
- )[:].decode('utf-8')
+ result = _ffi.buffer(result_buffer[0], data_length)[:].decode(
+ "utf-8"
+ )
finally:
# XXX untested
_lib.OPENSSL_free(result_buffer[0])
@@ -630,6 +637,7 @@ class X509Name(object):
return NotImplemented
result = _lib.X509_NAME_cmp(self._name, other._name)
return op(result, 0)
+
return f
__eq__ = _cmp(__eq__)
@@ -647,11 +655,13 @@ class X509Name(object):
"""
result_buffer = _ffi.new("char[]", 512)
format_result = _lib.X509_NAME_oneline(
- self._name, result_buffer, len(result_buffer))
+ self._name, result_buffer, len(result_buffer)
+ )
_openssl_assert(format_result != _ffi.NULL)
return "<X509Name object '%s'>" % (
- _native(_ffi.string(result_buffer)),)
+ _native(_ffi.string(result_buffer)),
+ )
def hash(self):
"""
@@ -672,7 +682,7 @@ class X509Name(object):
:return: The DER encoded form of this name.
:rtype: :py:class:`bytes`
"""
- result_buffer = _ffi.new('unsigned char**')
+ result_buffer = _ffi.new("unsigned char**")
encode_result = _lib.i2d_X509_NAME(self._name, result_buffer)
_openssl_assert(encode_result >= 0)
@@ -699,20 +709,14 @@ class X509Name(object):
# ffi.string does not handle strings containing NULL bytes
# (which may have been generated by old, broken software)
- value = _ffi.buffer(_lib.ASN1_STRING_data(fval),
- _lib.ASN1_STRING_length(fval))[:]
+ value = _ffi.buffer(
+ _lib.ASN1_STRING_data(fval), _lib.ASN1_STRING_length(fval)
+ )[:]
result.append((_ffi.string(name), value))
return result
-X509NameType = deprecated(
- X509Name, __name__,
- "X509NameType has been deprecated, use X509Name instead",
- DeprecationWarning
-)
-
-
class X509Extension(object):
"""
An X.509 v3 certificate extension.
@@ -808,7 +812,8 @@ class X509Extension(object):
parts.append(_native(_bio_to_string(bio)))
else:
value = _native(
- _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[:])
+ _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[:]
+ )
parts.append(label + ":" + value)
return ", ".join(parts)
@@ -858,19 +863,12 @@ class X509Extension(object):
.. versionadded:: 0.12
"""
octet_result = _lib.X509_EXTENSION_get_data(self._extension)
- string_result = _ffi.cast('ASN1_STRING*', octet_result)
+ string_result = _ffi.cast("ASN1_STRING*", octet_result)
char_result = _lib.ASN1_STRING_data(string_result)
result_length = _lib.ASN1_STRING_length(string_result)
return _ffi.buffer(char_result, result_length)[:]
-X509ExtensionType = deprecated(
- X509Extension, __name__,
- "X509ExtensionType has been deprecated, use X509Extension instead",
- DeprecationWarning
-)
-
-
class X509Req(object):
"""
An X.509 certificate signing requests.
@@ -891,8 +889,9 @@ class X509Req(object):
.. versionadded:: 17.1.0
"""
from cryptography.hazmat.backends.openssl.x509 import (
- _CertificateSigningRequest
+ _CertificateSigningRequest,
)
+
backend = _get_backend()
return _CertificateSigningRequest(backend, self._req)
@@ -1018,9 +1017,20 @@ class X509Req(object):
"""
exts = []
native_exts_obj = _lib.X509_REQ_get_extensions(self._req)
+ native_exts_obj = _ffi.gc(
+ native_exts_obj,
+ lambda x: _lib.sk_X509_EXTENSION_pop_free(
+ x,
+ _ffi.addressof(_lib._original_lib, "X509_EXTENSION_free"),
+ ),
+ )
+
for i in range(_lib.sk_X509_EXTENSION_num(native_exts_obj)):
ext = X509Extension.__new__(X509Extension)
- ext._extension = _lib.sk_X509_EXTENSION_value(native_exts_obj, i)
+ extension = _lib.X509_EXTENSION_dup(
+ _lib.sk_X509_EXTENSION_value(native_exts_obj, i)
+ )
+ ext._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free)
exts.append(ext)
return exts
@@ -1070,17 +1080,11 @@ class X509Req(object):
return result
-X509ReqType = deprecated(
- X509Req, __name__,
- "X509ReqType has been deprecated, use X509Req instead",
- DeprecationWarning
-)
-
-
class X509(object):
"""
An X.509 certificate.
"""
+
def __init__(self):
x509 = _lib.X509_new()
_openssl_assert(x509 != _ffi.NULL)
@@ -1106,6 +1110,7 @@ class X509(object):
.. versionadded:: 17.1.0
"""
from cryptography.hazmat.backends.openssl.x509 import _Certificate
+
backend = _get_backend()
return _Certificate(backend, self._x509)
@@ -1247,12 +1252,16 @@ class X509(object):
result_length[0] = len(result_buffer)
digest_result = _lib.X509_digest(
- self._x509, digest, result_buffer, result_length)
+ self._x509, digest, result_buffer, result_length
+ )
_openssl_assert(digest_result == 1)
- return b":".join([
- b16encode(ch).upper() for ch
- in _ffi.buffer(result_buffer, result_length[0])])
+ return b":".join(
+ [
+ b16encode(ch).upper()
+ for ch in _ffi.buffer(result_buffer, result_length[0])
+ ]
+ )
def subject_name_hash(self):
"""
@@ -1277,7 +1286,7 @@ class X509(object):
hex_serial = hex(serial)[2:]
if not isinstance(hex_serial, bytes):
- hex_serial = hex_serial.encode('ascii')
+ hex_serial = hex_serial.encode("ascii")
bignum_serial = _ffi.new("BIGNUM**")
@@ -1288,7 +1297,8 @@ class X509(object):
if bignum_serial[0] == _ffi.NULL:
set_result = _lib.ASN1_INTEGER_set(
- _lib.X509_get_serialNumber(self._x509), small_serial)
+ _lib.X509_get_serialNumber(self._x509), small_serial
+ )
if set_result:
# TODO Not tested
_raise_current_error()
@@ -1333,7 +1343,7 @@ class X509(object):
if not isinstance(amount, int):
raise TypeError("amount must be an integer")
- notAfter = _lib.X509_get_notAfter(self._x509)
+ notAfter = _lib.X509_getm_notAfter(self._x509)
_lib.X509_gmtime_adj(notAfter, amount)
def gmtime_adj_notBefore(self, amount):
@@ -1346,7 +1356,7 @@ class X509(object):
if not isinstance(amount, int):
raise TypeError("amount must be an integer")
- notBefore = _lib.X509_get_notBefore(self._x509)
+ notBefore = _lib.X509_getm_notBefore(self._x509)
_lib.X509_gmtime_adj(notBefore, amount)
def has_expired(self):
@@ -1375,7 +1385,7 @@ class X509(object):
:return: A timestamp string, or ``None`` if there is none.
:rtype: bytes or NoneType
"""
- return self._get_boundary_time(_lib.X509_get_notBefore)
+ return self._get_boundary_time(_lib.X509_getm_notBefore)
def _set_boundary_time(self, which, when):
return _set_asn1_time(which(self._x509), when)
@@ -1391,7 +1401,7 @@ class X509(object):
:param bytes when: A timestamp string.
:return: ``None``
"""
- return self._set_boundary_time(_lib.X509_get_notBefore, when)
+ return self._set_boundary_time(_lib.X509_getm_notBefore, when)
def get_notAfter(self):
"""
@@ -1404,7 +1414,7 @@ class X509(object):
:return: A timestamp string, or ``None`` if there is none.
:rtype: bytes or NoneType
"""
- return self._get_boundary_time(_lib.X509_get_notAfter)
+ return self._get_boundary_time(_lib.X509_getm_notAfter)
def set_notAfter(self, when):
"""
@@ -1417,7 +1427,7 @@ class X509(object):
:param bytes when: A timestamp string.
:return: ``None``
"""
- return self._set_boundary_time(_lib.X509_get_notAfter, when)
+ return self._set_boundary_time(_lib.X509_getm_notAfter, when)
def _get_name(self, which):
name = X509Name.__new__(X509Name)
@@ -1543,13 +1553,6 @@ class X509(object):
return ext
-X509Type = deprecated(
- X509, __name__,
- "X509Type has been deprecated, use X509 instead",
- DeprecationWarning
-)
-
-
class X509StoreFlags(object):
"""
Flags for X509 verification, used to change the behavior of
@@ -1560,6 +1563,7 @@ class X509StoreFlags(object):
.. _OpenSSL Verification Flags:
https://www.openssl.org/docs/manmaster/man3/X509_VERIFY_PARAM_set_flags.html
"""
+
CRL_CHECK = _lib.X509_V_FLAG_CRL_CHECK
CRL_CHECK_ALL = _lib.X509_V_FLAG_CRL_CHECK_ALL
IGNORE_CRITICAL = _lib.X509_V_FLAG_IGNORE_CRITICAL
@@ -1610,16 +1614,8 @@ class X509Store(object):
if not isinstance(cert, X509):
raise TypeError()
- # As of OpenSSL 1.1.0i adding the same cert to the store more than
- # once doesn't cause an error. Accordingly, this code now silences
- # the error for OpenSSL < 1.1.0i as well.
- if _lib.X509_STORE_add_cert(self._store, cert._x509) == 0:
- code = _lib.ERR_peek_error()
- err_reason = _lib.ERR_GET_REASON(code)
- _openssl_assert(
- err_reason == _lib.X509_R_CERT_ALREADY_IN_HASH_TABLE
- )
- _lib.ERR_clear_error()
+ res = _lib.X509_STORE_add_cert(self._store, cert._x509)
+ _openssl_assert(res == 1)
def add_crl(self, crl):
"""
@@ -1680,15 +1676,57 @@ class X509Store(object):
param = _lib.X509_VERIFY_PARAM_new()
param = _ffi.gc(param, _lib.X509_VERIFY_PARAM_free)
- _lib.X509_VERIFY_PARAM_set_time(param, int(vfy_time.strftime('%s')))
+ _lib.X509_VERIFY_PARAM_set_time(
+ param, calendar.timegm(vfy_time.timetuple())
+ )
_openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0)
+ def load_locations(self, cafile, capath=None):
+ """
+ Let X509Store know where we can find trusted certificates for the
+ certificate chain. Note that the certificates have to be in PEM
+ format.
-X509StoreType = deprecated(
- X509Store, __name__,
- "X509StoreType has been deprecated, use X509Store instead",
- DeprecationWarning
-)
+ If *capath* is passed, it must be a directory prepared using the
+ ``c_rehash`` tool included with OpenSSL. Either, but not both, of
+ *cafile* or *capath* may be ``None``.
+
+ .. note::
+
+ Both *cafile* and *capath* may be set simultaneously.
+
+ Call this method multiple times to add more than one location.
+ For example, CA certificates, and certificate revocation list bundles
+ may be passed in *cafile* in subsequent calls to this method.
+
+ .. versionadded:: 20.0
+
+ :param cafile: In which file we can find the certificates (``bytes`` or
+ ``unicode``).
+ :param capath: In which directory we can find the certificates
+ (``bytes`` or ``unicode``).
+
+ :return: ``None`` if the locations were set successfully.
+
+ :raises OpenSSL.crypto.Error: If both *cafile* and *capath* is ``None``
+ or the locations could not be set for any reason.
+
+ """
+ if cafile is None:
+ cafile = _ffi.NULL
+ else:
+ cafile = _path_string(cafile)
+
+ if capath is None:
+ capath = _ffi.NULL
+ else:
+ capath = _path_string(capath)
+
+ load_result = _lib.X509_STORE_load_locations(
+ self._store, cafile, capath
+ )
+ if not load_result:
+ _raise_current_error()
class X509StoreContextError(Exception):
@@ -1718,21 +1756,54 @@ class X509StoreContext(object):
collected.
:ivar _store: See the ``store`` ``__init__`` parameter.
:ivar _cert: See the ``certificate`` ``__init__`` parameter.
+ :ivar _chain: See the ``chain`` ``__init__`` parameter.
:param X509Store store: The certificates which will be trusted for the
purposes of any verifications.
:param X509 certificate: The certificate to be verified.
+ :param chain: List of untrusted certificates that may be used for building
+ the certificate chain. May be ``None``.
+ :type chain: :class:`list` of :class:`X509`
"""
- def __init__(self, store, certificate):
+ def __init__(self, store, certificate, chain=None):
store_ctx = _lib.X509_STORE_CTX_new()
self._store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free)
self._store = store
self._cert = certificate
+ self._chain = self._build_certificate_stack(chain)
# Make the store context available for use after instantiating this
# class by initializing it now. Per testing, subsequent calls to
# :meth:`_init` have no adverse affect.
self._init()
+ @staticmethod
+ def _build_certificate_stack(certificates):
+ def cleanup(s):
+ # Equivalent to sk_X509_pop_free, but we don't
+ # currently have a CFFI binding for that available
+ for i in range(_lib.sk_X509_num(s)):
+ x = _lib.sk_X509_value(s, i)
+ _lib.X509_free(x)
+ _lib.sk_X509_free(s)
+
+ if certificates is None or len(certificates) == 0:
+ return _ffi.NULL
+
+ stack = _lib.sk_X509_new_null()
+ _openssl_assert(stack != _ffi.NULL)
+ stack = _ffi.gc(stack, cleanup)
+
+ for cert in certificates:
+ if not isinstance(cert, X509):
+ raise TypeError("One of the elements is not an X509 instance")
+
+ _openssl_assert(_lib.X509_up_ref(cert._x509) > 0)
+ if _lib.sk_X509_push(stack, cert._x509) <= 0:
+ _lib.X509_free(cert._x509)
+ _raise_current_error()
+
+ return stack
+
def _init(self):
"""
Set up the store context for a subsequent verification operation.
@@ -1741,7 +1812,7 @@ class X509StoreContext(object):
:meth:`_cleanup` will leak memory.
"""
ret = _lib.X509_STORE_CTX_init(
- self._store_ctx, self._store._store, self._cert._x509, _ffi.NULL
+ self._store_ctx, self._store._store, self._cert._x509, self._chain
)
if ret <= 0:
_raise_current_error()
@@ -1765,8 +1836,13 @@ class X509StoreContext(object):
errors = [
_lib.X509_STORE_CTX_get_error(self._store_ctx),
_lib.X509_STORE_CTX_get_error_depth(self._store_ctx),
- _native(_ffi.string(_lib.X509_verify_cert_error_string(
- _lib.X509_STORE_CTX_get_error(self._store_ctx)))),
+ _native(
+ _ffi.string(
+ _lib.X509_verify_cert_error_string(
+ _lib.X509_STORE_CTX_get_error(self._store_ctx)
+ )
+ )
+ ),
]
# A context error should always be associated with a certificate, so we
# expect this call to never return :class:`None`.
@@ -1808,6 +1884,45 @@ class X509StoreContext(object):
if ret <= 0:
raise self._exception_from_context()
+ def get_verified_chain(self):
+ """
+ Verify a certificate in a context and return the complete validated
+ chain.
+
+ :raises X509StoreContextError: If an error occurred when validating a
+ certificate in the context. Sets ``certificate`` attribute to
+ indicate which certificate caused the error.
+
+ .. versionadded:: 20.0
+ """
+ # Always re-initialize the store context in case
+ # :meth:`verify_certificate` is called multiple times.
+ #
+ # :meth:`_init` is called in :meth:`__init__` so _cleanup is called
+ # before _init to ensure memory is not leaked.
+ self._cleanup()
+ self._init()
+ ret = _lib.X509_verify_cert(self._store_ctx)
+ if ret <= 0:
+ self._cleanup()
+ raise self._exception_from_context()
+
+ # Note: X509_STORE_CTX_get1_chain returns a deep copy of the chain.
+ cert_stack = _lib.X509_STORE_CTX_get1_chain(self._store_ctx)
+ _openssl_assert(cert_stack != _ffi.NULL)
+
+ result = []
+ for i in range(_lib.sk_X509_num(cert_stack)):
+ cert = _lib.sk_X509_value(cert_stack, i)
+ _openssl_assert(cert != _ffi.NULL)
+ pycert = X509._from_raw_x509_ptr(cert)
+ result.append(pycert)
+
+ # Free the stack but not the members which are freed by the X509 class.
+ _lib.sk_X509_free(cert_stack)
+ self._cleanup()
+ return result
+
def load_certificate(type, buffer):
"""
@@ -1830,8 +1945,7 @@ def load_certificate(type, buffer):
elif type == FILETYPE_ASN1:
x509 = _lib.d2i_X509_bio(bio, _ffi.NULL)
else:
- raise ValueError(
- "type argument must be FILETYPE_PEM or FILETYPE_ASN1")
+ raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")
if x509 == _ffi.NULL:
_raise_current_error()
@@ -1860,9 +1974,10 @@ def dump_certificate(type, cert):
else:
raise ValueError(
"type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
- "FILETYPE_TEXT")
+ "FILETYPE_TEXT"
+ )
- assert result_code == 1
+ _openssl_assert(result_code == 1)
return _bio_to_string(bio)
@@ -1916,7 +2031,8 @@ def dump_privatekey(type, pkey, cipher=None, passphrase=None):
if passphrase is None:
raise TypeError(
"if a value is given for cipher "
- "one must also be given for passphrase")
+ "one must also be given for passphrase"
+ )
cipher_obj = _lib.EVP_get_cipherbyname(_byte_string(cipher))
if cipher_obj == _ffi.NULL:
raise ValueError("Invalid cipher name")
@@ -1926,8 +2042,14 @@ def dump_privatekey(type, pkey, cipher=None, passphrase=None):
helper = _PassphraseHelper(type, passphrase)
if type == FILETYPE_PEM:
result_code = _lib.PEM_write_bio_PrivateKey(
- bio, pkey._pkey, cipher_obj, _ffi.NULL, 0,
- helper.callback, helper.callback_args)
+ bio,
+ pkey._pkey,
+ cipher_obj,
+ _ffi.NULL,
+ 0,
+ helper.callback,
+ helper.callback_args,
+ )
helper.raise_if_problem()
elif type == FILETYPE_ASN1:
result_code = _lib.i2d_PrivateKey_bio(bio, pkey._pkey)
@@ -1935,15 +2057,13 @@ def dump_privatekey(type, pkey, cipher=None, passphrase=None):
if _lib.EVP_PKEY_id(pkey._pkey) != _lib.EVP_PKEY_RSA:
raise TypeError("Only RSA keys are supported for FILETYPE_TEXT")
- rsa = _ffi.gc(
- _lib.EVP_PKEY_get1_RSA(pkey._pkey),
- _lib.RSA_free
- )
+ rsa = _ffi.gc(_lib.EVP_PKEY_get1_RSA(pkey._pkey), _lib.RSA_free)
result_code = _lib.RSA_print(bio, rsa, 0)
else:
raise ValueError(
"type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
- "FILETYPE_TEXT")
+ "FILETYPE_TEXT"
+ )
_openssl_assert(result_code != 0)
@@ -1954,6 +2074,7 @@ class Revoked(object):
"""
A certificate revocation.
"""
+
# https://www.openssl.org/docs/manmaster/man5/x509v3_config.html#CRL-distribution-points
# which differs from crl_reasons of crypto/x509v3/v3_enum.c that matches
# OCSP_crl_reason_str. We use the latter, just like the command line
@@ -1993,7 +2114,8 @@ class Revoked(object):
asn1_serial = _ffi.gc(
_lib.BN_to_ASN1_INTEGER(bignum_serial, _ffi.NULL),
- _lib.ASN1_INTEGER_free)
+ _lib.ASN1_INTEGER_free,
+ )
_lib.X509_REVOKED_set_serialNumber(self._revoked, asn1_serial)
def get_serial(self):
@@ -2044,7 +2166,7 @@ class Revoked(object):
elif not isinstance(reason, bytes):
raise TypeError("reason must be None or a byte string")
else:
- reason = reason.lower().replace(b' ', b'')
+ reason = reason.lower().replace(b" ", b"")
reason_code = [r.lower() for r in self._crl_reasons].index(reason)
new_reason_ext = _lib.ASN1_ENUMERATED_new()
@@ -2056,7 +2178,8 @@ class Revoked(object):
self._delete_reason()
add_result = _lib.X509_REVOKED_add1_ext_i2d(
- self._revoked, _lib.NID_crl_reason, new_reason_ext, 0, 0)
+ self._revoked, _lib.NID_crl_reason, new_reason_ext, 0, 0
+ )
_openssl_assert(add_result == 1)
def get_reason(self):
@@ -2138,8 +2261,9 @@ class CRL(object):
.. versionadded:: 17.1.0
"""
from cryptography.hazmat.backends.openssl.x509 import (
- _CertificateRevocationList
+ _CertificateRevocationList,
)
+
backend = _get_backend()
return _CertificateRevocationList(backend, self._crl)
@@ -2246,7 +2370,7 @@ class CRL(object):
def set_nextUpdate(self, when):
"""
- Set when the CRL will next be udpated.
+ Set when the CRL will next be updated.
The timestamp is formatted as an ASN.1 TIME::
@@ -2279,13 +2403,15 @@ class CRL(object):
digest_obj = _lib.EVP_get_digestbyname(digest)
_openssl_assert(digest_obj != _ffi.NULL)
_lib.X509_CRL_set_issuer_name(
- self._crl, _lib.X509_get_subject_name(issuer_cert._x509))
+ self._crl, _lib.X509_get_subject_name(issuer_cert._x509)
+ )
_lib.X509_CRL_sort(self._crl)
result = _lib.X509_CRL_sign(self._crl, issuer_key._pkey, digest_obj)
_openssl_assert(result != 0)
- def export(self, cert, key, type=FILETYPE_PEM, days=100,
- digest=_UNSPECIFIED):
+ def export(
+ self, cert, key, type=FILETYPE_PEM, days=100, digest=_UNSPECIFIED
+ ):
"""
Export the CRL as a string.
@@ -2295,7 +2421,7 @@ class CRL(object):
:data:`FILETYPE_ASN1`, or :data:`FILETYPE_TEXT`.
:param int days: The number of days until the next update of this CRL.
:param bytes digest: The name of the message digest to use (eg
- ``b"sha2566"``).
+ ``b"sha256"``).
:rtype: bytes
"""
@@ -2338,13 +2464,6 @@ class CRL(object):
return dump_crl(type, self)
-CRLType = deprecated(
- CRL, __name__,
- "CRLType has been deprecated, use CRL instead",
- DeprecationWarning
-)
-
-
class PKCS7(object):
def type_is_signed(self):
"""
@@ -2389,13 +2508,6 @@ class PKCS7(object):
return _ffi.string(string_type)
-PKCS7Type = deprecated(
- PKCS7, __name__,
- "PKCS7Type has been deprecated, use PKCS7 instead",
- DeprecationWarning
-)
-
-
class PKCS12(object):
"""
A PKCS #12 archive.
@@ -2557,10 +2669,17 @@ class PKCS12(object):
cert = self._cert._x509
pkcs12 = _lib.PKCS12_create(
- passphrase, friendlyname, pkey, cert, cacerts,
+ passphrase,
+ friendlyname,
+ pkey,
+ cert,
+ cacerts,
_lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC,
_lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC,
- iter, maciter, 0)
+ iter,
+ maciter,
+ 0,
+ )
if pkcs12 == _ffi.NULL:
_raise_current_error()
pkcs12 = _ffi.gc(pkcs12, _lib.PKCS12_free)
@@ -2570,13 +2689,6 @@ class PKCS12(object):
return _bio_to_string(bio)
-PKCS12Type = deprecated(
- PKCS12, __name__,
- "PKCS12Type has been deprecated, use PKCS12 instead",
- DeprecationWarning
-)
-
-
class NetscapeSPKI(object):
"""
A Netscape SPKI object.
@@ -2667,13 +2779,6 @@ class NetscapeSPKI(object):
_openssl_assert(set_result == 1)
-NetscapeSPKIType = deprecated(
- NetscapeSPKI, __name__,
- "NetscapeSPKIType has been deprecated, use NetscapeSPKI instead",
- DeprecationWarning
-)
-
-
class _PassphraseHelper(object):
def __init__(self, type, passphrase, more_args=False, truncate=False):
if type != FILETYPE_PEM and passphrase is not None:
@@ -2689,9 +2794,7 @@ class _PassphraseHelper(object):
def callback(self):
if self._passphrase is None:
return _ffi.NULL
- elif isinstance(self._passphrase, bytes):
- return _ffi.NULL
- elif callable(self._passphrase):
+ elif isinstance(self._passphrase, bytes) or callable(self._passphrase):
return _ffi.callback("pem_password_cb", self._read_passphrase)
else:
raise TypeError(
@@ -2702,9 +2805,7 @@ class _PassphraseHelper(object):
def callback_args(self):
if self._passphrase is None:
return _ffi.NULL
- elif isinstance(self._passphrase, bytes):
- return self._passphrase
- elif callable(self._passphrase):
+ elif isinstance(self._passphrase, bytes) or callable(self._passphrase):
return _ffi.NULL
else:
raise TypeError(
@@ -2724,12 +2825,15 @@ class _PassphraseHelper(object):
def _read_passphrase(self, buf, size, rwflag, userdata):
try:
- if self._more_args:
- result = self._passphrase(size, rwflag, userdata)
+ if callable(self._passphrase):
+ if self._more_args:
+ result = self._passphrase(size, rwflag, userdata)
+ else:
+ result = self._passphrase(rwflag)
else:
- result = self._passphrase(rwflag)
+ result = self._passphrase
if not isinstance(result, bytes):
- raise ValueError("String expected")
+ raise ValueError("Bytes expected")
if len(result) > size:
if self._truncate:
result = result[:size]
@@ -2738,7 +2842,7 @@ class _PassphraseHelper(object):
"passphrase returned by callback is too long"
)
for i in range(len(result)):
- buf[i] = result[i:i + 1]
+ buf[i] = result[i : i + 1]
return len(result)
except Exception as e:
self._problems.append(e)
@@ -2763,7 +2867,8 @@ def load_publickey(type, buffer):
if type == FILETYPE_PEM:
evp_pkey = _lib.PEM_read_bio_PUBKEY(
- bio, _ffi.NULL, _ffi.NULL, _ffi.NULL)
+ bio, _ffi.NULL, _ffi.NULL, _ffi.NULL
+ )
elif type == FILETYPE_ASN1:
evp_pkey = _lib.d2i_PUBKEY_bio(bio, _ffi.NULL)
else:
@@ -2799,7 +2904,8 @@ def load_privatekey(type, buffer, passphrase=None):
helper = _PassphraseHelper(type, passphrase)
if type == FILETYPE_PEM:
evp_pkey = _lib.PEM_read_bio_PrivateKey(
- bio, _ffi.NULL, helper.callback, helper.callback_args)
+ bio, _ffi.NULL, helper.callback, helper.callback_args
+ )
helper.raise_if_problem()
elif type == FILETYPE_ASN1:
evp_pkey = _lib.d2i_PrivateKey_bio(bio, _ffi.NULL)
@@ -2898,7 +3004,8 @@ def sign(pkey, data, digest):
signature_buffer = _ffi.new("unsigned char[]", length)
signature_length = _ffi.new("unsigned int *")
final_result = _lib.EVP_SignFinal(
- md_ctx, signature_buffer, signature_length, pkey._pkey)
+ md_ctx, signature_buffer, signature_length, pkey._pkey
+ )
_openssl_assert(final_result == 1)
return _ffi.buffer(signature_buffer, signature_length[0])[:]
@@ -2962,9 +3069,10 @@ def dump_crl(type, crl):
else:
raise ValueError(
"type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
- "FILETYPE_TEXT")
+ "FILETYPE_TEXT"
+ )
- assert ret == 1
+ _openssl_assert(ret == 1)
return _bio_to_string(bio)
@@ -3027,6 +3135,17 @@ def load_pkcs7_data(type, buffer):
return pypkcs7
+load_pkcs7_data = utils.deprecated(
+ load_pkcs7_data,
+ __name__,
+ (
+ "PKCS#7 support in pyOpenSSL is deprecated. You should use the APIs "
+ "in cryptography."
+ ),
+ DeprecationWarning,
+)
+
+
def load_pkcs12(buffer, passphrase=None):
"""
Load pkcs12 data from the string *buffer*. If the pkcs12 structure is
@@ -3114,6 +3233,17 @@ def load_pkcs12(buffer, passphrase=None):
return pkcs12
+load_pkcs12 = utils.deprecated(
+ load_pkcs12,
+ __name__,
+ (
+ "PKCS#12 support in pyOpenSSL is deprecated. You should use the APIs "
+ "in cryptography."
+ ),
+ DeprecationWarning,
+)
+
+
# There are no direct unit tests for this initialization. It is tested
# indirectly since it is necessary for functions like dump_privatekey when
# using encryption.
@@ -3132,4 +3262,4 @@ _lib.SSL_load_error_strings()
# Set the default string mask to match OpenSSL upstream (since 2005) and
# RFC5280 recommendations.
-_lib.ASN1_STRING_set_default_mask_asc(b'utf8only')
+_lib.ASN1_STRING_set_default_mask_asc(b"utf8only")