Spring Boot + Spring Batch Classifier (2021) Hello World Example | CodeUsingJava






















Spring Boot + Spring Batch Classifier (2021) Hello World Example

In this tutorial, we will implement a Spring Boot Batch classifier with the help of an example.
Spring Batch is one of the open-source frameworks available for batch processing.

A Spring Batch classifier is used when data needs to be routed to multiple files or databases based on some condition. The target files can be of different formats.
In this example, a CSV file holds player records, each storing a player's name, game and country.
The players are routed to different output files based on the country they belong to.
The working of the classifier is shown in the following diagram:
Spring Boot Maven Project

Table Of Contents :


  • This is the standard Maven directory layout for the project:
    Spring Boot Maven Project
    We start by creating a Maven pom.xml (Project Object Model) file, which contains the project configuration details.
    	<?xml version="1.0" encoding="UTF-8"?>
    	<project xmlns="http://maven.apache.org/POM/4.0.0"
    	  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	  <modelVersion>4.0.0</modelVersion>
    	
    	  <groupId>com.codeusingjava</groupId>
    	  <artifactId>spring-batch-hello-world</artifactId>
    	  <version>0.0.1-SNAPSHOT</version>
    	  <packaging>jar</packaging>
    	
    	  <name>spring-batch-hello-world</name>
    	  <description>Spring Batch Hello World Example</description>
    	  
    	
    	  <parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>2.2.2.RELEASE</version>
    		<relativePath /> <!-- lookup parent from repository -->
    	  </parent>
    	
    	  <properties>
    		<java.version>8</java.version>
    	  </properties>
    	
    	  <dependencies>
    		<!-- Spring Boot starter for Spring Batch.
    		     NOTE(review): CustomLineAggregator uses Jackson's ObjectMapper, which is not declared in
    		     this POM; presumably it arrives transitively through this starter - confirm, or declare
    		     jackson-databind explicitly. -->
    		<dependency>
    		  <groupId>org.springframework.boot</groupId>
    		  <artifactId>spring-boot-starter-batch</artifactId>
    		</dependency>
    		<!-- spring-oxm: provides XStreamMarshaller, which the XML item writer uses -->
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-oxm</artifactId>
    		</dependency>
    		<!-- XStream: the serialization library behind XStreamMarshaller; it turns Player objects into XML.
    		     NOTE(review): 1.4.8 is an old release with published security advisories; consider a current
    		     1.4.x version (confirm compatibility with spring-oxm before upgrading). -->    
    		<dependency>
    			<groupId>com.thoughtworks.xstream</groupId>
    			<artifactId>xstream</artifactId>
    			<version>1.4.8</version>
    		</dependency>    
    		
    		<!-- stax-utils: provides IndentingXMLEventWriter, used to pretty-print the XML output -->
    		<dependency>
    			<groupId>net.java.dev.stax-utils</groupId>
    			<artifactId>stax-utils</artifactId>
    			<version>20040917</version>
    		</dependency>    
    		<!-- Test-only dependencies -->
    		<dependency>
    		  <groupId>org.springframework.boot</groupId>
    		  <artifactId>spring-boot-starter-test</artifactId>
    		  <scope>test</scope>
    		</dependency>
    		<dependency>
    		  <groupId>org.springframework.batch</groupId>
    		  <artifactId>spring-batch-test</artifactId>
    		  <scope>test</scope>
    		</dependency>    
    	  </dependencies>
    	
    	  <build>
    		<plugins>
    		  <plugin>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-maven-plugin</artifactId>
    		  </plugin>
    		</plugins>
    	  </build>
    	
    	</project>
    	
    
    Under src/main/resources, create a new folder named csv, and inside it create a CSV file named players.csv.
    Spring Boot Maven Project
    Create the Player model class; each row of the CSV file is mapped to a Player object.
    package com.codeusingjava.model;
    
    /**
     * Mutable JavaBean describing one player: a name, the country the player belongs to and the
     * game(s) the player plays.
     *
     * <p>Offers a no-arg constructor plus setters so a framework can populate it property by
     * property, and an all-args constructor for direct use.
     */
    public class Player {

        private String name;
        private String country;
        private String games;

        /** Creates a player with every property unset ({@code null}). */
        public Player() {
        }

        /**
         * Creates a player with every property set.
         *
         * @param name    the player's name
         * @param country the country the player belongs to
         * @param games   the game(s) the player plays
         */
        public Player(String name, String country, String games) {
            this.name = name;
            this.country = country;
            this.games = games;
        }

        /** @return the player's name */
        public String getName() {
            return name;
        }

        /** @param name the player's name */
        public void setName(String name) {
            this.name = name;
        }

        /** @return the country the player belongs to */
        public String getCountry() {
            return country;
        }

        /** @param country the country the player belongs to */
        public void setCountry(String country) {
            this.country = country;
        }

        /** @return the game(s) the player plays */
        public String getGames() {
            return games;
        }

        /** @param games the game(s) the player plays */
        public void setGames(String games) {
            this.games = games;
        }

        /** Renders the player as {@code Player [name=..., country=..., games=...]}. */
        @Override
        public String toString() {
            return String.format("Player [name=%s, country=%s, games=%s]", name, country, games);
        }
    }
    
    	
    
    Records are routed to multiple files based on a condition with the help of a Classifier.
    The classify method defines the condition that decides which writer each record is sent to.
    	package com.codeusingjava.batch;
    
    	import org.springframework.batch.item.ItemWriter;
    	import org.springframework.classify.Classifier;
    	
    	import com.codeusingjava.model.Player;
    	
    	/**
    	 * Routes each {@link Player} to one of two {@link ItemWriter}s based on the player's country.
    	 *
    	 * <p>A player whose country contains the selected country name (case-sensitive substring match via
    	 * {@link String#contains}) goes to the "selected" writer. Every other player, including one whose
    	 * country is {@code null}, goes to the "other" writer. The two-argument constructor selects
    	 * {@code "India"}.
    	 */
    	public class MyClassifier implements Classifier<Player, ItemWriter<? super Player>> {
    	
    		private static final long serialVersionUID = 1L;
    	
    		/** Country selected by the two-argument constructor. */
    		private static final String DEFAULT_SELECTED_COUNTRY = "India";
    	
    		private final ItemWriter<Player> selectedPlayer;
    		private final ItemWriter<Player> otherPlayer;
    		private final String selectedCountry;
    	
    		/**
    		 * Creates a classifier that sends players from India to {@code selectedPlayer}.
    		 *
    		 * @param selectedPlayer writer for players whose country contains "India"
    		 * @param otherPlayer    writer for all remaining players
    		 * @throws IllegalArgumentException if either writer is {@code null}
    		 */
    		public MyClassifier(ItemWriter<Player> selectedPlayer, ItemWriter<Player> otherPlayer) {
    			this(selectedPlayer, otherPlayer, DEFAULT_SELECTED_COUNTRY);
    		}
    	
    		/**
    		 * Creates a classifier that sends players from {@code selectedCountry} to {@code selectedPlayer}.
    		 *
    		 * @param selectedPlayer  writer for players whose country contains {@code selectedCountry}
    		 * @param otherPlayer     writer for all remaining players
    		 * @param selectedCountry country name to match (case-sensitive substring match)
    		 * @throws IllegalArgumentException if any argument is {@code null}
    		 */
    		public MyClassifier(ItemWriter<Player> selectedPlayer, ItemWriter<Player> otherPlayer,
    				String selectedCountry) {
    			// Fail fast: a null writer would otherwise only surface later as an NPE inside the composite writer.
    			if (selectedPlayer == null || otherPlayer == null) {
    				throw new IllegalArgumentException("selectedPlayer and otherPlayer writers must not be null");
    			}
    			if (selectedCountry == null) {
    				throw new IllegalArgumentException("selectedCountry must not be null");
    			}
    			this.selectedPlayer = selectedPlayer;
    			this.otherPlayer = otherPlayer;
    			this.selectedCountry = selectedCountry;
    		}
    	
    		/**
    		 * Picks the writer for {@code player}.
    		 *
    		 * @param player the item being written
    		 * @return the "selected" writer if the player's country contains the selected country,
    		 *         otherwise the "other" writer
    		 */
    		@Override
    		public ItemWriter<? super Player> classify(Player player) {
    			String country = player.getCountry();
    			// A missing country can never match, so such players fall through to the "other" writer
    			// instead of throwing a NullPointerException.
    			boolean selected = country != null && country.contains(selectedCountry);
    			return selected ? selectedPlayer : otherPlayer;
    		}
    	
    	}
    	
    
    
    Next, create the configuration class, which defines several beans.
    FlatFileItemReader is a reader class that reads the input file.
    StaxEventItemWriter is an ItemWriter implementation that uses StAX and a Marshaller to serialize objects to XML.
    ClassifierCompositeItemWriter delegates each item to the ItemWriter chosen by the classifier.
    Because the output is written to XML and JSON files, beans for both writers are defined in the class below.
    	package com.codeusingjava.batch;
    
    	import java.io.File;
    	import java.util.HashMap;
    	import java.util.Map;
    	
    	import org.springframework.batch.core.Job;
    	import org.springframework.batch.core.Step;
    	import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    	import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    	import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    	import org.springframework.batch.item.file.FlatFileItemReader;
    	import org.springframework.batch.item.file.FlatFileItemWriter;
    	import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
    	import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
    	import org.springframework.batch.item.file.transform.PassThroughLineAggregator;
    	import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
    	import org.springframework.batch.item.xml.StaxEventItemWriter;
    	import org.springframework.context.annotation.Bean;
    	import org.springframework.context.annotation.Configuration;
    	import org.springframework.core.io.ClassPathResource;
    	import org.springframework.core.io.FileSystemResource;
    	import org.springframework.oxm.xstream.XStreamMarshaller;
    	
    	import com.codeusingjava.model.Player;
    	import com.codeusingjava.processor.PlayerItemProcessor;
    	
    	
    	/**
    	 * Batch configuration for the classifier example.
    	 *
    	 * <p>One step reads players from {@code csv/players.csv} on the classpath and passes each chunk to a
    	 * {@link ClassifierCompositeItemWriter}. That writer uses {@link MyClassifier} to send every player
    	 * either to an XML file or to a JSON file; both files are created as temp files.
    	 */
    	@Configuration
    	@EnableBatchProcessing
    	public class HelloWorldJobConfig {
    	
    		/**
    		 * Defines the job {@code "myJob"}, which runs {@link #helloWorldStep}.
    		 * The bean name comes from this method's name, so the spelling is left unchanged.
    		 */
    		@Bean
    		public Job helloWorlJob(JobBuilderFactory jobBuilders, StepBuilderFactory stepBuilders) throws Exception {
    			return jobBuilders.get("myJob").start(helloWorldStep(stepBuilders)).build();
    		}
    	
    		/**
    		 * Defines the chunk-oriented step {@code "myStep"}, committing every 10 items.
    		 *
    		 * <p>The classifier writer only delegates {@code write} calls. The two underlying file writers are
    		 * therefore registered with {@code stream(...)} so the step opens, updates and closes them.
    		 */
    		@Bean
    		public Step helloWorldStep(StepBuilderFactory stepBuilders) throws Exception {
    			// No processor is used: PlayerItemProcessor turns a Player into a String, which does not fit
    			// this Player -> Player chunk.
    			return stepBuilders.get("myStep").<Player, Player>chunk(10).reader(reader())
    					.writer(classifierCustomerCompositeItemWriter()).stream(xmlItemWriter()).stream(jsonItemWriter())
    					.build();
    		}
    	
    		/**
    		 * Reads {@code csv/players.csv} from the classpath. Each comma-delimited line is mapped to a
    		 * {@link Player}, with the columns taken in the order name, games, country.
    		 */
    		@Bean
    		public FlatFileItemReader<Player> reader() {
    			return new FlatFileItemReaderBuilder<Player>().name("playerItemReader")
    					.resource(new ClassPathResource("csv/players.csv")).delimited()
    					.names(new String[] { "name", "games", "country" }).targetType(Player.class).build();
    		}
    	
    		/**
    		 * Composite writer that asks {@link MyClassifier} which delegate receives each player.
    		 * Players from India go to the XML writer; all others go to the JSON writer.
    		 */
    		@Bean
    		public ClassifierCompositeItemWriter<Player> classifierCustomerCompositeItemWriter() throws Exception {
    			ClassifierCompositeItemWriter<Player> compositeItemWriter = new ClassifierCompositeItemWriter<>();
    			// Within a @Configuration class these calls return the singleton writer beans, i.e. the same
    			// instances that helloWorldStep registers as streams.
    			compositeItemWriter.setClassifier(new MyClassifier(xmlItemWriter(), jsonItemWriter()));
    			return compositeItemWriter;
    		}
    	
    		/** Greeting processor bean; not wired into {@link #helloWorldStep}. */
    		@Bean
    		public PlayerItemProcessor processor() {
    			return new PlayerItemProcessor();
    		}
    	
    		/** Plain-text writer to {@code target/output.txt}; not wired into {@link #helloWorldStep}. */
    		@Bean
    		public FlatFileItemWriter<String> writer() {
    			return new FlatFileItemWriterBuilder<String>().name("greetingItemWriter")
    					.resource(new FileSystemResource("target/output.txt")).lineAggregator(new PassThroughLineAggregator<>())
    					.build();
    		}
    	
    		/**
    		 * XML writer. It writes players into a new temp file, whose path is printed to stdout, under a
    		 * {@code <players>} root element with indented output.
    		 */
    		@Bean
    		public StaxEventItemWriter<Player> xmlItemWriter() throws Exception {
    	
    			String playerOutputPath = File.createTempFile("playerOutput", ".xml").getAbsolutePath();
    			System.out.println(">> XML Output = " + playerOutputPath);
    			// Alias Player to the "player" element; otherwise XStream names it after the class.
    			Map<String, Class<?>> aliases = new HashMap<>();
    			aliases.put("player", Player.class);
    			XStreamMarshaller marshaller = new XStreamMarshaller();
    			marshaller.setAliases(aliases);
    	
    			IndentingStaxEventItemWriter<Player> writer = new IndentingStaxEventItemWriter<>();
    			writer.setRootTagName("players");
    			writer.setMarshaller(marshaller);
    			writer.setResource(new FileSystemResource(playerOutputPath));
    			// Called explicitly to validate the configuration before the bean is handed out.
    			writer.afterPropertiesSet();
    			return writer;
    		}
    	
    		/**
    		 * JSON writer. It writes one JSON object per player, one per line, into a new temp file whose path
    		 * is printed to stdout. The file is therefore in JSON Lines form, not a single JSON array.
    		 */
    		@Bean
    		public FlatFileItemWriter<Player> jsonItemWriter() throws Exception {
    	
    			String playerOutputPath = File.createTempFile("playerOutput", ".json").getAbsolutePath();
    			System.out.println(">> JSON Output = " + playerOutputPath);
    			FlatFileItemWriter<Player> writer = new FlatFileItemWriter<>();
    			writer.setLineAggregator(new CustomLineAggregator());
    			writer.setResource(new FileSystemResource(playerOutputPath));
    			writer.afterPropertiesSet();
    			return writer;
    		}
    	}
    	
    
    Here is the custom subclass of StaxEventItemWriter, which indents the XML output:
    
    package com.codeusingjava.batch;
    
    import java.io.Writer;
    
    import javax.xml.stream.XMLEventWriter;
    import javax.xml.stream.XMLOutputFactory;
    import javax.xml.stream.XMLStreamException;
    
    import org.springframework.batch.item.xml.StaxEventItemWriter;
    
    import javanet.staxutils.IndentingXMLEventWriter;
    
    /**
     * {@link StaxEventItemWriter} that can pretty-print its XML by wrapping the event writer in a
     * stax-utils {@link IndentingXMLEventWriter}. Indentation is enabled unless switched off with
     * {@link #setIndenting(boolean)}.
     *
     * @param <T> type of item being written
     */
    public class IndentingStaxEventItemWriter<T> extends StaxEventItemWriter<T> {

    	/** When {@code true}, output events pass through an indenting wrapper. */
    	private boolean indenting = true;

    	/**
    	 * Creates the event writer from the superclass and wraps it for indentation when enabled.
    	 */
    	@Override
    	protected XMLEventWriter createXmlEventWriter(XMLOutputFactory outputFactory, Writer writer)
    			throws XMLStreamException {
    		boolean wrap = isIndenting();
    		XMLEventWriter plain = super.createXmlEventWriter(outputFactory, writer);
    		return wrap ? new IndentingXMLEventWriter(plain) : plain;
    	}

    	/** @return whether XML output is indented */
    	public boolean isIndenting() {
    		return indenting;
    	}

    	/** @param indenting {@code true} to indent XML output, {@code false} to write it unformatted */
    	public void setIndenting(boolean indenting) {
    		this.indenting = indenting;
    	}
    }
    
    The CustomLineAggregator class converts each Player object into a JSON string.
    	package com.codeusingjava.batch;
    
    	import org.springframework.batch.item.file.transform.LineAggregator;
    	
    	import com.codeusingjava.model.Player;
    	import com.fasterxml.jackson.databind.ObjectMapper;
    	
    	public class CustomLineAggregator implements LineAggregator<Player> {
    	
    		private ObjectMapper objectMapper = new ObjectMapper();
    	
    		@Override
    		public String aggregate(Player item) {
    			try {
    				return objectMapper.writeValueAsString(item);
    			} catch (Exception e) {
    				throw new RuntimeException("Unable to serialize Player", e);
    			}
    		}
    	}
    	
    
    The processor class is defined as follows (note that it is not wired into the step above, where the .processor(processor()) call is commented out):
    	package com.codeusingjava.processor;
    
    	import org.slf4j.Logger;
    	import org.slf4j.LoggerFactory;
    	import org.springframework.batch.item.ItemProcessor;
    	
    	import com.codeusingjava.model.Player;
    	
    	/**
    	 * {@link ItemProcessor} that turns a {@link Player} into a greeting sentence and logs each conversion.
    	 */
    	public class PlayerItemProcessor implements ItemProcessor<Player, String> {
    	
    		private static final Logger LOGGER = LoggerFactory.getLogger(PlayerItemProcessor.class);
    	
    		/**
    		 * Builds {@code "Hello <name> <games> from <country>!"} for the given player.
    		 *
    		 * @param player the player to greet
    		 * @return the greeting text
    		 */
    		@Override
    		public String process(Player player) throws Exception {
    			String message = String.format("Hello %s %s from %s!",
    					player.getName(), player.getGames(), player.getCountry());
    			LOGGER.info("converting '{}' into '{}'", player, message);
    			return message;
    		}
    	}
    	
    
If we now run the application, we get the following output:
Spring Boot Maven Project
To see the XML and JSON files into which the data has been classified, open the paths printed in the console output.
The XML file contains the players from India.
Spring Boot Maven Project
The JSON file contains the players from all other countries.
Spring Boot Maven Project

Downloads-

Spring Boot + Spring Batch Classifier Example