Depending on your situation you may want to for your consumers to consume from a give set of partitions.
To do so we use assign
instead of subscribe
.
For example:
def consume(self):
events = []
consumer: confluent_kafka.Consumer = confluent_kafka.Consumer(
{'bootstrap.servers': 'mybroker'}
)
try:
list_topics = consumer.list_topics("topic_name")
partitions = [TopicPartition("topic_name", partition) for partition in list(list_topics.topics["topic_name"].partitions.keys())]
consumer.assign(partitions)
# Else, just subscribe to the topic
# consumer.subscribe([self.topic_name])
while True:
msg = consumer.poll(10.0)
if msg is None:
logger.info(f"No more messages to read on topic {self.topic_name}.")
break
if msg.error():
raise KafkaException(msg.error())
process(msg)
(...)