Manual commitΒΆ

When processing more sensitive data enable_auto_commit=False mode of Consumer can lead to data loss in cases of critical failure. To avoid it we can commit offsets manually after they were processed. Note, that this is a tradeoff from at most once to at least once delivery, to achieve exactly once you will need to save offsets in the destination database and validate those yourself.

More on message delivery: https://kafka.apache.org/documentation.html#semantics

Note

After Kafka Broker version 0.11 and after aiokafka==0.5.0 it is possible to use Transactional Producer to achieve exactly once delivery semantics. See Transactional producer section.

Consumer:

import json
import asyncio
from kafka.common import KafkaError
from aiokafka import AIOKafkaConsumer

async def consume():
    consumer = AIOKafkaConsumer(
        'foobar',
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        group_id="some-consumer-group",
        enable_auto_commit=False)
    await consumer.start()
    # we want to consume 10 messages from "foobar" topic
    # and commit after that
    for _ in range(10):
        msg = await consumer.getone()
    await consumer.commit()

    await consumer.stop()

asyncio.run(consume())