Spring Boot + RabbitMQ (2023) - Implement Exchange Types
In this tutorial we will be implementing a RabbitMq Exchange types with Spring boot with the help of a simple example.RabbitMq is a AMQP broker that is used widely. AMQP stands for Advanced Message Queuing Protocol. RabbitMq is use for sending as well as receiving messages.
Different programming languages as well operating systems support RabbitMq.
We have already discussed about the basic flow of RabbitMQ in our previous article. Now we would be discussing about implementation of various exchange types in detail.
Exchanges are the key component for the transmission of message from the sender to the receiver.
It helps in the routing of messages to the specific queue with the help of routingKey, header attributes and binding value.
In this example, we have queues name primarystudentsQueue, secondarystudentsQueue and collegestudentsQueue. We will bind the message to the queues with the help of various implementation techniques.
Project Structure
This will be the standard directory layout for maven project structure--
We need to start by creating a Maven pom.xml(Project Object Model) file. The pom.xml file contains the project configuration details.
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.5.1</version> <relativePath /> <!-- lookup parent from repository --> </parent> <groupId>com.codeusingjava</groupId> <artifactId>RabbitMqExchangeTypes</artifactId> <version>0.0.1-SNAPSHOT</version> <name>RabbitMqExchangeTypes</name> <description>Demo project for Spring Boot</description> <properties> <java.version>11</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
Create the application.properties class for the configuration. All the configurations related to RabbitMQ are also done in this file.spring.application.name=SpringbootRabbitMQ server.port=8080 spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest rabbitmq.queue=MessageQueue rabbitmq.exchange=exchange rabbitmq.routingkey=routekey
Let us discuss this exchange types in detail:-Direct Exchange
Here, the routing of the message takes place where the binding key is exactly matched with the binding Queue, the message will be sent to the queue which consist of the exact match. It is also known as one-to-one exchange.
Create the Configuration class for Direct Exchange-package com.codeusingjava.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DirectConfiguration { @Bean Queue primarystudentsQueue() { return new Queue("primarystudentsQueue", false); } @Bean Queue secondarystudentsQueue() { return new Queue("secondarystudentsQueue", false); } @Bean Queue collegestudentsQueue() { return new Queue("collegestudentsQueue", false); } @Bean DirectExchange exchange() { return new DirectExchange("direct-exchange"); } @Bean Binding primarystudentsBinding(Queue primarystudentsQueue, DirectExchange exchange) { return BindingBuilder.bind(primarystudentsQueue).to(exchange).with("queue.primarystudents"); } @Bean Binding secondarystudentsBinding(Queue secondarystudentsQueue, DirectExchange exchange) { return BindingBuilder.bind(secondarystudentsQueue).to(exchange).with("queue.secondarystudents"); } @Bean Binding collegestudentsBinding(Queue collegestudentsQueue,DirectExchange exchange) { return BindingBuilder.bind(collegestudentsQueue).to(exchange).with("queue.collegestudents"); } }
Creating the controller class for the direct exchange-package com.codeusingjava.controller; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping(value = "/direct/") public class DirectController { @Autowired private AmqpTemplate amqpTemplate; @GetMapping(value = "/sendmsg") public String producer(@RequestParam("exchange") String exchange, @RequestParam("routingKey") String routingKey, @RequestParam("msg") String msg) { amqpTemplate.convertAndSend(exchange, routingKey, msg); return "Message Successfully sent---- Method used Direct Exchange"; } }
Output
Run the following url-
http://localhost:8080/direct/sendmsg?exchange=direct-exchange&routingKey=queue.collegestudents&msg=hello%20college%20students
If we run our program as Spring boot application, we get the output as follows-
Now run the url http://localhost:15672
We can see message sent only to the exactly matching queue.
Fanout Exchange
The Fanout Exchange is used where the message needs to be passed to all the queues bounded it. It is one-to-many type of Exchange. In case of fanout-exchange, routing key is not needed.
Create the configuration class for Fanout Exchangepackage com.codeusingjava.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutConfiguration { @Bean Queue primarystudentsQueue() { return new Queue("primarystudentsQueue", false); } @Bean Queue secondarystudentsQueue() { return new Queue("secondarystudentsQueue", false); } @Bean Queue collegestudentsQueue() { return new Queue("collegestudentsQueue", false); } @Bean FanoutExchange exchange() { return new FanoutExchange("fanout-exchange"); } @Bean Binding primarystudentsBinding(Queue primarystudentsQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(primarystudentsQueue).to(fanoutExchange); } @Bean Binding secondarystudentsBinding(Queue secondarystudentsQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(secondarystudentsQueue).to(fanoutExchange); } @Bean Binding collegestudentsBinding(Queue collegestudentsQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(collegestudentsQueue).to(fanoutExchange); } }
Creating the controller class for the FanoutController-package com.codeusingjava.controller; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping(value = "/fanout/") public class FanoutController { @Autowired private AmqpTemplate amqpTemplate; @GetMapping(value = "/sendmsg") public String producer(@RequestParam("exchange") String exchange, @RequestParam("msg") String msg) { amqpTemplate.convertAndSend(exchange, "", msg); return "Message sent Exchange method used-- Fanout Exchange "; } }
Output
Run the following url-
http://localhost:8080/fanout/sendmsg?exchange=fanout-exchange&msg=hello%20everyone
If we run our program as Spring boot application, we get the output as follows-
Now run the url http://localhost:15672
We can see message sent to all the binding queue.
Topic Exchange
The logic behind the topic-exchange is similar to that of direct exchange. The only difference is here, there is no need to have an exact matching. The pattern used for routing can be in the form of Regular expression i.e. we can use symbols like dot(.), asterisk(*) or Hash(#).
Creating the configuration class for the topic Exchange-package com.codeusingjava.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class TopicConfiguration { @Bean Queue primarystudentsQueue() { return new Queue("primarystudentsQueue", false); } @Bean Queue secondarystudentsQueue() { return new Queue("secondarystudentsQueue", false); } @Bean Queue collegestudentsQueue() { return new Queue("collegestudentsQueue", false); } @Bean Queue totalQueue() { return new Queue("totalQueue", false); } @Bean TopicExchange topicExchange() { return new TopicExchange("topic-exchange"); } @Bean Binding primarystudentsBinding(Queue primarystudentsQueue, TopicExchange topicExchange) { return BindingBuilder.bind(primarystudentsQueue).to(topicExchange).with("queue.primarystudents"); } @Bean Binding secondarystudentsBinding(Queue secondarystudentsQueue, TopicExchange topicExchange) { return BindingBuilder.bind(secondarystudentsQueue).to(topicExchange).with("queue.secondarystudents"); } @Bean Binding collegestudentsBinding(Queue collegestudentsQueue, TopicExchange topicExchange) { return BindingBuilder.bind(collegestudentsQueue).to(topicExchange).with("queue.collegestudents"); } @Bean Binding allBinding(Queue totalQueue, TopicExchange topicExchange) { return BindingBuilder.bind(totalQueue).to(topicExchange).with("queue.*"); } }
package com.codeusingjava.controller; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping(value = "/topic/") public class TopicController { @Autowired private AmqpTemplate amqpTemplate; @GetMapping(value = "/sendmsg") public String producer(@RequestParam("exchange") String exchange, @RequestParam("routingKey") String routingKey, @RequestParam("msg") String msg) { amqpTemplate.convertAndSend(exchange, routingKey, msg); return "Message Successfully sent---- Method used Topic Exchange"; } }
Output
Run the following url-
http://localhost:8080/topic/sendmsg?exchange=topic-exchange&routingKey=queue.collegestudents&msg=hello%20college%20students
If we run our program as Spring boot application, we get the output as follows-
Now run the url http://localhost:15672
We can see message sent to the queues which is in the form of regular expression as well.
Header Exchange
As the name suggests, in case of Header Exchange, Instead of routing key, header attributes are used for binding the message to a particular queue.
The routing key can be either string, integer or Hash functions as well.
package com.codeusingjava.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.HeadersExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class HeaderConfiguration { @Bean Queue primarystudentsQueue() { return new Queue("primarystudentsQueue", false); } @Bean Queue secondarystudentsQueue() { return new Queue("secondarystudentsQueue", false); } @Bean Queue collegestudentsQueue() { return new Queue("collegestudentsQueue", false); } @Bean HeadersExchange headerExchange() { return new HeadersExchange("header-exchange"); } @Bean Binding primarystudentsBinding(Queue primarystudentsQueue, HeadersExchange exchange) { return BindingBuilder.bind(primarystudentsQueue).to(exchange).where("students").matches("primarystudents"); } @Bean Binding secondarystudentsBinding(Queue secondarystudentsQueue,HeadersExchange exchange) { return BindingBuilder.bind(secondarystudentsQueue).to(exchange).where("students").matches("secondarystudents"); } @Bean Binding collegestudentsBinding(Queue collegestudentsQueue, HeadersExchange exchange) { return BindingBuilder.bind(collegestudentsQueue).to(exchange).where("students").matches("collegestudents"); } }
Creating the configuration class for Header controller-package com.codeusingjava.controller; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping(value = "/header/") public class HeaderController { @Autowired private AmqpTemplate amqpTemplate; @GetMapping(value = "/sendmsg") public String producer(@RequestParam("exchange") String exchange, @RequestParam("students") String students, @RequestParam("msg") String msg) { MessageProperties msgProperties = new MessageProperties(); msgProperties.setHeader("students", students); MessageConverter messageConverter = new SimpleMessageConverter(); Message message = messageConverter.toMessage(msg, msgProperties); amqpTemplate.send(exchange, "", message); return "Message Successfully sent---- Method used Header Exchange"; } }
Output
Run the following url-http://localhost:8080/topic/sendmsg?exchange=topic-exchange&routingKey=queue.collegestudents&msg=hello%20college%20students
If we run our program as Spring boot application, we get the output as follows-
Now run the url http://localhost:15672
We can see message sent to the queues satisfying the header attributes.