Custom partitioner

If you treat partitions as logical entities, rather than purely as a load-balancing mechanism, you may need more control over how messages are routed to partitions. By default, a hashing algorithm is used to choose the partition; the example below replaces it with a custom partitioner function.


import asyncio
import random
from aiokafka import AIOKafkaProducer

def my_partitioner(key, all_partitions, available_partitions):
    """Pick the partition a message with the given key is routed to.

    The key ``b'first'`` maps to the first entry of ``all_partitions`` and
    ``b'last'`` to the final entry. Every other key gets a randomly chosen
    entry of ``all_partitions``. ``available_partitions`` is accepted as
    part of the partitioner signature but is not consulted here.
    """
    if key == b'first':
        chosen = all_partitions[0]
    elif key == b'last':
        chosen = all_partitions[-1]
    else:
        # No special routing for this key: spread it at random.
        chosen = random.choice(all_partitions)
    return chosen

async def produce_one(producer, key, value):
    """Send one message to topic ``'foobar'`` and report where it landed.

    ``producer.send`` is awaited to obtain a future; awaiting that future
    yields a response whose ``partition`` attribute is printed together
    with the decoded ``value``.
    """
    pending = await producer.send('foobar', value, key=key)
    metadata = await pending
    text = value.decode()
    message = "'%s' produced in partition: %i" % (text, metadata.partition)
    print(message)

async def produce_task():
    """Produce three keyed messages through a producer using my_partitioner.

    Creates an ``AIOKafkaProducer`` configured with the custom partitioner,
    starts it, sends one message each for the keys ``b'last'``, ``b'some'``
    and ``b'first'`` via ``produce_one``, and then stops the producer.
    """
    producer = AIOKafkaProducer(
        # NOTE(review): assumes a broker reachable at localhost:9092 --
        # adjust to your cluster's bootstrap address.
        bootstrap_servers='localhost:9092',
        partitioner=my_partitioner)

    await producer.start()
    try:
        await produce_one(producer, b'last', b'1')
        await produce_one(producer, b'some', b'2')
        await produce_one(producer, b'first', b'3')
    finally:
        # Stop the producer even if one of the sends fails, so the
        # connection is not left open.
        await producer.stop()


if __name__ == "__main__":
    # Run the example only when executed as a script, not on import.
    asyncio.run(produce_task())

Output (topic foobar has 10 partitions):

>>> python3
'1' produced in partition: 9
'2' produced in partition: 6
'3' produced in partition: 0