Batch producer ============== If your application needs precise control over batch creation and submission and you're willing to forego the niceties of automatic serialization and partition selection, you may use the simple :meth:`~.AIOKafkaProducer.create_batch` and :meth:`~.AIOKafkaProducer.send_batch` interface. Producer .. code:: python import asyncio import random from aiokafka.producer import AIOKafkaProducer async def send_many(num): topic = "my_topic" producer = AIOKafkaProducer() await producer.start() batch = producer.create_batch() i = 0 while i < num: msg = ("Test message %d" % i).encode("utf-8") metadata = batch.append(key=None, value=msg, timestamp=None) if metadata is None: partitions = await producer.partitions_for(topic) partition = random.choice(tuple(partitions)) await producer.send_batch(batch, topic, partition=partition) print("%d messages sent to partition %d" % (batch.record_count(), partition)) batch = producer.create_batch() continue i += 1 partitions = await producer.partitions_for(topic) partition = random.choice(tuple(partitions)) await producer.send_batch(batch, topic, partition=partition) print("%d messages sent to partition %d" % (batch.record_count(), partition)) await producer.stop() asyncio.run(send_many(1000)) Output (topic ``my_topic`` has 3 partitions):: >>> python3 batch_produce.py 329 messages sent to partition 2 327 messages sent to partition 0 327 messages sent to partition 0 17 messages sent to partition 1