Using SSL with aiokafkaΒΆ

An example of SSL usage with aiokafka. Please read SSL Authentication for more information.

import asyncio
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from aiokafka.helpers import create_ssl_context
from kafka.common import TopicPartition

context = create_ssl_context(
    cafile="./ca-cert",  # CA used to sign certificate.
                         # `CARoot` of JKS store container
    certfile="./cert-signed",  # Signed certificate
    keyfile="./cert-key",  # Private Key file of `certfile` certificate
    password="123123"
)

async def produce_and_consume():
    # Produce
    producer = AIOKafkaProducer(
        bootstrap_servers='localhost:9093',
        security_protocol="SSL", ssl_context=context)

    await producer.start()
    try:
        msg = await producer.send_and_wait(
            'my_topic', b"Super Message", partition=0)
    finally:
        await producer.stop()

    consumer = AIOKafkaConsumer(
        "my_topic", bootstrap_servers='localhost:9093',
        security_protocol="SSL", ssl_context=context)
    await consumer.start()
    try:
        consumer.seek(TopicPartition('my_topic', 0), msg.offset)
        fetch_msg = await consumer.getone()
    finally:
        await consumer.stop()

    print("Success", msg, fetch_msg)

if __name__ == "__main__":
    asyncio.run(produce_and_consume())

Output:

>>> python3 ssl_consume_produce.py
Success RecordMetadata(topic='my_topic', partition=0, topic_partition=TopicPartition(topic='my_topic', partition=0), offset=32) ConsumerRecord(topic='my_topic', partition=0, offset=32, timestamp=1479393347381, timestamp_type=0, key=None, value=b'Super Message', checksum=469650252, serialized_key_size=-1, serialized_value_size=13)