Custom partitionerΒΆ

If you consider using partitions as a logical entity, rather then purely for load-balancing, you may need to have more control over routing messages to partitions. By default hashing algorithms are used.

Producer

import asyncio
import random
from aiokafka import AIOKafkaProducer

def my_partitioner(key, all_partitions, available_partitions):
   if key == b'first':
       return all_partitions[0]
   elif key == b'last':
       return all_partitions[-1]
   return random.choice(all_partitions)

async def produce_one(producer, key, value):
    future = await producer.send('foobar', value, key=key)
    resp = await future
    print("'%s' produced in partition: %i"%(value.decode(), resp.partition))

async def produce_task():
    producer = AIOKafkaProducer(
        bootstrap_servers='localhost:9092',
        partitioner=my_partitioner)

    await producer.start()
    await produce_one(producer, b'last', b'1')
    await produce_one(producer, b'some', b'2')
    await produce_one(producer, b'first', b'3')
    await producer.stop()

asyncio.run(produce_task())

Output (topic foobar has 10 partitions):

>>> python3 producer.py
'1' produced in partition: 9
'2' produced in partition: 6
'3' produced in partition: 0