When processing more sensitive data
enable_auto_commit=False mode of
Consumer can lead to data loss in cases of critical failure. To avoid it we
can commit offsets manually after they were processed. Note, that this is a
tradeoff from at most once to at least once delivery, to achieve
exactly once you will need to save offsets in the destination database and
validate those yourself.
More on message delivery: https://kafka.apache.org/documentation.html#semantics
After Kafka Broker version 0.11 and after aiokafka==0.5.0 it is possible to use Transactional Producer to achieve exactly once delivery semantics. See Tranactional Producer section.
import json import asyncio from kafka.common import KafkaError from aiokafka import AIOKafkaConsumer async def consume(): consumer = AIOKafkaConsumer( 'foobar', bootstrap_servers='localhost:9092', auto_offset_reset='earliest', group_id="some-consumer-group", enable_auto_commit=False) await consumer.start() # we want to consume 10 messages from "foobar" topic # and commit after that for i in range(10): msg = await (consumer.getone() await consumer.commit() await consumer.stop() asyncio.run(consume())