Manual commit

When processing sensitive data, using enable_auto_commit=True (the default) for the Consumer can lead to data loss in the event of a critical failure, because offsets may be committed before the corresponding messages have been fully processed. To avoid this, set enable_auto_commit=False and commit offsets manually only after the messages have been processed. Note that this trades at-most-once delivery for at-least-once delivery: after a failure, any messages processed since the last commit will be delivered again. To achieve exactly-once processing, you will need to save offsets in the destination database and validate them yourself.

More on message delivery: https://kafka.apache.org/documentation.html#semantics
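
As a rough illustration of saving offsets in the destination database, the sketch below writes each record and its offset in a single transaction, so a redelivered message can be detected and skipped. It assumes SQLite as the destination and a single writer; Record, init_db, stored_offset, process and the table layout are hypothetical names chosen for this example and are not part of aiokafka.

```python
import sqlite3
from collections import namedtuple

# Hypothetical stand-in for a consumed record; aiokafka's ConsumerRecord
# exposes the same topic, partition, offset and value attributes.
Record = namedtuple("Record", ["topic", "partition", "offset", "value"])


def init_db(conn):
    """Create the (hypothetical) destination and offset tables."""
    conn.execute("CREATE TABLE IF NOT EXISTS events (payload TEXT)")
    conn.execute(
        "CREATE TABLE IF NOT EXISTS offsets ("
        "topic TEXT, part INTEGER, next_offset INTEGER, "
        "PRIMARY KEY (topic, part))"
    )


def stored_offset(conn, topic, partition):
    """Return the next offset to process, or 0 if nothing is stored yet."""
    row = conn.execute(
        "SELECT next_offset FROM offsets WHERE topic = ? AND part = ?",
        (topic, partition),
    ).fetchone()
    return row[0] if row else 0


def process(conn, record):
    """Store the record and its offset atomically.

    Returns False when the record was already processed (a redelivery).
    """
    if record.offset < stored_offset(conn, record.topic, record.partition):
        return False
    with conn:  # commits on success, rolls back on exception
        conn.execute("INSERT INTO events VALUES (?)", (record.value,))
        conn.execute(
            "INSERT OR REPLACE INTO offsets VALUES (?, ?, ?)",
            (record.topic, record.partition, record.offset + 1),
        )
    return True


conn = sqlite3.connect(":memory:")
init_db(conn)
first = process(conn, Record("foobar", 0, 0, "a"))
again = process(conn, Record("foobar", 0, 0, "a"))  # redelivered after a crash
```

Here the second call is skipped because the stored offset already covers the record. On restart, a consumer could read the stored offset for each assigned partition and call consumer.seek() to resume from it instead of relying on the offsets committed to Kafka.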

Note

Starting with Kafka Broker 0.11 and aiokafka 0.5.0, it is possible to use the Transactional Producer to achieve exactly-once delivery semantics. See the Transactional producer section.

Consumer:

import asyncio
from aiokafka import AIOKafkaConsumer

async def consume():
    consumer = AIOKafkaConsumer(
        'foobar',
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        group_id="some-consumer-group",
        enable_auto_commit=False)
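    await consumer.start()
    try:
        # Consume 10 messages from the "foobar" topic and commit
        # only after all of them have been processed.
        for _ in range(10):
            msg = await consumer.getone()
            print(msg.topic, msg.partition, msg.offset, msg.value)
        await consumer.commit()
    finally:
        # Always stop the consumer so it leaves the group cleanly,
        # even if processing raised an exception.
        await consumer.stop()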

asyncio.run(consume())
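
Calling commit() without arguments commits the current position of every assigned partition. commit() also accepts a dict that maps a TopicPartition to the offset to commit, which allows committing only what has actually been processed. The sketch below shows one way to build such a dict from a batch of records; TopicPartition and Record are local stand-ins so that the snippet runs without a broker, and offsets_to_commit is a hypothetical helper, not an aiokafka function.

```python
from collections import namedtuple

# Hypothetical stand-ins so the sketch runs without a broker; in real code
# use aiokafka.TopicPartition and the records returned by the consumer.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
Record = namedtuple("Record", ["topic", "partition", "offset"])


def offsets_to_commit(records):
    """Map each partition to the offset of the next record to read."""
    offsets = {}
    for rec in records:
        tp = TopicPartition(rec.topic, rec.partition)
        # The committed offset is the position to resume from, hence + 1.
        offsets[tp] = max(offsets.get(tp, 0), rec.offset + 1)
    return offsets


batch = [
    Record("foobar", 0, 41),
    Record("foobar", 1, 7),
    Record("foobar", 0, 42),
]
offsets = offsets_to_commit(batch)
```

With a real consumer, a dict built this way (keyed by aiokafka.TopicPartition) would be passed as await consumer.commit(offsets) once the batch has been processed.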