Transactional Consume-Process-Produce

If you have a pattern where you consume from one topic, process the data, and produce to a different topic, you should do it with a transactional producer. In the example below we read from IN_TOPIC, process the data, and produce the result to OUT_TOPIC in a transactional manner: the produced messages and the consumed offsets are committed together in the same transaction, so either both take effect or neither does.

import asyncio
from collections import defaultdict, Counter

from aiokafka import TopicPartition, AIOKafkaConsumer, AIOKafkaProducer


# Kafka topology and client identity used by this example.
IN_TOPIC = "in_topic"            # source topic read by the consumer group
GROUP_ID = "processing-group"    # group whose offsets are committed in the txn
OUT_TOPIC = "out_topic"          # destination topic for the aggregated counts
TRANSACTIONAL_ID = "my-txn-id"   # transactional_id given to the producer
BOOTSTRAP_SERVERS = "localhost:9092"

# How long a single ``getmany`` poll may wait for records, in milliseconds.
POLL_TIMEOUT = 60 * 1000


def process_batch(msgs):
    """Count messages per key in fixed one-minute windows.

    Each message's ``timestamp`` (Kafka record timestamps are epoch
    milliseconds) is floored to the start of its minute and expressed in
    *seconds*.

    Returns a list of ``(key, value, window_start)`` tuples, one per
    (key, minute) combination seen in ``msgs``:
    - ``value`` is the count as an encoded decimal string, e.g. ``b"3"``.
    - ``window_start`` is the minute start in seconds.

    Output is ordered by first appearance of each key, then by first
    appearance of each window within that key.
    """
    per_key = defaultdict(Counter)
    for m in msgs:
        # ms -> minute index -> seconds at the start of that minute.
        window_start = m.timestamp // 60_000 * 60
        per_key[m.key][window_start] += 1

    return [
        (key, str(hits).encode(), window_start)
        for key, tally in per_key.items()
        for window_start, hits in tally.items()
    ]


def _collect_batch(msg_batch):
    """Flatten a ``getmany`` result and compute the offsets to commit.

    Returns ``(in_msgs, commit_offsets)``:
    - ``in_msgs`` holds every consumed message.
    - ``commit_offsets`` maps each partition to the offset *after* its last
      consumed message, i.e. the position the group should resume from.

    Partitions with no messages are skipped.
    """
    in_msgs = []
    commit_offsets = {}
    for tp, msgs in msg_batch.items():
        if not msgs:
            continue
        in_msgs.extend(msgs)
        commit_offsets[tp] = msgs[-1].offset + 1
    return in_msgs, commit_offsets


async def _process_forever(consumer, producer):
    """Poll, transform and transactionally produce batches until an error."""
    while True:
        msg_batch = await consumer.getmany(timeout_ms=POLL_TIMEOUT)
        in_msgs, commit_offsets = _collect_batch(msg_batch)
        if not commit_offsets:
            # The poll timed out with no records: don't open (and commit)
            # an empty transaction just to send an empty offset map.
            continue

        # NOTE: exceptions are deliberately not caught here. The consumer's
        # position has already moved past this batch, so continuing after a
        # failed transaction would silently skip these records.
        async with producer.transaction():
            for key, value, timestamp in process_batch(in_msgs):
                await producer.send(
                    OUT_TOPIC, value=value, key=key,
                    # process_batch yields window starts in seconds.
                    timestamp_ms=int(timestamp * 1000),
                )
            # We commit through the producer because we want the commit
            # to only succeed if the whole transaction is done
            # successfully.
            await producer.send_offsets_to_transaction(
                commit_offsets, GROUP_ID)


async def transactional_process():
    """Consume IN_TOPIC, aggregate with process_batch, produce to OUT_TOPIC.

    Each non-empty polled batch is handled in a single producer transaction.
    The output records and the consumer-group offsets are committed together.

    Runs until an exception propagates out of the processing loop. Both
    clients are stopped on the way out, including when the second one fails
    to start.
    """
    consumer = AIOKafkaConsumer(
        IN_TOPIC,
        bootstrap_servers=BOOTSTRAP_SERVERS,
        # Offsets are committed through the producer transaction instead.
        enable_auto_commit=False,
        group_id=GROUP_ID,
        isolation_level="read_committed",  # <-- This will filter aborted txn's
    )
    producer = AIOKafkaProducer(
        bootstrap_servers=BOOTSTRAP_SERVERS,
        transactional_id=TRANSACTIONAL_ID,
    )

    # Nested try/finally: every client that was started gets stopped, even if
    # starting the other client or stopping the first one raises.
    await consumer.start()
    try:
        await producer.start()
        try:
            await _process_forever(consumer, producer)
        finally:
            await producer.stop()
    finally:
        await consumer.stop()


if __name__ == "__main__":
    # Script entry point: run the consume-process-produce loop on a new
    # event loop. The loop has no normal exit, so this returns only by raising.
    asyncio.run(transactional_process())