Python - Confluent_Kafka - Consume from a set of partitions instead of subscribing to the topic.

April 26, 2024

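Depending on your situation, you may want your consumers to consume from a given set of partitions. To do so, we use assign instead of subscribe. With subscribe, the consumer group decides which partitions each consumer gets and rebalances them as consumers join or leave; with assign, you pick the partitions yourself and no rebalancing takes place.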

For example:

from confluent_kafka import Consumer, KafkaException, TopicPartition

def consume(self):
    events = []
    consumer = Consumer({
        'bootstrap.servers': 'mybroker',
        # Placeholder value; confluent_kafka consumers need a group.id, even with assign().
        'group.id': 'mygroup',
    })
    try:
        # Look up the topic's partitions in the cluster metadata and assign them explicitly.
        metadata = consumer.list_topics(self.topic_name)
        partitions = [
            TopicPartition(self.topic_name, partition)
            for partition in metadata.topics[self.topic_name].partitions
        ]
        consumer.assign(partitions)

        # Else, just subscribe to the topic
        # consumer.subscribe([self.topic_name])

        while True:
            msg = consumer.poll(10.0)
            if msg is None:
                # Nothing arrived within the 10-second poll timeout.
                logger.info(f"No more messages to read on topic {self.topic_name}.")
                break

            if msg.error():
                raise KafkaException(msg.error())

            process(msg)
    (...)
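
The example above assigns every partition of the topic. If you only want some of them, one possible approach is to filter the partition ids before calling assign. The following is a minimal sketch, not part of the original code: the helper names select_partitions and assign_subset are made up for illustration, and it assumes you already have a configured Consumer and know which partition ids you want.

```python
def select_partitions(available, wanted):
    """Return the wanted partition ids, sorted, after checking they exist on the topic."""
    wanted = set(wanted)
    missing = wanted - set(available)
    if missing:
        raise ValueError(f"Unknown partitions: {sorted(missing)}")
    return sorted(wanted)


def assign_subset(consumer, topic_name, wanted):
    """Assign only the wanted partitions of topic_name to an existing Consumer."""
    # Imported here so that select_partitions stays usable without confluent_kafka installed.
    from confluent_kafka import TopicPartition

    # Ask the broker which partitions the topic actually has.
    metadata = consumer.list_topics(topic_name, timeout=10)
    available = metadata.topics[topic_name].partitions.keys()

    consumer.assign(
        [TopicPartition(topic_name, p) for p in select_partitions(available, wanted)]
    )
```

For instance, assign_subset(consumer, "topic_name", [0, 2]) would leave the consumer reading partitions 0 and 2 only, and it raises a ValueError if one of the requested ids does not exist on the topic.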