Welcome to aiokafka’s documentation!

Supported Kafka versions: 1.0, 0.11, 0.10, 0.9. License: Apache 2.

aiokafka is a client for the Apache Kafka distributed stream processing system using asyncio. It is based on the kafka-python library and reuses its internals for protocol parsing, errors, etc. The client is designed to function much like the official Java client, with a sprinkling of Pythonic interfaces.

aiokafka can be used with 0.9+ Kafka brokers and supports fully coordinated consumer groups – i.e., dynamic partition assignment to multiple consumers in the same group.

Getting started

AIOKafkaConsumer

AIOKafkaConsumer is a high-level message consumer, intended to operate as similarly as possible to the official Java client.

Here’s a consumer example:

from aiokafka import AIOKafkaConsumer
import asyncio

loop = asyncio.get_event_loop()

async def consume():
    consumer = AIOKafkaConsumer(
        'my_topic', 'my_other_topic',
        loop=loop, bootstrap_servers='localhost:9092',
        group_id="my-group")
    # Get cluster layout and join group `my-group`
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()

loop.run_until_complete(consume())

Read more in the Consumer client section.
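As a further illustration, the sketch below decodes JSON message values and commits offsets manually instead of relying on autocommit. It assumes a recent aiokafka release in which the loop argument is no longer passed (with older releases, pass loop=loop as in the example above). The helper name decode_json, the topic and group names, and the JSON payload format are assumptions made for this example.

```python
import json


def decode_json(raw):
    """Turn the raw bytes of a message value into a Python object."""
    return None if raw is None else json.loads(raw.decode("utf-8"))


async def consume_json():
    # Imported lazily so that decode_json can be used without aiokafka installed.
    from aiokafka import AIOKafkaConsumer

    consumer = AIOKafkaConsumer(
        "my_topic",
        bootstrap_servers="localhost:9092",
        group_id="my-group",
        enable_auto_commit=False,       # offsets are committed by hand below
        auto_offset_reset="earliest",   # used only when no committed offset exists
        value_deserializer=decode_json)
    await consumer.start()
    try:
        async for msg in consumer:
            # msg.value is already a Python object thanks to value_deserializer
            print("consumed:", msg.topic, msg.partition, msg.offset, msg.value)
            # Commit the consumed position once the message has been handled
            await consumer.commit()
    finally:
        await consumer.stop()
```

With a broker listening on localhost:9092, this can be run with asyncio.run(consume_json()). Committing after every message keeps the sketch simple; committing in batches is usually cheaper.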

AIOKafkaProducer

AIOKafkaProducer is a high-level, asynchronous message producer.

Here’s a producer example:

from aiokafka import AIOKafkaProducer
import asyncio

loop = asyncio.get_event_loop()

async def send_one():
    producer = AIOKafkaProducer(
        loop=loop, bootstrap_servers='localhost:9092')
    # Get cluster layout and initial topic/partition leadership information
    await producer.start()
    try:
        # Produce message
        await producer.send_and_wait("my_topic", b"Super message")
    finally:
        # Wait for all pending messages to be delivered or expire.
        await producer.stop()

loop.run_until_complete(send_one())

Read more in the Producer client section.
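A slightly larger sketch, sending several keyed messages with a JSON value serializer, might look like the following. It assumes a recent aiokafka release in which the loop argument is no longer passed (with older releases, pass loop=loop as in the example above). The names encode_json and send_keyed and the shape of the events argument are assumptions made for this example.

```python
import json


def encode_json(value):
    """Serialize a Python object to UTF-8 encoded JSON bytes."""
    return json.dumps(value).encode("utf-8")


async def send_keyed(events):
    """Send (key, payload) pairs; `events` is an iterable of (str, object)."""
    # Imported lazily so that encode_json can be used without aiokafka installed.
    from aiokafka import AIOKafkaProducer

    producer = AIOKafkaProducer(
        bootstrap_servers="localhost:9092",
        value_serializer=encode_json)
    await producer.start()
    try:
        for key, payload in events:
            # With the default partitioner, messages that share a key
            # are written to the same partition.
            await producer.send_and_wait(
                "my_topic", payload, key=key.encode("utf-8"))
    finally:
        # Flush pending messages and close connections
        await producer.stop()
```

With a broker listening on localhost:9092, this can be run with asyncio.run(send_keyed([("user-1", {"clicks": 3})])).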

Installation

pip3 install aiokafka

Note

aiokafka requires the kafka-python library.

Optional LZ4 install

To enable LZ4 compression/decompression, install lz4tools and xxhash:

pip3 install lz4tools
pip3 install xxhash

Note that on Windows you will need the Visual Studio build tools, available for download from http://landinghub.visualstudio.com/visual-cpp-build-tools

Optional Snappy install

  1. Download and build Snappy from http://google.github.io/snappy/

Ubuntu:

apt-get install libsnappy-dev

OSX:

brew install snappy

From Source:

wget https://github.com/google/snappy/tarball/master
tar xzvf google-snappy-X.X.X-X-XXXXXXXX.tar.gz
cd google-snappy-X.X.X-X-XXXXXXXX
./configure
make
sudo make install
  2. Install the python-snappy module

pip3 install python-snappy

On Windows, the easiest way is to fetch a precompiled wheel from http://www.lfd.uci.edu/~gohlke/pythonlibs/#python-snappy

Optional GSSAPI install

To enable SASL authentication with GSSAPI you need to install gssapi:

pip3 install gssapi
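To see which of the optional packages above are importable in the current environment, a small check along these lines may help. It is only a sketch: the mapping from package to import name is an assumption (in particular, lz4tools is assumed to be importable as lz4tools), and the helper optional_support is a hypothetical name, not part of aiokafka.

```python
from importlib.util import find_spec

# Package name -> module name it is assumed to provide on import.
OPTIONAL_MODULES = {
    "lz4tools": "lz4tools",      # assumption: import name matches the package
    "xxhash": "xxhash",
    "python-snappy": "snappy",
    "gssapi": "gssapi",
}


def optional_support():
    """Return {package: True/False} depending on whether its module is found."""
    return {
        package: find_spec(module) is not None
        for package, module in OPTIONAL_MODULES.items()
    }


for package, available in optional_support().items():
    print(package, "available" if available else "missing")
```

A package reported as missing only means the module could not be located; it does not verify that the underlying C libraries work.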

Source code

The project is hosted on GitHub.

Please feel free to file an issue on the bug tracker if you have found a bug or have a suggestion for improving the library.

The library uses Travis for continuous integration.
