Spring Boot + Kafka: A Toy App
Introduction
In this blog post, we will look at the minimal Kafka configuration and Spring Boot source code needed to publish a message from a Kafka producer to a consumer through a Spring Boot API.
Fire It Up
I will be using a Linux terminal to test the app. You can use any OS with a terminal (macOS, Linux); on Windows you can use the Ubuntu subsystem (WSL).
The flow will be like this:
- Configure the Spring Boot app.
- Write a Docker Compose file (I assume you already have Docker installed; if not, download it first).
- Test the Kafka producer and consumer from the terminal.
- Create the Spring Boot API and test the app.
Setup Docker and App
First, create a Spring Boot application from Spring Initializr and add the Spring Web and Spring for Apache Kafka dependencies.
(I am using Spring Boot 2.7, the Gradle build system, and Java 17.)
Let’s use the Kafka and ZooKeeper Docker images.
We can add a ``docker-compose.yaml`` file like the one shown below. We should create this file inside the project directory for later use.
Since ZooKeeper needs to start before Kafka, we have added ``depends_on: zookeeper``.
Also, the Kafka container’s port is mapped to 9092 on the local machine, so that the Spring app can connect to the Kafka port.
```yaml
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper:latest
    container_name: zookeeper-analytical-pipeline
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:latest
    container_name: kafka-analytical-pipeline
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```
Now, we can start Docker and pull the images:
```shell
$ cd project_folder
$ docker-compose up
```
Once the containers are running without errors, we can start testing.
Test Kafka
Let’s create a Kafka topic. To enter the Kafka container, run from the terminal:
```shell
docker exec -it kafka-analytical-pipeline bash
```
Then create the topic with the command below:
```shell
kafka-topics.sh --create --topic metric1 --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
```
Now we can connect to the Kafka container and run the console producer and consumer:
1. Open two new tabs in the terminal. The first will be used as a producer (tab 1) and the other as a consumer (tab 2).
2. Run the command below in both tabs to connect to the Kafka container:
```shell
docker exec -it kafka-analytical-pipeline bash
```
3. In tab 1, start the console producer. When it shows the `>` prompt, write a message and press Enter. This publishes the message to the `metric1` topic.
```shell
kafka-console-producer.sh --topic metric1 --bootstrap-server localhost:9092
```
4. In tab 2, start the console consumer. We will see it consume the messages published from the producer (tab 1).
```shell
kafka-console-consumer.sh --topic metric1 --from-beginning --bootstrap-server localhost:9092
```
Note: the `--from-beginning` parameter is needed to see all the messages. Otherwise you will only see messages sent to the producer after the consumer was started.
Create Spring Boot API
Let’s add a service and a controller to the empty project we created in the previous section.
We can write a producer service like the one below:
```kotlin
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Service

@Service
class KafkaProducer(private val kafkaTemplate: KafkaTemplate<String, String>) {

    @Value("\${app.constant.kafka.topic-name1}")
    lateinit var metricsTopic1: String

    private val logger = LoggerFactory.getLogger(KafkaProducer::class.java)

    fun sendMessage(message: String) {
        // send() is asynchronous; this logs after handing the record to the template.
        kafkaTemplate.send(metricsTopic1, message)
        logger.info("Finished {}", metricsTopic1)
    }
}
```
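Since `KafkaTemplate.send()` is asynchronous (in Spring Kafka 2.x it returns a `ListenableFuture<SendResult<K, V>>`), you may want to log success or failure only once the broker actually acknowledges the record. A minimal sketch of a hypothetical `sendMessageWithCallback` variant, assuming the `kafkaTemplate`, `metricsTopic1`, and logger fields from the service above:

```kotlin
fun sendMessageWithCallback(message: String) {
    kafkaTemplate.send(metricsTopic1, message).addCallback(
        { result ->
            // Called once the broker acknowledges the record.
            logger.info("Sent to {} (offset {})", metricsTopic1, result?.recordMetadata?.offset())
        },
        { ex ->
            // Called if the send fails, e.g. the broker is unreachable.
            logger.error("Failed to send to {}", metricsTopic1, ex)
        }
    )
}
```

This is optional for a toy app, but it makes send failures visible instead of silently dropping them.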
Write a controller,
```kotlin
import com.dataanalysis.kafka.dataanalysispipeline.kafka.producer.KafkaProducer
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.bind.annotation.RestController

@RestController
@RequestMapping("/api/v1/kafka")
class MessageController(private val kafkaProducer: KafkaProducer) {

    @GetMapping("/publish")
    fun publish(@RequestParam("message") message: String): ResponseEntity<String> {
        kafkaProducer.sendMessage(message)
        return ResponseEntity.ok("message sent to the topic")
    }
}
```
Topic config,
```kotlin
import org.apache.kafka.clients.admin.NewTopic
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.TopicBuilder

@Configuration
class KafkaTopicConfig {

    @Value("\${app.constant.kafka.topic-name1}")
    lateinit var metricsTopic1: String

    @Bean
    fun metricsTopic(): NewTopic {
        return TopicBuilder.name(metricsTopic1)
            .build()
    }
}
```
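With no explicit settings, `TopicBuilder` creates the topic using the broker defaults. If you want the bean to match the console command we used earlier (1 partition, replication factor 1, which fits this single-broker setup), you can size it explicitly; a sketch of an alternative bean:

```kotlin
@Bean
fun metricsTopic(): NewTopic {
    return TopicBuilder.name(metricsTopic1)
        .partitions(1)    // match the console --partitions 1
        .replicas(1)      // match the console --replication-factor 1
        .build()
}
```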
Add the application configs to ``application.yaml`` as below. As you can see, we set the topic name “metric1” (which is injected later using @Value).
```yaml
app:
  constant:
    kafka:
      topic-name1: "metric1"
spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      bootstrap-servers: localhost:9092
      group-id: metrics-group   # any consumer group name
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # properties:
      #   spring.json.trusted.packages: com.learning.events # change this with your event packages
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
```
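In this post we consume from the console, so the consumer settings above are not exercised by the app itself. If you later want the Spring app to consume as well, a minimal sketch of a listener (the class name `KafkaConsumerService` and the group id `metrics-group` are my own choices, not from the original project):

```kotlin
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Service

@Service
class KafkaConsumerService {

    private val logger = LoggerFactory.getLogger(KafkaConsumerService::class.java)

    // Subscribes to the same topic the producer writes to,
    // resolving the name from application.yaml.
    @KafkaListener(topics = ["\${app.constant.kafka.topic-name1}"], groupId = "metrics-group")
    fun consume(message: String) {
        logger.info("Consumed message: {}", message)
    }
}
```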
Let’s run the application and check that it works properly:
```shell
./gradlew bootRun
```
Send Message From API
Let’s call the API from the terminal to publish a message to our topic `metric1`. We can call it with a `message` parameter like below (the URL is quoted so the shell does not interpret the `?`):
```shell
curl -X GET "http://localhost:8080/api/v1/kafka/publish?message=new_message"
```
We can see the Kafka console consumer (tab 2) receive the message from the `metric1` topic.
Conclusion
As we have seen, we were able to publish a message to Kafka through the API and consume it from the terminal.
Thank you.