Spring Boot + RabbitMQ (2021) - Multiple Consumers Example | CodeUsingJava






















Spring Boot + RabbitMQ (2021) - Multiple Consumers

In the previous articles, we have configured rabbitmq in our system and sent the hello world messages from Producer to the Consumers.
A message placed on a single queue is delivered to only one consumer at a time, but with the help of multiple queues and bindings it is possible to deliver the same message to more than one consumer.
The flow can be explained with the help of the following diagram-
project flow

In this tutorial, we will configure multiple queues and bind them to exchanges so that a message sent by the producer can be delivered to a number of consumers.
  • Project Structure

    Producer Application

    The producer application sends the message to RabbitMQ, and RabbitMQ then passes the message on to the consumer.
    This will be the standard directory layout for maven project structure-
    Spring Boot Maven Project structure
    We need to start by creating a Maven pom.xml(Project Object Model) file. The pom.xml file contains the project configuration details.
    	<?xml version="1.0" encoding="UTF-8"?>
    	<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    		<modelVersion>4.0.0</modelVersion>
    	
    		<!-- Maven coordinates of the producer application, packaged as a jar -->
    		<groupId>com.codeusingjava</groupId>
    		<artifactId>RabbitMqProducer</artifactId>
    		<version>0.0.1</version>
    		<packaging>jar</packaging>
    	
    		<name>RabbitMqProducer</name>
    		<description>RabbitMqProducer</description>
    	
    		<!-- Spring Boot parent POM; the dependencies below declare no version of their own -->
    		<parent>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-parent</artifactId>
    			<version>1.5.7.RELEASE</version>
    			<relativePath/> <!-- lookup parent from repository -->
    		</parent>
    	
    		<!-- UTF-8 sources and reports, Java 8 -->
    		<properties>
    			<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    			<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    			<java.version>1.8</java.version>
    		</properties>
    	
    		<dependencies>
    			<!-- Core Spring Boot starter -->
    			<dependency>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-starter</artifactId>
    			</dependency>
    			<!-- AMQP starter: provides the org.springframework.amqp classes used by the producer -->
    			<dependency>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-starter-amqp</artifactId>
    			</dependency>
    			<!-- Test support, only on the test classpath -->
    			<dependency>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-starter-test</artifactId>
    				<scope>test</scope>
    			</dependency>  
    			
    			
    		</dependencies>
    	
    		<build>
    			<plugins>
    				<!-- Spring Boot's Maven plugin -->
    				<plugin>
    					<groupId>org.springframework.boot</groupId>
    					<artifactId>spring-boot-maven-plugin</artifactId>
    				</plugin>
    			</plugins>
    		</build>
    	
    	</project>
    	
    	
    
    All the configurations related to the rabbitmq are done in the application.properties file.
    # Connection to the local RabbitMQ broker
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    # Exchange the producer publishes to. It must be the exchange that is created
    # in the RabbitMQ management console (jsa.exchange.records), otherwise the
    # messages are published to an exchange that does not exist.
    spring.rabbitmq.exchange=jsa.exchange.records
    spring.rabbitmq.queue=jsa.queue
    spring.rabbitmq.routingkey=jsa.routekey
    
    We create a model class named Records which contains only two fields, i.e. the content and the routing key.
    	package com.codeusingjava.model;
    
    /**
     * Message payload exchanged through RabbitMQ: a text content together with
     * the routing key the message is published with.
     *
     * <p>Plain mutable bean with a no-argument constructor and getter/setter
     * pairs for both fields.</p>
     */
    public class Records {

        /** Text carried by the message. */
        private String content;

        /** Routing key used when the message is published. */
        private String routingKey;

        /** Creates an empty record; both fields start out as {@code null}. */
        public Records() {
        }

        /**
         * Creates a fully populated record.
         *
         * @param content    text carried by the message
         * @param routingKey routing key used when the message is published
         */
        public Records(String content, String routingKey) {
            this.content = content;
            this.routingKey = routingKey;
        }

        /** @return the text carried by the message */
        public String getContent() {
            return content;
        }

        /** @param content the text carried by the message */
        public void setContent(String content) {
            this.content = content;
        }

        /** @return the routing key of the message */
        public String getRoutingKey() {
            return routingKey;
        }

        /** @param routingKey the routing key of the message */
        public void setRoutingKey(String routingKey) {
            this.routingKey = routingKey;
        }

        /**
         * Renders the record as {@code {content = ..., routingKey = ...}}.
         *
         * @return printable form of the record
         */
        @Override
        public String toString() {
            return "{content = " + content + ", routingKey = " + routingKey + "}";
        }
    }
    
    
    
    The Producer class for the application is as follows-
    The AmqpTemplate converts the Java object into an AMQP message and sends it to the configured exchange on the RabbitMQ server, using the routing key stored in the record.
      
    	package com.codeusingjava.producer;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    import com.codeusingjava.model.Records;
    
    /**
     * Publishes {@link Records} messages to RabbitMQ.
     *
     * <p>Each record is sent to the exchange configured under
     * {@code spring.rabbitmq.exchange}, using the routing key carried by the
     * record itself.</p>
     */
    @Component
    public class Producers {

        /** Template used to convert and send the message. */
        @Autowired
        private AmqpTemplate amqpTemplate;

        /**
         * Name of the target exchange, resolved from the
         * {@code spring.rabbitmq.exchange} entry of application.properties.
         * An empty {@code @Value("")} would inject an empty string here instead
         * of the configured exchange name.
         */
        @Value("${spring.rabbitmq.exchange}")
        private String exchange;

        /**
         * Sends the given record to the configured exchange and logs it to the
         * console.
         *
         * @param records the message to publish; its routing key decides how the
         *                exchange routes it
         */
        public void produce(Records records){
            String routingKey = records.getRoutingKey();
            amqpTemplate.convertAndSend(exchange, routingKey, records);
            System.out.println("Send msg = " + records);
        }
    }
    
    The configuration class can be defined as follows-
    The Jackson2JsonMessageConverter converts the object to JSON and then JSON to object of java.
    	package com.codeusingjava.config;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * RabbitMQ configuration of the producer: registers a JSON message
     * converter and a {@link RabbitTemplate} that uses it.
     */
    @Configuration
    public class RabbitMqConfiguration {

        /**
         * Converter that serialises outgoing message payloads as JSON.
         *
         * @return the Jackson based message converter
         */
        @Bean
        public MessageConverter jsonMessageConverter(){
            return new Jackson2JsonMessageConverter();
        }

        /**
         * Template used for publishing messages, configured with the JSON
         * converter.
         *
         * <p>The method has to be annotated with {@code @Bean}; without the
         * annotation Spring never calls it and the template built here is not
         * registered in the application context.</p>
         *
         * @param connectionFactory connection factory to the RabbitMQ broker
         * @return the template that converts payloads to JSON before sending
         */
        @Bean
        public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMessageConverter(jsonMessageConverter());
            return rabbitTemplate;
        }
    }
    
    
    The main class for the Rabbitmq Producer can be defined as follows-
    Here we call the produce method and pass the content and the routing key to the rabbitmq.
    	package com.codeusingjava;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    import com.codeusingjava.model.Records;
    import com.codeusingjava.producer.Producers;
    
    
    /**
     * Entry point of the producer application.
     *
     * <p>Once the application has started, {@link #run(String...)} publishes
     * three sample records, each with a different routing key.</p>
     */
    @SpringBootApplication
    public class RabbitMqProducerApplication implements CommandLineRunner {

        /** Component that performs the actual publishing. */
        @Autowired
        Producers producer;

        /**
         * Boots the Spring application.
         *
         * @param args command line arguments, passed through to Spring Boot
         */
        public static void main(String[] args) {
            SpringApplication.run(RabbitMqProducerApplication.class, args);
        }

        /**
         * Publishes the three sample messages in a fixed order.
         *
         * @param args command line arguments (not used)
         * @throws Exception declared by {@link CommandLineRunner}
         */
        @Override
        public void run(String... args) throws Exception {
            // 1st routing
            publish("Tomcat Started Successfully....", "sys.dev.info");
            // 2nd routing
            publish("Exception is noticed..", "sys.test.error");
            // 3rd routing
            publish("Here is your Message..", "app.prod.error");
        }

        /** Wraps content and routing key in a record and hands it to the producer. */
        private void publish(String content, String routingKey) {
            producer.produce(new Records(content, routingKey));
        }
    }
    

    Consumer Application

    The message sent by the Producer class will be received by multiple consumers.
    The Project Structure for the Consumer Application is as follows-
    Spring Boot Maven Project structure

    The pom.xml for the ConsumerApplication would be same as that of the Producer Application.
      
    	<?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    
    	<!-- Maven coordinates of the consumer application, packaged as a jar -->
    	<groupId>com.codeusingjava</groupId>
    	<artifactId>RabbitMqConsumer</artifactId>
    	<version>0.0.1</version>
    	<packaging>jar</packaging>
    
    	<name>RabbitMqConsumer</name>
    	<description>SpringRabbitMqConsumer</description>
    
    	<!-- Spring Boot parent POM; the dependencies below declare no version of their own -->
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.7.RELEASE</version>
    		<relativePath/> <!-- lookup parent from repository -->
    	</parent>
    
    	<!-- UTF-8 sources and reports, Java 8 -->
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
    	</properties>
    
    	<dependencies>
    		<!-- Core Spring Boot starter -->
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter</artifactId>
    		</dependency>
    		<!-- AMQP starter: provides the org.springframework.amqp classes used by the consumer -->
    		<dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    		<!-- Test support, only on the test classpath -->
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    	</dependencies>
    
    	<build>
    		<plugins>
    			<!-- Spring Boot's Maven plugin -->
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
    
    
    </project>
    
    
    The application.properties file would be defined as follows-
     
    # Connection to the local RabbitMQ broker
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    # Name of the queue for this consumer application
    spring.rabbitmq.queue=jsa.records.sys
    
    The model class for the consumer is defined in the same way as the one in the producer.
     
    	package com.codeusingjava.model;
    
    /**
     * Message payload received from RabbitMQ: a text content together with the
     * routing key of the message.
     *
     * <p>Plain mutable bean with a no-argument constructor and getter/setter
     * pairs for both fields.</p>
     */
    public class Records {

        /** Text carried by the message. */
        private String content;

        /** Routing key of the message. */
        private String routingKey;

        /** Creates an empty record; both fields start out as {@code null}. */
        public Records() {
        }

        /**
         * Creates a fully populated record.
         *
         * @param content    text carried by the message
         * @param routingKey routing key of the message
         */
        public Records(String content, String routingKey) {
            this.content = content;
            this.routingKey = routingKey;
        }

        /** @return the text carried by the message */
        public String getContent() {
            return content;
        }

        /** @param content the text carried by the message */
        public void setContent(String content) {
            this.content = content;
        }

        /** @return the routing key of the message */
        public String getRoutingKey() {
            return routingKey;
        }

        /** @param routingKey the routing key of the message */
        public void setRoutingKey(String routingKey) {
            this.routingKey = routingKey;
        }

        /**
         * Renders the record as {@code {content = ..., routingKey = ...}}.
         *
         * @return printable form of the record
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("{content = ");
            text.append(content);
            text.append(", routingKey = ");
            text.append(routingKey);
            text.append("}");
            return text.toString();
        }
    }
    
    
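    The Consumers class receives the messages from RabbitMQ and prints them to the console.
    The @RabbitListener annotation marks the method that is called for every message arriving on the queue. It refers to a listener container factory bean named jsaFactory, which has to be declared in a configuration class of the consumer application.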
      
    package com.codeusingjava.consumer;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import com.codeusingjava.model.Records;
    
    
    /**
     * Receives {@link Records} messages from RabbitMQ and prints them to the
     * console.
     */
    @Component
    public class Consumers {

        /**
         * Called for every message that arrives on the configured queue.
         *
         * <p>The queue name is resolved from the {@code spring.rabbitmq.queue}
         * entry of application.properties; an empty {@code queues=""} would not
         * bind the listener to that queue. The listener uses the container
         * factory bean named {@code jsaFactory}.
         * NOTE(review): no bean of that name is declared in the sources shown
         * here - confirm that the consumer application defines it.</p>
         *
         * @param records the received message payload
         */
        @RabbitListener(queues = "${spring.rabbitmq.queue}", containerFactory = "jsaFactory")
        public void recievedMessage(Records records) {
            System.out.println("Recieved Message: " + records);
        }
    }
    
    The main class for the RabbitMqConsumerApplication can be defined as follows-
      
    package com.codeusingjava;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    /**
     * Entry point of the consumer application.
     */
    @SpringBootApplication
    public class RabbitMqConsumerApplication {

        /**
         * Boots the Spring application.
         *
         * @param args command line arguments, passed through to Spring Boot
         */
        public static void main(String[] args) {
            SpringApplication application = new SpringApplication(RabbitMqConsumerApplication.class);
            application.run(args);
        }
    }
    
    

    Rabbitmq Configurations:

    Before running the application, open a Command Prompt, go to the location where RabbitMQ is installed and start the server with the following command:-
    rabbitmq-server start
    Spring BootRabbitMQ server start

    install all the plugins using the following command:-
    rabbitmq-plugins.bat enable rabbitmq_management
    Spring BootRabbitMQ plugin

    Now open localhost:15672 and log in with the following username and password-
    username- guest , password- guest
    Now add the following exchanges-
    jsa.exchange.records
    jsa.exchange.records.error
    Now go to the queues section and add the following queues-
    jsa.records.sys
    jsa.records.prod.error
    Finally we bind all the rabbitmq queues and exchanges as follows-
    Binding
Run the application
First we run the RabbitMQ Producer application and then the RabbitMQ Consumer application as Spring Boot applications; the output is as follows-
Sender console

rabbitmq queue
The graph shows that the message is received by the Rabbitmq
rabbitmq graph
Finally the message is received by the receiver and gets printed to the console-
Receiver console

Downloads-

Spring Boot + RabbitMQ multiple consumers example