Download Kafka:
curl https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz -o kafka_2.13-3.6.1.tgz
Extract:
tar -xzf kafka_2.13-3.6.1.tgz
Go to Kafka folder:
cd kafka_2.13-3.6.1
Start zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
Note: Zookeeper defaults to port 2181
Start kafka:
bin/kafka-server-start.sh config/server.properties
Note: Kafka defaults to port 9092
Create the input topic (streams-plaintext-input):
bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic streams-plaintext-input
Create the output topic (streams-wordcount-output):
bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic streams-wordcount-output \
--config cleanup.policy=compact
Check that everything is OK by listing the topics you just created:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
Build and run your producer project:
cd ..
mkdir producer
cd producer
mkdir -p src/main/java
curl https://joelpintomata.com/tutorials/a-to-z-kafka-streams-example-2-producer.java -o src/main/java/Producer.java
curl https://joelpintomata.com/tutorials/a-to-z-kafka-streams-example-2-producer-pom.xml -o pom.xml
mvn clean package
java -jar ./target/kafka-streams-example-2-1.0-SNAPSHOT-jar-with-dependencies.jar
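The downloaded Producer.java is not reproduced here; as a rough sketch of what such a producer looks like (class name, configuration style, and message contents are illustrative assumptions, not the tutorial's exact code), it sends plain-text lines to the streams-plaintext-input topic created earlier:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // the broker started above
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Each line becomes one record on the input topic (example payloads)
            producer.send(new ProducerRecord<>("streams-plaintext-input", "all streams lead to kafka"));
            producer.send(new ProducerRecord<>("streams-plaintext-input", "hello kafka streams"));
        } // try-with-resources flushes and closes the producer
    }
}
```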
Build and run your consumer (Kafka Streams WordCount) project:
cd ..
mkdir consumer
cd consumer
mkdir -p src/main/java
curl https://raw.githubusercontent.com/apache/kafka/3.6/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java -o src/main/java/WordCountDemo.java
curl https://joelpintomata.com/tutorials/a-to-z-kafka-streams-example-2-consumer-pom.xml -o pom.xml
mvn clean package
java -jar ./target/kafka-streams-example-2-1.0-SNAPSHOT-jar-with-dependencies.jar
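The core of the downloaded WordCountDemo builds a topology along these lines (a simplified sketch; the actual file also sets up a shutdown hook and a latch): split each line into lowercase words, group by word, count, and write the counts to the output topic.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class WordCountSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source
            // split each line into lowercase words
            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
            // re-key by word so counting happens per word
            .groupBy((key, value) -> value)
            // count() materializes a state store backed by a compacted changelog topic
            .count()
            .toStream()
            // counts are Longs, hence the LongDeserializer used by the console consumer below
            .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        new KafkaStreams(builder.build(), props).start();
    }
}
```

Note that counts are serialized as Longs, which is why the console consumer in the next step needs a LongDeserializer for values.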
In a third terminal, start a Kafka console consumer to inspect the stream-processing output:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
Kafka Streams architecture example
Credits to Introduction to Kafka Streams - Akash
Key points:
Changelog topics back each state store and are used to speed up recoveries.
Changelog topics are compacted so that there is a unique value per key, i.e., each key holds its last value.
The state stores built from these changelogs can be queried via an API.
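The compaction behavior in the notes above can be illustrated with a toy model (plain Java, not the Kafka API): replaying a changelog of key/value updates into a map keeps only the latest value per key, which is exactly what a compacted topic retains.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CompactionToy {
    public static void main(String[] args) {
        // A changelog stream of (word, count) updates, oldest first (example data)
        String[][] changelog = {
            {"kafka", "1"}, {"streams", "1"}, {"kafka", "2"}, {"kafka", "3"}
        };
        // Compaction keeps only the last value seen for each key
        Map<String, String> compacted = new LinkedHashMap<>();
        for (String[] record : changelog) {
            compacted.put(record[0], record[1]);
        }
        System.out.println(compacted); // {kafka=3, streams=1}
    }
}
```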