Welcome to aiokafka’s documentation!
aiokafka is a client for the Apache Kafka distributed stream processing system using asyncio. It is based on the kafka-python library and reuses its internals for protocol parsing, errors, etc. The client is designed to function much like the official Java client, with a sprinkling of Pythonic interfaces.
aiokafka can be used with 0.9+ Kafka brokers and supports fully coordinated consumer groups – i.e., dynamic partition assignment to multiple consumers in the same group.
Getting started
AIOKafkaConsumer
AIOKafkaConsumer is a high-level message consumer, intended to operate as similarly as possible to the official Java client.
Here’s a consumer example:
from aiokafka import AIOKafkaConsumer
import asyncio

async def consume():
    consumer = AIOKafkaConsumer(
        'my_topic', 'my_other_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group")
    # Get cluster layout and join group `my-group`
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()

asyncio.run(consume())
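As a variation on the consumer example, the following is a minimal sketch of consuming JSON values with explicit offset commits. It assumes the messages on `my_topic` are UTF-8 encoded JSON; the helper `decode_json` and the coroutine `consume_json` are illustrative names, not part of aiokafka. The `value_deserializer` and `enable_auto_commit` arguments and the `commit()` method are provided by AIOKafkaConsumer.

```python
import json


def decode_json(raw):
    """Turn raw message bytes into a Python object; None is passed through."""
    return None if raw is None else json.loads(raw.decode("utf-8"))


async def consume_json():
    # Imported here so the helper above can be used without aiokafka installed.
    from aiokafka import AIOKafkaConsumer

    consumer = AIOKafkaConsumer(
        "my_topic",
        bootstrap_servers="localhost:9092",
        group_id="my-group",
        value_deserializer=decode_json,
        enable_auto_commit=False,  # offsets are committed explicitly below
    )
    await consumer.start()
    try:
        async for msg in consumer:
            # msg.value is already a decoded Python object here
            print("consumed:", msg.topic, msg.partition, msg.offset, msg.value)
            # Commit the consumed offsets only after the message was handled.
            await consumer.commit()
    finally:
        await consumer.stop()
```

Start it with `asyncio.run(consume_json())` against a running broker. Committing after every message keeps the sketch simple but costs one round trip per message; committing once per batch is usually the better trade-off.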
Read more in the Consumer client section.
AIOKafkaProducer
AIOKafkaProducer is a high-level, asynchronous message producer.
Here’s a producer example:
from aiokafka import AIOKafkaProducer
import asyncio

async def send_one():
    producer = AIOKafkaProducer(
        bootstrap_servers='localhost:9092')
    # Get cluster layout and initial topic/partition leadership information
    await producer.start()
    try:
        # Produce message
        await producer.send_and_wait("my_topic", b"Super message")
    finally:
        # Wait for all pending messages to be delivered or expire.
        await producer.stop()

asyncio.run(send_one())
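The producer example sends raw bytes. The sketch below shows one way to send Python objects as JSON instead, using the producer's `value_serializer` argument. The helper `encode_json` and the coroutine `send_json` are illustrative names, and the topic and broker address are the same placeholders as in the example.

```python
import json


def encode_json(value):
    """Serialize a Python object to UTF-8 encoded JSON bytes."""
    return json.dumps(value).encode("utf-8")


async def send_json(payload):
    # Imported here so the helper above can be used without aiokafka installed.
    from aiokafka import AIOKafkaProducer

    producer = AIOKafkaProducer(
        bootstrap_servers="localhost:9092",
        value_serializer=encode_json,
    )
    await producer.start()
    try:
        # send_and_wait resolves to the record metadata once the broker
        # has acknowledged the message.
        metadata = await producer.send_and_wait("my_topic", payload)
        print("delivered:", metadata.topic, metadata.partition, metadata.offset)
    finally:
        await producer.stop()
```

For instance, `asyncio.run(send_json({"id": 1}))` would publish the bytes `{"id": 1}` to `my_topic`, assuming a broker is reachable at `localhost:9092`.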
Read more in the Producer client section.
Installation
pip install aiokafka
Optional LZ4 install
To enable LZ4 compression/decompression, install aiokafka with the lz4 extra:
pip install 'aiokafka[lz4]'
Note that on Windows you will need the Visual Studio build tools, available for download from http://landinghub.visualstudio.com/visual-cpp-build-tools
Optional Snappy install
To enable Snappy compression/decompression, install aiokafka with the snappy extra:
pip install 'aiokafka[snappy]'
Optional zstd install
To enable Zstandard compression/decompression, install aiokafka with the zstd extra:
pip install 'aiokafka[zstd]'
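Once the matching extra is installed, a codec is selected through the producer's `compression_type` argument, which accepts `gzip`, `snappy`, `lz4`, `zstd` or `None`. The sketch below wraps that choice in a small helper; `producer_options` and `send_compressed` are hypothetical names introduced for illustration, and the broker address is a placeholder.

```python
SUPPORTED_CODECS = ("gzip", "snappy", "lz4", "zstd")


def producer_options(codec=None):
    """Hypothetical helper: keyword arguments for AIOKafkaProducer."""
    if codec is not None and codec not in SUPPORTED_CODECS:
        raise ValueError(f"unsupported compression codec: {codec!r}")
    return {
        "bootstrap_servers": "localhost:9092",  # placeholder address
        "compression_type": codec,
    }


async def send_compressed(codec, payload):
    # Imported here so the helper above can be used without aiokafka installed.
    from aiokafka import AIOKafkaProducer

    producer = AIOKafkaProducer(**producer_options(codec))
    await producer.start()
    try:
        # Batches sent by this producer are compressed with the chosen codec.
        await producer.send_and_wait("my_topic", payload)
    finally:
        await producer.stop()
```

A consumer reading these messages needs the same extra installed in order to decompress them.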
Optional GSSAPI install
To enable SASL authentication with GSSAPI, install aiokafka with the gssapi extra:
pip install 'aiokafka[gssapi]'
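With the gssapi extra installed, Kerberos authentication is configured through the client's `security_protocol`, `sasl_mechanism` and `sasl_kerberos_service_name` arguments. The following is a rough sketch only: `gssapi_options` and `start_gssapi_consumer` are hypothetical names, the service name `kafka` and the broker address are assumptions, and a valid Kerberos ticket is expected to be available in the environment.

```python
def gssapi_options(service_name="kafka", use_tls=False):
    """Hypothetical helper: keyword arguments for Kerberos (GSSAPI) auth."""
    return {
        "bootstrap_servers": "localhost:9092",  # placeholder address
        # SASL_SSL additionally requires an ssl_context argument.
        "security_protocol": "SASL_SSL" if use_tls else "SASL_PLAINTEXT",
        "sasl_mechanism": "GSSAPI",
        "sasl_kerberos_service_name": service_name,
    }


async def start_gssapi_consumer():
    # Imported here so the helper above can be used without aiokafka installed.
    from aiokafka import AIOKafkaConsumer

    consumer = AIOKafkaConsumer("my_topic", **gssapi_options())
    # The SASL handshake happens while connecting to the cluster.
    await consumer.start()
    return consumer
```

The caller is responsible for awaiting `consumer.stop()` once it has finished with the returned consumer.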
Source code
The project is hosted on GitHub.
Please feel free to file an issue on the bug tracker if you have found a bug or have a suggestion for improving the library.