Custom partitionerΒΆ
If you consider using partitions as a logical entity, rather then purely for load-balancing, you may need to have more control over routing messages to partitions. By default hashing algorithms are used.
Producer
import asyncio
import random
from aiokafka import AIOKafkaProducer
def my_partitioner(key, all_partitions, available_partitions):
if key == b'first':
return all_partitions[0]
elif key == b'last':
return all_partitions[-1]
return random.choice(all_partitions)
async def produce_one(producer, key, value):
future = await producer.send('foobar', value, key=key)
resp = await future
print("'%s' produced in partition: %i"%(value.decode(), resp.partition))
async def produce_task():
producer = AIOKafkaProducer(
bootstrap_servers='localhost:9092',
partitioner=my_partitioner)
await producer.start()
await produce_one(producer, b'last', b'1')
await produce_one(producer, b'some', b'2')
await produce_one(producer, b'first', b'3')
await producer.stop()
asyncio.run(produce_task())
Output (topic foobar
has 10 partitions):
>>> python3 producer.py
'1' produced in partition: 9
'2' produced in partition: 6
'3' produced in partition: 0