A to Z Kafka Streams (Example 1)

September 27, 2022

Running a Kafka Streams application

Install Kafka:

curl https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz -o kafka_2.12-2.8.0.tgz


tar xzf kafka_2.12-2.8.0.tgz 

Go to Kafka folder:

cd kafka_2.12-2.8.0

Start zookeeper:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
  • Zookeeper defaults to port 2181

Start kafka:

bin/kafka-server-start.sh -daemon config/server.properties
  • Kafka defaults to port 9092

Create the input topic(input-topic):

bin/kafka-topics.sh --create --topic input-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092

Create the output topic(output-topic):

bin/kafka-topics.sh --create --topic output-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092

Check that all is ok by listing out all the topics created:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

Build your Java project

cd ..
mkdir javakafkaproject && cd javakafkaproject

Download project files:

mkdir -p src/main/java
curl https://joelpintomata.com/tutorials/a-to-z-kafka-example-java-code.java -o src/main/java/KafkaStreamsDemo.java
curl https://joelpintomata.com/tutorials/a-to-z-kafka-example-pom.xml -o ./pom.xml

Build the project:

mvn clean package

Run the application:

java -jar target/kafka-streams-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

In a second terminal, start a kafka producer:

cd kafka_2.12-2.8.0
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \
    --topic input-topic \
    --property key.separator=":" \
    --property parse.key=true 

in the above command we set the separator as :, without it Kafka defaults to a tab (\t)

In a third terminal, start a kafka consumer:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic output-topic \
    --property print.key=true \
    --property key.separator=" total is " \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

How does this all work together? In the Producer terminal send the following:


In the Consumer terminal you will get:

bananas total is 6
oranges total is 10

In the Producer terminal send the following:


In the Consumer terminal you will get:

pears total is 20
oranges total is 20

We can see that the kafka streams application is aggregating all the view counts based on the video title.

Kafka Streams application Architecture example

Kafka Streams architecture example

Credits to Introduction to Kafka Streams - Akash

Key points:

  • A tasks represents the set your processors that is, the steps your application applies to the stream
  • When the application starts it creates a number of tasks are equal to the number of partitions
  • When the application starts it balances the number of tasks along the number of application threads
  • The moment another application starts Kafka balances the amoung of tasks along the number of applications.

Failover/fault_tolerance considerations

  • Kafka relies on change compacted topic to speed up recoveries. change compacted topics are compacted so that theres a unique value per key meaning, each key contains the last value.
  • Kafka relies on change logs that one can query via API