Spring Boot + Apache Kafka + JSON Message Example (2023)
In the previous article, I explained producing and consuming plain messages with Apache Kafka. Here we will be producing and consuming a JSON object. We mainly need three things:
- A Producer that sends the message.
- A Consumer that receives the message.
- A Topic, which receives the message from the producer and delivers it to the consumer.
Project Structure
This follows the standard directory layout for a Maven project. We start by creating a Maven pom.xml (Project Object Model) file, which contains the project configuration details.
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.codeusingjava</groupId> <artifactId>kafkajsonmessage</artifactId> <version>0.0.1-SNAPSHOT</version> <name>kafaExample</name> <description>Spring Boot Kafka</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>We define the port for the kafka bootstrap server in our application.properties file:
kafka.bootstrap-servers: localhost:9092

We create a model class named Person having only one property, i.e. name.
package com.codeusingjava.model;

public class Person {

    private String name;

    public Person() {
    }

    public Person(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Person [name=" + name + "]";
    }
}

The configuration class for the producer is as follows-
The ProducerFactory creates instances of Kafka producers. Several other configurations are set here:
- BOOTSTRAP_SERVERS_CONFIG denotes the host and port of the Kafka broker.
- KEY_SERIALIZER_CLASS_CONFIG defines the serializer class for the message key.
- VALUE_SERIALIZER_CLASS_CONFIG defines the serializer class for the message value. We use JsonSerializer here because we are dealing with JSON objects.
package com.codeusingjava.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import com.codeusingjava.model.Person;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class ProducersConfig {

    // Reads the broker address from application.properties
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, Person> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Person> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

The configuration class for the consumer is as follows- Inside the ConsumerFactory, several configurations are defined, such as BOOTSTRAP_SERVERS_CONFIG, the group id, and the deserializer classes for both key and value.
The Kafka consumer API is not thread-safe. We use ConcurrentKafkaListenerContainerFactory to create the listener containers that wrap the consumers and, at the same time, to set other Kafka properties in one place.
The following configuration class sets up the consumer that receives the JSON messages.
package com.codeusingjava.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import com.codeusingjava.model.Person;
import com.codeusingjava.util.AppConstants;

@EnableKafka
@Configuration
public class ConsumersConfig {

    // Reads the broker address from application.properties
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, Person> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, AppConstants.GROUP_ID);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                new JsonDeserializer<>(Person.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Person> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Person> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
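Depending on the spring-kafka version, JsonDeserializer may reject classes that are not in its list of trusted packages when the producer adds JSON type headers. If that happens, a minimal sketch of the same consumerFactory bean, adjusted only to trust the model package, could look like this (this variant is not part of the original example):

    @Bean
    public ConsumerFactory<String, Person> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, AppConstants.GROUP_ID);

        // Trust the package that contains Person so the type headers added by
        // JsonSerializer do not trigger a "not in the trusted packages" error.
        JsonDeserializer<Person> valueDeserializer = new JsonDeserializer<>(Person.class);
        valueDeserializer.addTrustedPackages("com.codeusingjava.model");

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), valueDeserializer);
    }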
package com.codeusingjava.util;

public class AppConstants {

    public static final String TOPIC = "codeusingjavaTopic";
    public static final String GROUP_ID = "groupId";
}
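The example assumes the topic already exists on the broker or that broker-side topic auto-creation is enabled. As an optional sketch that is not part of the original article, the topic could also be declared as a bean so that Spring Boot's auto-configured KafkaAdmin creates it on startup; this assumes a spring-kafka version that provides TopicBuilder (2.3+), and the class and method names below are just illustrative. Note that the auto-configured KafkaAdmin reads spring.kafka.bootstrap-servers (which defaults to localhost:9092), not the custom kafka.bootstrap-servers property used above.

package com.codeusingjava.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import com.codeusingjava.util.AppConstants;

@Configuration
public class TopicConfig {

    // Declares the topic so the KafkaAdmin auto-configured by Spring Boot
    // can create it on the broker if it does not exist yet.
    @Bean
    public NewTopic codeusingjavaTopic() {
        return TopicBuilder.name(AppConstants.TOPIC)
                .partitions(1)
                .replicas(1)
                .build();
    }
}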
The ProducerService class has the method for sending the message to the topic. The KafkaTemplate is used for this purpose.

package com.codeusingjava.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import com.codeusingjava.model.Person;
import com.codeusingjava.util.AppConstants;

@Service
public class ProducerService {

    @Autowired
    private KafkaTemplate<String, Person> kafkaTemplate;

    // Serializes the Person to JSON and sends it to the topic
    public void send(Person person) {
        System.out.println("Json Serializer for the Person : " + person);
        kafkaTemplate.send(AppConstants.TOPIC, person);
    }
}

In the controller class, an API endpoint is defined for sending the message to the topic.
package com.codeusingjava.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.codeusingjava.model.Person;
import com.codeusingjava.service.ProducerService;

@RestController
@RequestMapping(value = "/kafka/")
public class ProducerController {

    @Autowired
    ProducerService kafkaProducer;

    // Accepts a JSON Person in the request body and hands it to the producer
    @PostMapping(value = "/producer")
    public String sendMessage(@RequestBody Person user) {
        kafkaProducer.send(user);
        return "Message sent Successfully to the topic";
    }
}

The main class for running the application is as follows-
The values of the topic and the groupId come from the AppConstants class, where we defined them.
The @KafkaListener annotation is used here for listening to messages from the topic.
package com.codeusingjava;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import com.codeusingjava.model.Person;
import com.codeusingjava.util.AppConstants;

@SpringBootApplication
public class KafaJSONMessageApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafaJSONMessageApplication.class, args);
    }

    // Consumes Person objects from the topic and prints them to the console
    @KafkaListener(topics = AppConstants.TOPIC, groupId = AppConstants.GROUP_ID)
    public void listen(Person user) {
        System.out.println("Received info--> : " + user);
    }
}
Run the application
Before running the application, we need to start the ZooKeeper server and the Kafka broker. We start the ZooKeeper service with the following command-
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
We open another command prompt and start the Kafka broker with the following command-
.\bin\windows\kafka-server-start.bat .\config\server.properties
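Optionally (this step is not part of the original write-up), if topic auto-creation is disabled on the broker and the NewTopic bean sketched earlier is not used, the topic can be created from another command prompt, assuming a Kafka version that supports the --bootstrap-server flag (2.2+)-

.\bin\windows\kafka-topics.bat --create --topic codeusingjavaTopic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1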
Now we open a REST API client and call the following endpoint with a POST request:
http://localhost:8080/kafka/producer
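For example, the request can be sent with the header Content-Type: application/json and a body matching the Person model (the name value here is just illustrative):

{
    "name": "John"
}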
When we open Offset Explorer, we can see the group id on the consumer side, and the topic name is also populated.
Finally, the message is received by the consumer and printed on the console.