Spring Boot + RabbitMQ (2023) - Multiple Consumers
In the previous articles, we configured RabbitMQ in our system and sent hello world messages from the Producer to the Consumer. A queue delivers each message to only one consumer at a time, but with the help of multiple queues and bindings it is possible to send the same message to more than one consumer.
The flow can be explained with the help of the following diagram-
In this tutorial, we configure multiple queues and bindings so that a message sent by the producer can be delivered to several consumers.
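The idea of one message reaching several queues can be illustrated with a small in-memory model. This is only a sketch and not RabbitMQ client code: the class name FanoutSketch and the queue names queue.a and queue.b are made up for illustration, and the model simply copies every published message into each bound queue.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for an exchange with bound queues (illustration only).
class FanoutSketch {
    // Queue name -> messages delivered to that queue, kept in binding order.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Bind a queue to this exchange; the queue is created if it is new.
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Copy the message into every bound queue.
    void publish(String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    // Messages currently held by the given queue (empty if unknown).
    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("queue.a"); // hypothetical queue names
        exchange.bind("queue.b");
        exchange.publish("hello");
        System.out.println(exchange.messagesIn("queue.a"));
        System.out.println(exchange.messagesIn("queue.b"));
    }
}
```

Each queue ends up with its own copy of the message, so a consumer attached to each queue would see it independently of the other.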
Project Structure
Producer Application
The producer application sends the message to RabbitMQ, which then passes it on to the consumers.
This is the standard directory layout for a Maven project-
We need to start by creating a Maven pom.xml (Project Object Model) file. The pom.xml file contains the project configuration details.

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.codeusingjava</groupId>
        <artifactId>RabbitMqProducer</artifactId>
        <version>0.0.1</version>
        <packaging>jar</packaging>

        <name>RabbitMqProducer</name>
        <description>RabbitMqProducer</description>

        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.7.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>
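All the configuration related to RabbitMQ is done in the application.properties file. The exchange, queue and routingkey entries are custom keys read by the application; the exchange name matches the jsa.exchange.records exchange created in the RabbitMQ configuration section.

    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.exchange=jsa.exchange.records
    spring.rabbitmq.queue=jsa.queue
    spring.rabbitmq.routingkey=jsa.routekey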
We create a model class named Records which contains only two fields, i.e. the content and the routing key.

    package com.codeusingjava.model;

    public class Records {

        private String content;
        private String routingKey;

        // No-argument constructor, needed for JSON deserialization
        public Records() {
        }

        public Records(String content, String routingKey) {
            this.content = content;
            this.routingKey = routingKey;
        }

        public String getContent() {
            return this.content;
        }

        public void setContent(String content) {
            this.content = content;
        }

        public String getRoutingKey() {
            return this.routingKey;
        }

        public void setRoutingKey(String routingKey) {
            this.routingKey = routingKey;
        }

        @Override
        public String toString() {
            return String.format("{content = %s, routingKey = %s}", content, routingKey);
        }
    }
The Producer class for the application is as follows-
The AmqpTemplate converts the Java object into an AMQP message and sends it to the given exchange on the RabbitMQ server with the given routing key. The exchange name is read from the spring.rabbitmq.exchange property.

    package com.codeusingjava.producer;

    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;

    import com.codeusingjava.model.Records;

    @Component
    public class Producers {

        @Autowired
        private AmqpTemplate amqpTemplate;

        // Exchange name taken from application.properties
        @Value("${spring.rabbitmq.exchange}")
        private String exchange;

        public void produce(Records records) {
            String routingKey = records.getRoutingKey();
            // Convert the object to a message and publish it to the exchange
            amqpTemplate.convertAndSend(exchange, routingKey, records);
            System.out.println("Sent msg = " + records);
        }
    }
The configuration class can be defined as follows-
The Jackson2JsonMessageConverter converts the Java object to JSON when sending and converts JSON back to a Java object when receiving. The rabbitTemplate method is annotated with @Bean so that the template using this converter is registered in the application context.

    package com.codeusingjava.config;

    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RabbitMqConfiguration {

        @Bean
        public MessageConverter jsonMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }

        // Template that serializes outgoing objects as JSON
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMessageConverter(jsonMessageConverter());
            return rabbitTemplate;
        }
    }
Here we call the produce method three times, each time passing a Records object that carries the content and the routing key to be sent to RabbitMQ.

    package com.codeusingjava;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    import com.codeusingjava.model.Records;
    import com.codeusingjava.producer.Producers;

    @SpringBootApplication
    public class RabbitMqProducerApplication implements CommandLineRunner {

        public static void main(String[] args) {
            SpringApplication.run(RabbitMqProducerApplication.class, args);
        }

        @Autowired
        Producers producer;

        @Override
        public void run(String... args) throws Exception {
            // using 1st routing key
            String content = "Tomcat Started Successfully....";
            String routingKey = "sys.dev.info";
            // send to RabbitMQ
            producer.produce(new Records(content, routingKey));

            // using 2nd routing key
            content = "Exception is noticed..";
            routingKey = "sys.test.error";
            // send to RabbitMQ
            producer.produce(new Records(content, routingKey));

            // using 3rd routing key
            content = "Here is your Message..";
            routingKey = "app.prod.error";
            // send to RabbitMQ
            producer.produce(new Records(content, routingKey));
        }
    }
Consumer Application
The message sent by the Producer class will be received by multiple consumers.
The Project Structure for the Consumer Application is as follows-
The pom.xml for the Consumer Application is the same as that of the Producer Application, apart from the artifact name and description.

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.codeusingjava</groupId>
        <artifactId>RabbitMqConsumer</artifactId>
        <version>0.0.1</version>
        <packaging>jar</packaging>

        <name>RabbitMqConsumer</name>
        <description>SpringRabbitMqConsumer</description>

        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.7.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>
The application.properties file would be defined as follows-

    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.queue=jsa.records.sys
The consumer uses the same Records model class as the producer, in the same package, so that the JSON message can be converted back into a Records object.

    package com.codeusingjava.model;

    public class Records {

        private String content;
        private String routingKey;

        public Records() {
        }

        public Records(String content, String routingKey) {
            this.content = content;
            this.routingKey = routingKey;
        }

        public String getContent() {
            return this.content;
        }

        public void setContent(String content) {
            this.content = content;
        }

        public String getRoutingKey() {
            return this.routingKey;
        }

        public void setRoutingKey(String routingKey) {
            this.routingKey = routingKey;
        }

        @Override
        public String toString() {
            return String.format("{content = %s, routingKey = %s}", content, routingKey);
        }
    }
The Consumers class receives the message from RabbitMQ and prints it to the console.
The @RabbitListener annotation listens on the queue named by the spring.rabbitmq.queue property for any incoming message. It refers to a listener container factory bean named jsaFactory, which has to be defined in the consumer's configuration.

    package com.codeusingjava.consumer;

    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    import com.codeusingjava.model.Records;

    @Component
    public class Consumers {

        // Queue name taken from application.properties
        @RabbitListener(queues = "${spring.rabbitmq.queue}", containerFactory = "jsaFactory")
        public void receivedMessage(Records records) {
            System.out.println("Received Message: " + records);
        }
    }
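The listener refers to a container factory named jsaFactory, but no such bean appears in this article. A minimal sketch of how it could be defined is given below, assuming the consumer should use the same JSON converter as the producer. The class name RabbitMqConsumerConfiguration is an assumption made for illustration; only the bean name jsaFactory comes from the listener code.

```java
package com.codeusingjava.config;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; the name is not taken from the article.
@Configuration
public class RabbitMqConsumerConfiguration {

    // Converts the incoming JSON payload back into a Records object.
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // The bean name "jsaFactory" must match the containerFactory attribute
    // used on the @RabbitListener method.
    @Bean
    public SimpleRabbitListenerContainerFactory jsaFactory(
            ConnectionFactory connectionFactory,
            SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        // Apply Spring Boot's default listener settings first.
        configurer.configure(factory, connectionFactory);
        // Then plug in the JSON converter.
        factory.setMessageConverter(jsonMessageConverter());
        return factory;
    }
}
```

The converter relies on Jackson being available on the classpath, so the sketch also assumes that dependency is present in the consumer project.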
The main class for the RabbitMqConsumerApplication can be defined as follows-

    package com.codeusingjava;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class RabbitMqConsumerApplication {

        public static void main(String[] args) {
            SpringApplication.run(RabbitMqConsumerApplication.class, args);
        }
    }
Rabbitmq Configurations:
Before running the application, open a Command Prompt, go to the location where RabbitMQ is installed, and start the server with the following command:-
rabbitmq-server start
Enable the management plugin with the following command:-
rabbitmq-plugins.bat enable rabbitmq_management
Now open localhost:15672 and log in with the following username and password-
username- guest , password- guest
Now add the following exchanges-
jsa.exchange.records
jsa.exchange.records.error
Now go to the queues section and add the following queues-
jsa.records.sys
jsa.records.prod.error
Finally, we bind the RabbitMQ queues to the exchanges as follows-
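The binding keys used for these bindings are not listed in the text, so the following is only a sketch of how bindings decide which queues receive a message. It assumes a topic exchange, where * in a binding key stands for exactly one word and # stands for zero or more words. The patterns sys.# and #.error are hypothetical and chosen for illustration; the routing keys are the three sent by the producer.

```java
// Plain-Java illustration of topic-style binding key matching.
// It does not talk to RabbitMQ; it only mimics the matching rules.
class TopicMatchSketch {

    // Returns true if the routing key satisfies the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: the key must be exhausted as well.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words of the key.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left.
            return false;
        }
        // '*' matches exactly one word; otherwise the words must be equal.
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] keys = {"sys.dev.info", "sys.test.error", "app.prod.error"};
        String[] patterns = {"sys.#", "#.error"}; // hypothetical binding keys
        for (String key : keys) {
            for (String pattern : patterns) {
                System.out.println(key + " ~ " + pattern + " : " + matches(pattern, key));
            }
        }
    }
}
```

Under these assumed patterns, the key sys.test.error matches both bindings, so that message would be copied into both queues and reach a consumer on each of them.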
Run the application
First we run the RabbitMQ Producer application and then the RabbitMQ Consumer application as Spring Boot applications, and the output is as follows-
The graph shows that the message is received by RabbitMQ.