Spring Boot + Apache Kafka Hello World(2021) | CodeUsingJava






















Spring Boot + Apache Kafka Hello World(2021)

In the previous article, I explained about the introduction, architecture and installation of Apache kafka. In this tutorial, we would configure the kafka servers and pass the message from the producer to the consumer in a practical manner. We will need mainly three things:
  • A Producer who sends the message.
  • A Consumer who receives the message.
  • A Topic ,which will receive the message from the producer and send it to the consumer.

Project Structure

  • This will be the standard directory layout for maven project structure-
    Spring Boot Maven Project
    We need to start by creating a Maven pom.xml(Project Object Model) file. The pom.xml file contains the project configuration details.
    	<?xml version="1.0" encoding="UTF-8"?>
    	<project xmlns="http://maven.apache.org/POM/4.0.0"
    		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    		<modelVersion>4.0.0</modelVersion>
    		<parent>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-parent</artifactId>
    			<version>2.3.9.RELEASE</version>
    			<relativePath /> <!-- lookup parent from repository -->
    		</parent>
    		<groupId>com.codeusingjava</groupId>
    		<artifactId>kafa-hello-message</artifactId>
    		<version>0.0.1-SNAPSHOT</version>
    		<name>kafaExample</name>
    		<description>Spring Boot Kafka</description>
    		<properties>
    			<java.version>1.8</java.version>
    		</properties>
    		<dependencies>
    			<dependency>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-starter</artifactId>
    			</dependency>
    	
    			<dependency>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-starter-web</artifactId>
    			</dependency>
    	
    			<dependency>
    				<groupId>org.springframework.kafka</groupId>
    				<artifactId>spring-kafka</artifactId>
    			</dependency>
    	
    		</dependencies>
    	
    		<build>
    			<plugins>
    				<plugin>
    					<groupId>org.springframework.boot</groupId>
    					<artifactId>spring-boot-maven-plugin</artifactId>
    				</plugin>
    			</plugins>
    		</build>
    	
    	</project>
    		
    
    We define the port for the kafka bootstrap server in our application.properties file:
    	kafka.bootstrap-servers: localhost:9092
    
    The configuration class for the producer class is as follows-
    The ProducerFactory creates the instance for kafka producers. Several other configurations are done here like
    • BOOTSTRAP_SERVERS_CONFIG denotes the host number and the port number.
    • Key KEY_SERIALIZER_CLASS_CONFIG is used to define the key.
    • VALUE_SERIALIZER_CLASS_CONFIG for defining the values.
    package com.codeusingjava.config;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    
    @Configuration
    public class ProducersConfig {
        @Value("0")
        private String bootstrapServers;
    
        @Bean
        public ProducerFactory<String, String> producerFactory() {
            Map<String, Object> configProps = new HashMap<>();
            configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }
    
    The configuration class for the consumer is as follows- Inside ConsumerFactory, several configurations are defined like BOOTSTRAP_SERVERS_CONFIG, group-id config and deserializer class configurations for both key and value.
    The consumer API for Kafka is not thread-safe. We can use the ConcurrentKafkaListenerContainerFactory for setting consumer api and at the same time, setting other kafka properties.
    package com.codeusingjava.config;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    
    @EnableKafka
    @Configuration
    public class ConsumersConfig {
    
        @Value("0")
        private String bootstrapServers;
    
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(props);
        }
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String>
                    factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    }
    
    
    The AppConstant class is used for parsing other classes definitions in the form of members that are public, static and final in nature.
    	package com.codeusingjava.util;
    
    	public class AppConstants {
    		public static final String TOPIC="codeusingjavaTopic";
    		public static final String GROUP_ID="groupId";
    	
    	}
    	
    
    The ProducerService class has the method for sending the message to the topic. The KafkaTemplate is used for the purpose.
    	package com.codeusingjava.service;
    
    	import org.springframework.beans.factory.annotation.Autowired;
    	import org.springframework.kafka.core.KafkaTemplate;
    	import org.springframework.stereotype.Service;
    	
    	import com.codeusingjava.util.AppConstants;
    	
    	@Service
    	public class ProducerService {
    		@Autowired
    		private KafkaTemplate<String, String> kafkaTemplate;
    	
    		
    	
    		public void send(String message) {
    	
    			kafkaTemplate.send(AppConstants.TOPIC, message);
    		}
    	}
    	
    
    In the controller class, an API is defined which will be run for sending the message to the topic.
    	package com.codeusingjava.controller;
    
    	import org.springframework.beans.factory.annotation.Autowired;
    	import org.springframework.kafka.annotation.KafkaListener;
    	import org.springframework.web.bind.annotation.GetMapping;
    	import org.springframework.web.bind.annotation.RequestMapping;
    	import org.springframework.web.bind.annotation.RequestParam;
    	import org.springframework.web.bind.annotation.RestController;
    	
    	import com.codeusingjava.service.ProducerService;
    	
    	@RestController
    	@RequestMapping(value = "/kafka/")
    	public class ProducerController {
    	
    		@Autowired
    		ProducerService kafkaProducer;
    	
    	
    		@GetMapping(value = "/producer")
    		public String sendMessage(@RequestParam("message") String message)
    		{
    			kafkaProducer.send(message);
    			return "Message sent Successfully to the topic";
    		}
    	
    	}
    	
    
    The main class for running the application is as follows-
    The values of the Topic and the groupId are coming from the Appconstants class where we have defined its values.
    The @KafkaListener annotation is used here for listening the message from the topic.
    	package com.codeusingjava;
    
    	import org.springframework.boot.SpringApplication;
    	import org.springframework.boot.autoconfigure.SpringBootApplication;
    	import org.springframework.kafka.annotation.KafkaListener;
    	
    	import com.codeusingjava.util.AppConstants;
    	
    	@SpringBootApplication
    	public class KafkaHelloMessageApplication {
    	
    		public static void main(String[] args) {
    			SpringApplication.run(KafkaHelloMessageApplication.class, args);
    		}
    	
    		@KafkaListener(topics = AppConstants.TOPIC, groupId=AppConstants.GROUP_ID)
    		public void listen(String message) {
    	
    			System.out.println("Received Messasge : " + message);
    		}
    	}
    	
    
Run the application
Before running the application, we start the zookeeper server and the kafka server using the following command-
We start the zookeeper service by the following command-
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
We open another command prompt and then start the Kafka broker by the following command-
.\bin\windows\kafka-server-start.bat .\config\server.properties
browser url
We open a new command prompt and write the following command and mention the topic which is created in our application. We see that the message is successfully received from the topic.
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic codeusingjavaTopic
cmd topic
Finally the message is received by the consumer and gets printed on the console.
spring boot

Downloads-

Spring Boot + Apache Kafka Example