Source code for aiokafka.helpers
"""
.. _kafka-python: https://github.com/dpkp/kafka-python
"""
import logging
from ssl import create_default_context, Purpose
log = logging.getLogger(__name__)
[docs]def create_ssl_context(*, cafile=None, capath=None, cadata=None,
certfile=None, keyfile=None, password=None,
crlfile=None):
"""
Simple helper, that creates an :class:`~ssl.SSLContext` based on params similar to
those in `kafka-python`_, but with some restrictions like:
* `check_hostname` is not optional, and will be set to :data:`True`
* `crlfile` option is missing. It is fairly hard to test it.
Arguments:
cafile (str): Certificate Authority file path containing certificates
used to sign broker certificates. If CA not specified (by either
cafile, capath, cadata) default system CA will be used if found by
OpenSSL. For more information see
:meth:`~ssl.SSLContext.load_verify_locations`.
Default: :data:`None`
capath (str): Same as `cafile`, but points to a directory containing
several CA certificates. For more information see
:meth:`~ssl.SSLContext.load_verify_locations`.
Default: :data:`None`
cadata (str, bytes): Same as `cafile`, but instead contains already
read data in either ASCII or bytes format. Can be used to specify
DER-encoded certificates, rather than PEM ones. For more
information see :meth:`~ssl.SSLContext.load_verify_locations`.
Default: :data:`None`
certfile (str): optional filename of file in PEM format containing
the client certificate, as well as any CA certificates needed to
establish the certificate's authenticity. For more information see
:meth:`~ssl.SSLContext.load_cert_chain`.
Default: :data:`None`.
keyfile (str): optional filename containing the client private key.
For more information see :meth:`~ssl.SSLContext.load_cert_chain`.
Default: :data:`None`.
password (str): optional password to be used when loading the
certificate chain. For more information see
:meth:`~ssl.SSLContext.load_cert_chain`.
Default: :data:`None`.
"""
if cafile or capath:
log.info('Loading SSL CA from %s', cafile or capath)
elif cadata is not None:
log.info('Loading SSL CA from data provided in `cadata`')
log.debug('`cadata`: %r', cadata)
# Creating context with default params for client sockets.
context = create_default_context(
Purpose.SERVER_AUTH, cafile=cafile, capath=capath, cadata=cadata)
# Load certificate if one is specified.
if certfile is not None:
log.info('Loading SSL Cert from %s', certfile)
if keyfile:
if password is not None:
log.info('Loading SSL Key from %s with password', keyfile)
else: # pragma: no cover
log.info('Loading SSL Key from %s without password', keyfile)
# NOTE: From docs:
# If the password argument is not specified and a password is required,
# OpenSSL’s built-in password prompting mechanism will be used to
# interactively prompt the user for a password.
context.load_cert_chain(certfile, keyfile, password)
return context