Welcome to aiokafka’s documentation!

aiokafka is a client for the Apache Kafka distributed stream processing system using asyncio. It is based on the kafka-python library and reuses its internals for protocol parsing, errors, etc. The client is designed to function much like the official Java client, with a sprinkling of Pythonic interfaces.

aiokafka can be used with 0.9+ Kafka brokers and supports fully coordinated consumer groups – i.e., dynamic partition assignment to multiple consumers in the same group.

Getting started

AIOKafkaConsumer

AIOKafkaConsumer is a high-level message consumer, intended to operate as similarly as possible to the official Java client.

Here’s a consumer example:

from aiokafka import AIOKafkaConsumer
import asyncio

async def consume():
    consumer = AIOKafkaConsumer(
        'my_topic', 'my_other_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group")
    # Get cluster layout and join group `my-group`
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()

asyncio.run(consume())
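
The comment in the example above mentions autocommit. As a rough sketch of the alternative, assuming autocommit is switched off with `enable_auto_commit=False` and offsets are committed explicitly with `consumer.commit()`, the loop might look like the following. The broker address, topic, group id and the `decode_json` helper are placeholders chosen for illustration, not part of aiokafka.

```python
import json


def decode_json(raw):
    """Hypothetical helper: turn a message payload (bytes) into a Python object."""
    return None if raw is None else json.loads(raw.decode("utf-8"))


async def consume_with_manual_commit():
    # Imported lazily so decode_json stays usable without aiokafka installed.
    from aiokafka import AIOKafkaConsumer

    consumer = AIOKafkaConsumer(
        "my_topic",
        bootstrap_servers="localhost:9092",  # assumed local broker
        group_id="my-group",
        enable_auto_commit=False,  # offsets are committed by hand below
        value_deserializer=decode_json,
    )
    await consumer.start()
    try:
        async for msg in consumer:
            print("consumed:", msg.topic, msg.partition, msg.offset, msg.value)
            # Commit only after the message has been handled.
            await consumer.commit()
    finally:
        await consumer.stop()
```

To try it against a running broker, call `asyncio.run(consume_with_manual_commit())`. Committing after every message keeps the sketch simple; committing in batches is usually cheaper.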

Read more in the Consumer client section.

AIOKafkaProducer

AIOKafkaProducer is a high-level, asynchronous message producer.

Here’s a producer example:

from aiokafka import AIOKafkaProducer
import asyncio

async def send_one():
    producer = AIOKafkaProducer(
        bootstrap_servers='localhost:9092')
    # Get cluster layout and initial topic/partition leadership information
    await producer.start()
    try:
        # Produce message
        await producer.send_and_wait("my_topic", b"Super message")
    finally:
        # Wait for all pending messages to be delivered or expire.
        await producer.stop()

asyncio.run(send_one())
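
`send_and_wait` blocks until a single message is delivered. When several messages are sent at once, one option is to queue them with `send()` and wait for the resulting futures together. The sketch below assumes a local broker and JSON payloads; `encode_json` and `send_many` are hypothetical names used only for this illustration.

```python
import asyncio
import json


def encode_json(value):
    """Hypothetical helper: serialize a Python object to UTF-8 JSON bytes."""
    return json.dumps(value).encode("utf-8")


async def send_many(events):
    # Imported lazily so encode_json stays usable without aiokafka installed.
    from aiokafka import AIOKafkaProducer

    producer = AIOKafkaProducer(
        bootstrap_servers="localhost:9092",  # assumed local broker
        value_serializer=encode_json,
    )
    await producer.start()
    try:
        # send() queues the message and returns a future for its delivery.
        futures = [await producer.send("my_topic", event) for event in events]
        # Wait until every queued message has been acknowledged.
        results = await asyncio.gather(*futures)
        for meta in results:
            print("delivered to", meta.topic, meta.partition, meta.offset)
    finally:
        await producer.stop()
```

With a broker available, `asyncio.run(send_many([{"id": 1}, {"id": 2}]))` would send two JSON messages to `my_topic`.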

Read more in the Producer client section.

Installation

pip install aiokafka

Note

aiokafka requires the kafka-python library.

Optional LZ4 install

To enable LZ4 compression/decompression, install aiokafka with the lz4 extra option:

pip install 'aiokafka[lz4]'

Note that on Windows you will need Visual Studio build tools, available for download from http://landinghub.visualstudio.com/visual-cpp-build-tools

Optional Snappy install

  1. Download and build Snappy from http://google.github.io/snappy/

Ubuntu:

apt-get install libsnappy-dev

OSX:

brew install snappy

From Source:

wget https://github.com/google/snappy/tarball/master
tar xzvf google-snappy-X.X.X-X-XXXXXXXX.tar.gz
cd google-snappy-X.X.X-X-XXXXXXXX
./configure
make
sudo make install

  2. Install aiokafka with the snappy extra option:

pip install 'aiokafka[snappy]'

Optional zstd install

To enable Zstandard compression/decompression, install aiokafka with the zstd extra option:

pip install 'aiokafka[zstd]'
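
Once the matching extra is installed, a codec is selected on the producer through its `compression_type` argument. The following is a minimal sketch under that assumption; `SUPPORTED_CODECS`, `compressed_producer_options` and `send_compressed` are hypothetical names, and the broker address is a placeholder.

```python
# Codec names assumed to be accepted by the producer's compression_type.
SUPPORTED_CODECS = ("gzip", "snappy", "lz4", "zstd")


def compressed_producer_options(codec, bootstrap_servers="localhost:9092"):
    """Hypothetical helper: build AIOKafkaProducer keyword arguments."""
    if codec not in SUPPORTED_CODECS:
        raise ValueError(f"unsupported codec: {codec!r}")
    return {"bootstrap_servers": bootstrap_servers, "compression_type": codec}


async def send_compressed(codec="lz4"):
    # Imported lazily so the helper above works without aiokafka installed.
    from aiokafka import AIOKafkaProducer

    producer = AIOKafkaProducer(**compressed_producer_options(codec))
    await producer.start()
    try:
        # Batches containing this message are compressed with the chosen codec.
        await producer.send_and_wait("my_topic", b"Super message")
    finally:
        await producer.stop()
```

Consumers need the same extra installed to decompress the messages; no consumer-side option is required for that.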

Optional GSSAPI install

To enable SASL authentication with GSSAPI, install aiokafka with the gssapi extra option:

pip install 'aiokafka[gssapi]'
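
With the extra installed, GSSAPI is requested through the client's SASL settings. The sketch below assumes a broker listening with `SASL_PLAINTEXT`, a Kerberos service principal named `kafka`, and a valid ticket already present on the host; `gssapi_options` and `consume_with_gssapi` are hypothetical names for illustration.

```python
def gssapi_options(service_name="kafka", bootstrap_servers="localhost:9092"):
    """Hypothetical helper: keyword arguments for a GSSAPI-authenticated client."""
    return {
        "bootstrap_servers": bootstrap_servers,
        "security_protocol": "SASL_PLAINTEXT",  # assumed listener type
        "sasl_mechanism": "GSSAPI",
        "sasl_kerberos_service_name": service_name,
    }


async def consume_with_gssapi():
    # Imported lazily so the helper above works without aiokafka installed.
    from aiokafka import AIOKafkaConsumer

    consumer = AIOKafkaConsumer("my_topic", group_id="my-group", **gssapi_options())
    await consumer.start()
    try:
        async for msg in consumer:
            print("consumed:", msg.topic, msg.partition, msg.offset)
    finally:
        await consumer.stop()
```

The same keyword arguments can be passed to `AIOKafkaProducer`. A deployment using TLS would switch to `SASL_SSL` and supply an SSL context as well.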

Source code

The project is hosted on GitHub.

Please feel free to file an issue on the bug tracker if you have found a bug or have a suggestion for improving the library.

The library uses Travis for Continuous Integration.

Authors and License

The aiokafka package is Apache 2 licensed and freely available.

Feel free to improve this package and send a pull request to GitHub.
