Spring Boot + RabbitMQ (2021) - Implement Exchange Types | CodeUsingJava






















Spring Boot + RabbitMQ (2021) - Implement Exchange Types

In this tutorial we will be implementing a RabbitMq Exchange types with Spring boot with the help of a simple example.
RabbitMq is a AMQP broker that is used widely. AMQP stands for Advanced Message Queuing Protocol. RabbitMq is use for sending as well as receiving messages.
Different programming languages as well operating systems support RabbitMq.
We have already discussed about the basic flow of RabbitMQ in our previous article. Now we would be discussing about implementation of various exchange types in detail.
Exchanges are the key component for the transmission of message from the sender to the receiver.
It helps in the routing of messages to the specific queue with the help of routingKey, header attributes and binding value.
Spring BootRabbitMQ Exchange types
In this example, we have queues name primarystudentsQueue, secondarystudentsQueue and collegestudentsQueue. We will bind the message to the queues with the help of various implementation techniques.

Project Structure

This will be the standard directory layout for maven project structure-
Spring Boot Maven Project
  • We need to start by creating a Maven pom.xml(Project Object Model) file. The pom.xml file contains the project configuration details.
    		<?xml version="1.0" encoding="UTF-8"?>
    		<project xmlns="http://maven.apache.org/POM/4.0.0"
    			xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    			xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    			<modelVersion>4.0.0</modelVersion>
    			<parent>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-starter-parent</artifactId>
    				<version>2.5.1</version>
    				<relativePath /> <!-- lookup parent from repository -->
    			</parent>
    			<groupId>com.codeusingjava</groupId>
    			<artifactId>RabbitMqExchangeTypes</artifactId>
    			<version>0.0.1-SNAPSHOT</version>
    			<name>RabbitMqExchangeTypes</name>
    			<description>Demo project for Spring Boot</description>
    			<properties>
    				<java.version>11</java.version>
    			</properties>
    			<dependencies>
    				<dependency>
    					<groupId>org.springframework.boot</groupId>
    					<artifactId>spring-boot-starter</artifactId>
    				</dependency>
    		
    				<dependency>
    					<groupId>org.springframework.boot</groupId>
    					<artifactId>spring-boot-starter-test</artifactId>
    					<scope>test</scope>
    				</dependency>
    		
    				<dependency>
    					<groupId>org.springframework.boot</groupId>
    					<artifactId>spring-boot-starter-amqp</artifactId>
    				</dependency> 
    				<dependency>
    					<groupId>org.springframework.boot</groupId>
    					<artifactId>spring-boot-starter-web</artifactId>
    				</dependency>
    			</dependencies>
    		
    			<build>
    				<plugins>
    					<plugin>
    						<groupId>org.springframework.boot</groupId>
    						<artifactId>spring-boot-maven-plugin</artifactId>
    					</plugin>
    				</plugins>
    			</build>
    		
    		</project>
    		
    	

    Create the application.properties class for the configuration. All the configurations related to RabbitMQ are also done in this file.
    spring.application.name=SpringbootRabbitMQ
    server.port=8080
    
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    
    rabbitmq.queue=MessageQueue
    rabbitmq.exchange=exchange
    rabbitmq.routingkey=routekey
    
        
    Let us discuss this exchange types in detail:-

    Direct Exchange

    Here, the routing of the message takes place where the binding key is exactly matched with the binding Queue, the message will be sent to the queue which consist of the exact match. It is also known as one-to-one exchange.
    Spring BootRabbitMQ Direct Exchange types
    Create the Configuration class for Direct Exchange-
    	package com.codeusingjava.config;
    
    	import org.springframework.amqp.core.Binding;
    	import org.springframework.amqp.core.BindingBuilder;
    	import org.springframework.amqp.core.DirectExchange;
    	import org.springframework.amqp.core.Queue;
    	import org.springframework.context.annotation.Bean;
    	import org.springframework.context.annotation.Configuration;
    	
    	@Configuration
    	public class DirectConfiguration {
    	
    		@Bean
    		Queue primarystudentsQueue() {
    			return new Queue("primarystudentsQueue", false);
    		}
    	
    		@Bean
    		Queue secondarystudentsQueue() {
    			return new Queue("secondarystudentsQueue", false);
    		}
    	
    		@Bean
    		Queue collegestudentsQueue() {
    			return new Queue("collegestudentsQueue", false);
    		}
    	
    		
    	
    		@Bean
    		DirectExchange exchange() {
    			return new DirectExchange("direct-exchange");
    		}
    		
    		@Bean
    		Binding primarystudentsBinding(Queue primarystudentsQueue, DirectExchange exchange) {
    			return BindingBuilder.bind(primarystudentsQueue).to(exchange).with("queue.primarystudents");
    		}
    		
    		@Bean
    		Binding secondarystudentsBinding(Queue secondarystudentsQueue, DirectExchange exchange) {
    			return BindingBuilder.bind(secondarystudentsQueue).to(exchange).with("queue.secondarystudents");
    		}
    		
    		@Bean
    		Binding collegestudentsBinding(Queue collegestudentsQueue,DirectExchange exchange) {
    			return BindingBuilder.bind(collegestudentsQueue).to(exchange).with("queue.collegestudents");
    		}	
    	}
    	
    
    Creating the controller class for the direct exchange-
    	package com.codeusingjava.controller;
    
    	import org.springframework.amqp.core.AmqpTemplate;
    	import org.springframework.beans.factory.annotation.Autowired;
    	import org.springframework.web.bind.annotation.GetMapping;
    	import org.springframework.web.bind.annotation.RequestMapping;
    	import org.springframework.web.bind.annotation.RequestParam;
    	import org.springframework.web.bind.annotation.RestController;
    	
    	@RestController
    	@RequestMapping(value = "/direct/")
    	public class DirectController {
    	
    		@Autowired
    		private AmqpTemplate amqpTemplate;
    	
    		@GetMapping(value = "/sendmsg")
    		public String producer(@RequestParam("exchange") String exchange, @RequestParam("routingKey") String routingKey,
    				@RequestParam("msg") String msg) {
    	
    			amqpTemplate.convertAndSend(exchange, routingKey, msg);
    	
    			return "Message Successfully sent---- Method used Direct Exchange";
    		}
    	
    	}
    
    Output
    Run the following url-
    http://localhost:8080/direct/sendmsg?exchange=direct-exchange&routingKey=queue.collegestudents&msg=hello%20college%20students
    If we run our program as Spring boot application, we get the output as follows-
    Spring BootRabbitMQ Direct Exchange output
    Now run the url http://localhost:15672
    We can see message sent only to the exactly matching queue.
    Spring BootRabbitMQ console

    Fanout Exchange

    The Fanout Exchange is used where the message needs to be passed to all the queues bounded it. It is one-to-many type of Exchange. In case of fanout-exchange, routing key is not needed.
    Spring BootRabbitMQ Fanout Exchange types
    Create the configuration class for Fanout Exchange
    	package com.codeusingjava.config;
    
    	import org.springframework.amqp.core.Binding;
    	import org.springframework.amqp.core.BindingBuilder;
    	import org.springframework.amqp.core.FanoutExchange;
    	import org.springframework.amqp.core.Queue;
    	import org.springframework.context.annotation.Bean;
    	import org.springframework.context.annotation.Configuration;
    	
    	@Configuration
    	public class FanoutConfiguration {
    	
    		@Bean
    		Queue primarystudentsQueue() {
    			return new Queue("primarystudentsQueue", false);
    		}
    	
    		@Bean
    		Queue secondarystudentsQueue() {
    			return new Queue("secondarystudentsQueue", false);
    		}
    	
    		@Bean
    		Queue collegestudentsQueue() {
    			return new Queue("collegestudentsQueue", false);
    		}
    	
    		
    	
    		@Bean
    		FanoutExchange exchange() {
    			return new FanoutExchange("fanout-exchange");
    		}
    		
    		@Bean
    		Binding primarystudentsBinding(Queue primarystudentsQueue, FanoutExchange fanoutExchange) {
    			return BindingBuilder.bind(primarystudentsQueue).to(fanoutExchange);
    		}
    		
    		@Bean
    		Binding secondarystudentsBinding(Queue secondarystudentsQueue, FanoutExchange fanoutExchange) {
    			return BindingBuilder.bind(secondarystudentsQueue).to(fanoutExchange);
    		}
    		
    		@Bean
    		Binding collegestudentsBinding(Queue collegestudentsQueue, FanoutExchange fanoutExchange) {
    			return BindingBuilder.bind(collegestudentsQueue).to(fanoutExchange);
    		}
    		
    	
    	}
    	
    
    Creating the controller class for the FanoutController-
    	package com.codeusingjava.controller;
    
    	import org.springframework.amqp.core.AmqpTemplate;
    	import org.springframework.beans.factory.annotation.Autowired;
    	import org.springframework.web.bind.annotation.GetMapping;
    	import org.springframework.web.bind.annotation.RequestMapping;
    	import org.springframework.web.bind.annotation.RequestParam;
    	import org.springframework.web.bind.annotation.RestController;
    	
    	@RestController
    	@RequestMapping(value = "/fanout/")
    	public class FanoutController {
    	
    		@Autowired
    		private AmqpTemplate amqpTemplate;
    	
    		@GetMapping(value = "/sendmsg")
    		public String producer(@RequestParam("exchange") String exchange,
    				@RequestParam("msg") String msg) {
    	
    			amqpTemplate.convertAndSend(exchange, "", msg);
    	
    			return "Message sent Exchange method used-- Fanout Exchange ";
    		}
    	}
    
    Output
    Run the following url-
    http://localhost:8080/fanout/sendmsg?exchange=fanout-exchange&msg=hello%20everyone
    If we run our program as Spring boot application, we get the output as follows-
    Spring BootRabbitMQ Fanout Exchange output
    Now run the url http://localhost:15672
    We can see message sent to all the binding queue.
    Spring BootRabbitMQ console

    Topic Exchange

    The logic behind the topic-exchange is similar to that of direct exchange. The only difference is here, there is no need to have an exact matching. The pattern used for routing can be in the form of Regular expression i.e. we can use symbols like dot(.), asterisk(*) or Hash(#).
    Spring BootRabbitMQ Fanout Exchange types
    Creating the configuration class for the topic Exchange-
    	package com.codeusingjava.config;
    
    	import org.springframework.amqp.core.Binding;
    	import org.springframework.amqp.core.BindingBuilder;
    	import org.springframework.amqp.core.Queue;
    	import org.springframework.amqp.core.TopicExchange;
    	import org.springframework.context.annotation.Bean;
    	import org.springframework.context.annotation.Configuration;
    	
    	@Configuration
    	public class TopicConfiguration {
    	
    		@Bean
    		Queue primarystudentsQueue() {
    			return new Queue("primarystudentsQueue", false);
    		}
    	
    		@Bean
    		Queue secondarystudentsQueue() {
    			return new Queue("secondarystudentsQueue", false);
    		}
    	
    		@Bean
    		Queue collegestudentsQueue() {
    			return new Queue("collegestudentsQueue", false);
    		}
    	
    		@Bean
    		Queue totalQueue() {
    			return new Queue("totalQueue", false);
    		}
    	
    		@Bean
    		TopicExchange topicExchange() {
    			return new TopicExchange("topic-exchange");
    		}
    		
    		@Bean
    		Binding primarystudentsBinding(Queue primarystudentsQueue, TopicExchange topicExchange) {
    			return BindingBuilder.bind(primarystudentsQueue).to(topicExchange).with("queue.primarystudents");
    		}
    		
    		@Bean
    		Binding secondarystudentsBinding(Queue secondarystudentsQueue, TopicExchange topicExchange) {
    			return BindingBuilder.bind(secondarystudentsQueue).to(topicExchange).with("queue.secondarystudents");
    		}
    		
    		@Bean
    		Binding collegestudentsBinding(Queue collegestudentsQueue, TopicExchange topicExchange) {
    			return BindingBuilder.bind(collegestudentsQueue).to(topicExchange).with("queue.collegestudents");
    		}
    		
    		@Bean
    		Binding allBinding(Queue totalQueue, TopicExchange topicExchange) {
    			return BindingBuilder.bind(totalQueue).to(topicExchange).with("queue.*");
    		}
    	
    	}
    	
    
    Creating the controller class for the topic controller-
    	package com.codeusingjava.controller;
    
    	import org.springframework.amqp.core.AmqpTemplate;
    	import org.springframework.beans.factory.annotation.Autowired;
    	import org.springframework.web.bind.annotation.GetMapping;
    	import org.springframework.web.bind.annotation.RequestMapping;
    	import org.springframework.web.bind.annotation.RequestParam;
    	import org.springframework.web.bind.annotation.RestController;
    	
    	@RestController
    	@RequestMapping(value = "/topic/")
    	public class TopicController {
    	
    		@Autowired
    		private AmqpTemplate amqpTemplate;
    	
    		@GetMapping(value = "/sendmsg")
    		public String producer(@RequestParam("exchange") String exchange, @RequestParam("routingKey") String routingKey,
    				@RequestParam("msg") String msg) {
    	
    			amqpTemplate.convertAndSend(exchange, routingKey, msg);
    	
    			return "Message Successfully sent---- Method used Topic Exchange";
    		}
    	
    	}
    
    Output
    Run the following url-
    http://localhost:8080/topic/sendmsg?exchange=topic-exchange&routingKey=queue.collegestudents&msg=hello%20college%20students
    If we run our program as Spring boot application, we get the output as follows-
    Spring BootRabbitMQ topic Exchange output
    Now run the url http://localhost:15672
    We can see message sent to the queues which is in the form of regular expression as well.
    Spring BootRabbitMQ console

    Header Exchange

    As the name suggests, in case of Header Exchange, Instead of routing key, header attributes are used for binding the message to a particular queue.
    The routing key can be either string, integer or Hash functions as well.
    Spring BootRabbitMQ Fanout Exchange types
    Creating the configuration class for the Header Configuration
    	package com.codeusingjava.config;
    
    	import org.springframework.amqp.core.Binding;
    	import org.springframework.amqp.core.BindingBuilder;
    	import org.springframework.amqp.core.HeadersExchange;
    	import org.springframework.amqp.core.Queue;
    	import org.springframework.amqp.core.TopicExchange;
    	import org.springframework.context.annotation.Bean;
    	import org.springframework.context.annotation.Configuration;
    	
    	@Configuration
    	public class HeaderConfiguration {
    	
    		@Bean
    		Queue primarystudentsQueue() {
    			return new Queue("primarystudentsQueue", false);
    		}
    	
    		@Bean
    		Queue secondarystudentsQueue() {
    			return new Queue("secondarystudentsQueue", false);
    		}
    	
    		@Bean
    		Queue collegestudentsQueue() {
    			return new Queue("collegestudentsQueue", false);
    		}
    	
    		
    	
    		@Bean
    		HeadersExchange headerExchange() {
    			return new HeadersExchange("header-exchange");
    		}
    		@Bean
    		Binding primarystudentsBinding(Queue primarystudentsQueue, HeadersExchange exchange) {
    			return BindingBuilder.bind(primarystudentsQueue).to(exchange).where("students").matches("primarystudents");
    		}
    		
    		@Bean
    		Binding secondarystudentsBinding(Queue secondarystudentsQueue,HeadersExchange exchange) {
    			return BindingBuilder.bind(secondarystudentsQueue).to(exchange).where("students").matches("secondarystudents");
    		}
    		
    		@Bean
    		Binding collegestudentsBinding(Queue collegestudentsQueue, HeadersExchange exchange) {
    			return BindingBuilder.bind(collegestudentsQueue).to(exchange).where("students").matches("collegestudents");
    		}	
    	}
    	
    
    Creating the configuration class for Header controller-
    	package com.codeusingjava.controller;
    
    	import org.springframework.amqp.core.AmqpTemplate;
    	import org.springframework.amqp.core.Message;
    	import org.springframework.amqp.core.MessageProperties;
    	import org.springframework.amqp.support.converter.MessageConverter;
    	import org.springframework.amqp.support.converter.SimpleMessageConverter;
    	import org.springframework.beans.factory.annotation.Autowired;
    	import org.springframework.web.bind.annotation.GetMapping;
    	import org.springframework.web.bind.annotation.RequestMapping;
    	import org.springframework.web.bind.annotation.RequestParam;
    	import org.springframework.web.bind.annotation.RestController;
    	
    	@RestController
    	@RequestMapping(value = "/header/")
    	public class HeaderController {
    	
    		@Autowired
    		private AmqpTemplate amqpTemplate;
    	
    		@GetMapping(value = "/sendmsg")
    		public String producer(@RequestParam("exchange") String exchange, @RequestParam("students") String students,
    				@RequestParam("msg") String msg) {
    	
    			MessageProperties msgProperties = new MessageProperties();
    			msgProperties.setHeader("students", students);
    			MessageConverter messageConverter = new SimpleMessageConverter();
    			Message message = messageConverter.toMessage(msg, msgProperties);
    			amqpTemplate.send(exchange, "", message);
    	
    			return "Message Successfully sent---- Method used Header Exchange";
    		}
    	}
    
If we now run the application we get the output as follows
Output
Run the following url-
http://localhost:8080/topic/sendmsg?exchange=topic-exchange&routingKey=queue.collegestudents&msg=hello%20college%20students
If we run our program as Spring boot application, we get the output as follows-
Spring BootRabbitMQ header Exchange output
Now run the url http://localhost:15672
We can see message sent to the queues satisfying the header attributes.
Spring BootRabbitMQ console

Downloads-

Spring Boot + RabbitMQ Exchange Type Example