Spring Boot + Batch Partitioning (2021) Hello World Example | CodeUsingJava

In this tutorial we will be implementing Spring Batch partitioning with the help of an example.
Spring Batch is an open-source framework for batch processing. A Spring Batch step normally runs on a single thread, so one long-running step can dominate the total execution time. Partitioning offers a better approach in such cases: the step execution is partitioned, a small segment of the work is handled by each partition, and all partitions can run at the same time.
In this example, with the help of Spring Batch partitioning, we will be sending data from multiple csv files to the database at the same time. This reduces the elapsed time and improves efficiency.
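Before wiring this up in Spring Batch, the underlying idea can be illustrated in plain Java (this sketch is not part of the tutorial project; class and method names are made up): split the list of files into slices and let a thread pool work on the slices concurrently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PartitionSketch {

    // divide the work (file names) into gridSize slices, round-robin
    static List<List<String>> partition(List<String> files, int gridSize) {
        List<List<String>> slices = new ArrayList<>();
        for (int i = 0; i < gridSize; i++) {
            slices.add(new ArrayList<>());
        }
        for (int i = 0; i < files.size(); i++) {
            slices.get(i % gridSize).add(files.get(i));
        }
        return slices;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> files = Arrays.asList("demofile1.csv", "demofile2.csv",
                "demofile3.csv", "demofile4.csv", "demofile5.csv");
        ExecutorService pool = Executors.newFixedThreadPool(5);
        // each slice plays the role of one partitioned step execution
        for (List<String> slice : partition(files, 5)) {
            pool.submit(() -> slice.forEach(file -> System.out.println(
                    Thread.currentThread().getName() + " processing " + file)));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

Spring Batch does the same job at a higher level: the partitioner produces the slices and the task executor supplies the threads.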
    This will be the standard directory layout for the Maven project structure (screenshot of the project layout omitted).
    We need to start by creating a Maven pom.xml (Project Object Model) file. The pom.xml file contains the project configuration details.
    	<?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    
    	<groupId>com.codeusingjava.partitioning</groupId>
    	<artifactId>SpringBatchPartitioning</artifactId>
    	<version>0.0.1</version>
    	<packaging>jar</packaging>
    
    	<name>SpringBatchPartitioning</name>
    	<description>SpringBatchPartitioning</description>
    
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.1.RELEASE</version>
    		<relativePath/> <!-- lookup parent from repository -->
    	</parent>
    
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
    	</properties>
    
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-batch</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-jdbc</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
    
    		<dependency>
    			<groupId>mysql</groupId>
    			<artifactId>mysql-connector-java</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    	</dependencies>
    
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
    
    
    </project>
    
    
    Creating the database configuration in application.properties as follows. The property spring.batch.job.enabled=false prevents Spring Boot from launching the job automatically at startup, so that it can be triggered from the controller instead-
    spring.datasource.url=jdbc:mysql://localhost:3306/testdb3?useSSL=false
    spring.datasource.password=****
    spring.datasource.username=root
    spring.datasource.driver-class-name=com.mysql.jdbc.Driver
    spring.batch.job.enabled=false
    
    Inside schema.sql, the query to create a table in the database is present. Paste this content into the MySQL Workbench.
    	DROP TABLE IF EXISTS testdb3.record;
    	CREATE TABLE testdb3.record(
    	   id INT NOT NULL AUTO_INCREMENT,
    	   name VARCHAR(20) NOT NULL,
    	   description VARCHAR(20) NOT NULL,
    	   PRIMARY KEY (id)
    	);
    
    Create a csv file named demofile1.csv and write some dummy data in it (screenshot omitted).
    Similarly make 4 other files: demofile2.csv, demofile3.csv, demofile4.csv, demofile5.csv.
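The file contents are shown only as a screenshot in the original; as an assumption, each file holds comma-separated rows in the (id, name, description) order that the reader is configured with, for example:

```
1,john,developer
2,mary,manager
3,ravi,tester
```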
    Creating the XML configuration file batchjob.xml, where the batch beans are configured.
    The execution of the master step is divided among the slave steps.
    A slave is a step that executes locally on its own thread.
    The partitioner has the task of dividing the step execution into partitions.
    	<beans:beans xmlns="http://www.springframework.org/schema/batch"
    	xmlns:beans="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="
               http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans.xsd
               http://www.springframework.org/schema/batch
               http://www.springframework.org/schema/batch/spring-batch-3.0.xsd">
    
    	<!-- partitioner job -->
    	<job id="partitionJob" xmlns="http://www.springframework.org/schema/batch">
    
    	
    		<step id="masterStep">
    			<partition step="slave" partitioner="partitioner">
    				<handler grid-size="5" task-executor="taskExecutor" />
    			</partition>
    		</step>
    
    	</job>
    
    	<!-- each thread will run this job, with different stepExecutionContext 
    		values. -->
    	<step id="slave" xmlns="http://www.springframework.org/schema/batch">
    		<tasklet>
    			<chunk reader="itemReader" processor="itemProcessor" writer="itemWriter"
    				commit-interval="1" />
    		</tasklet>
    	</step>
    
    	<beans:bean id="partitioner"
    		class="com.codeusingjava.partitioning.partitioner.SamplePartitioner" />
    
    	<beans:bean id="taskExecutor"
    		class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
    
    	<beans:bean id="itemReader"
    		class="com.codeusingjava.partitioning.step.Reader"
    		factory-method="reader" scope="step">
    		<beans:constructor-arg value="#{stepExecutionContext[filename]}" />
    	</beans:bean>
    
    	<beans:bean id="itemProcessor"
    		class="com.codeusingjava.partitioning.step.Processor" scope="step">
    		<beans:property name="threadName" value="#{stepExecutionContext[name]}" />
    	</beans:bean>
    
    	<beans:bean id="itemWriter"
    		class="com.codeusingjava.partitioning.step.Writer" scope="step">
    		<beans:constructor-arg ref="recordDao" />
    	</beans:bean>
    
    	<beans:bean class="org.springframework.batch.core.scope.StepScope" />
    </beans:beans>
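The same master/slave wiring could also be expressed with the Spring Batch Java-config DSL instead of XML. This is an untested sketch (assuming spring-batch-core's StepBuilderFactory API); the bean names mirror the XML above:

```java
// hypothetical Java-config counterpart of the <partition> element above
@Bean
public Step masterStep(StepBuilderFactory steps, Partitioner partitioner, Step slave) {
    return steps.get("masterStep")
            .partitioner("slave", partitioner)           // step name + partitioner bean
            .step(slave)                                 // the slave step defined elsewhere
            .gridSize(5)                                 // matches grid-size="5"
            .taskExecutor(new SimpleAsyncTaskExecutor()) // matches task-executor="taskExecutor"
            .build();
}
```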
    
    Creating the Record model class as follows-
    	package com.codeusingjava.partitioning.model;
    
    	public class Record {
    		private int id;
    		private String name;
    		private String description;
    		
    		
    		public Record() {
    		}
    		
    		
    		public Record(int id, String name, String description) {
    			super();
    			this.id = id;
    			this.name = name;
    			this.description = description;
    		}
    		public int getId() {
    			return id;
    		}
    		public void setId(int id) {
    			this.id = id;
    		}
    		public String getName() {
    			return name;
    		}
    		public void setName(String name) {
    			this.name = name;
    		}
    		public String getDescription() {
    			return description;
    		}
    		public void setDescription(String description) {
    			this.description = description;
    		}
    	
    	
    		@Override
    		public String toString() {
    			return "Record [id=" + id + ", name=" + name + ", description=" + description + "]";
    		}
    	
    		
    	}
    	
    	
    
    Creating the DAO class named RecordDao.java-
    	package com.codeusingjava.partitioning.dao;
    
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.List;
    
    import javax.annotation.PostConstruct;
    import javax.sql.DataSource;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jdbc.core.BatchPreparedStatementSetter;
    import org.springframework.jdbc.core.support.JdbcDaoSupport;
    import org.springframework.stereotype.Repository;
    
    import com.codeusingjava.partitioning.model.Record;
    
    
    @Repository
    public class RecordDao extends JdbcDaoSupport{
    
    	@Autowired
    	DataSource dataSource;
    
    	@PostConstruct
    	private void initialize() {
    		setDataSource(dataSource);
    	}
    
    	public void insert(List<? extends Record> records) {
    		String sql = "INSERT INTO record " + "(id, name, description) VALUES (?, ?, ?)";
    		getJdbcTemplate().batchUpdate(sql, new BatchPreparedStatementSetter() {
    			public void setValues(PreparedStatement ps, int i) throws SQLException {
    				Record record = records.get(i);
    				ps.setLong(1, record.getId());
    				ps.setString(2, record.getName());
    				ps.setString(3, record.getDescription());
    			}

    			public int getBatchSize() {
    				return records.size();
    			}
    		});
    	}
    }
    
    
    Creating the Partitioner class. Here, gridSize indicates the number of partitions, and hence the number of threads to be executed.
    	package com.codeusingjava.partitioning.partitioner;
    
    	import java.util.Arrays;
    	import java.util.HashMap;
    	import java.util.List;
    	import java.util.Map;
    	
    	import org.slf4j.Logger;
    	import org.slf4j.LoggerFactory;
    	import org.springframework.batch.core.partition.support.Partitioner;
    	import org.springframework.batch.item.ExecutionContext;
    	
    	public class SamplePartitioner implements Partitioner{
    	
    		Logger logger = LoggerFactory.getLogger(this.getClass());
    		
    		List<String> fileNameList = Arrays.asList("demofile1.csv", "demofile2.csv", 
    					  "demofile3.csv", "demofile4.csv", "demofile5.csv");
    		
    		
    		
    		@Override
    		public Map<String, ExecutionContext> partition(int gridSize) {
    			
    			Map<String, ExecutionContext> partitionData = new HashMap<String, ExecutionContext>();
    			
    			//
    			// In the tutorial: gridSize = 5 for 5 Threads to read 5 files in fileNameList:
    			
    			for (int i = 0; i < gridSize; i++) {
    				ExecutionContext executionContext = new ExecutionContext();
    				logger.info("Starting : Thread" + i);
    				logger.info("File Name: " + fileNameList.get(i));
    				
    				// give fileName for ExecutionContext
    				executionContext.putString("filename", fileNameList.get(i));
    				// give a thread name for ExecutionContext
    				executionContext.putString("name", "Thread" + i);
    				
    				partitionData.put("partition: " + i, executionContext);
    			}
    			
    			return partitionData;
    		}
    	
    	}
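Stripped of the Spring-specific ExecutionContext type, partition() just builds a map from partition names to key/value pairs. A plain-Java equivalent of the loop above (illustration only; the class name is made up):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionMapDemo {

    // mirrors SamplePartitioner.partition(), with a plain Map standing in
    // for org.springframework.batch.item.ExecutionContext
    static Map<String, Map<String, String>> partition(List<String> fileNames) {
        Map<String, Map<String, String>> partitionData = new HashMap<>();
        for (int i = 0; i < fileNames.size(); i++) {
            Map<String, String> context = new HashMap<>();
            context.put("filename", fileNames.get(i)); // consumed by the reader
            context.put("name", "Thread" + i);         // consumed by the processor
            partitionData.put("partition: " + i, context);
        }
        return partitionData;
    }

    public static void main(String[] args) {
        partition(Arrays.asList("demofile1.csv", "demofile2.csv"))
                .forEach((key, ctx) -> System.out.println(key + " -> " + ctx));
    }
}
```

Each map entry becomes one slave StepExecution, and the values are read back via #{stepExecutionContext[...]} in batchjob.xml.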
    	
    
    The Reader class is configured as follows:- FlatFileItemReader is used for reading the input file assigned to each partition.
    	package com.codeusingjava.partitioning.step;
    
    	import org.springframework.batch.item.file.FlatFileItemReader;
    	import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
    	import org.springframework.batch.item.file.mapping.DefaultLineMapper;
    	import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
    	import org.springframework.core.io.ClassPathResource;
    	
    	import com.codeusingjava.partitioning.model.Record;
    	
    	
    	public class Reader{
    		
    		public static FlatFileItemReader<Record> reader(String path) {
    			
    			FlatFileItemReader<Record> reader = new FlatFileItemReader<Record>();
    	
    			reader.setResource(new ClassPathResource(path));
    			reader.setLineMapper(new DefaultLineMapper<Record>() {
    				{
    					setLineTokenizer(new DelimitedLineTokenizer() {
    						{
    							setNames(new String[] { "id", "name", "description" });
    						}
    					});
    					setFieldSetMapper(new BeanWrapperFieldSetMapper<Record>() {
    						{
    							setTargetType(Record.class);
    						}
    					});
    				}
    			});
    			return reader;
    		}
    	}
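What the configured DelimitedLineTokenizer plus BeanWrapperFieldSetMapper do for each line can be mimicked by hand. This standalone sketch (not the Spring Batch internals; the names here are made up) shows the id, name, description binding:

```java
public class LineMapperDemo {

    // minimal stand-in for the tutorial's Record model
    static class Record {
        int id;
        String name;
        String description;

        @Override
        public String toString() {
            return "Record [id=" + id + ", name=" + name + ", description=" + description + "]";
        }
    }

    // split on commas in the configured field order: id, name, description
    static Record mapLine(String line) {
        String[] tokens = line.split(",");
        Record record = new Record();
        record.id = Integer.parseInt(tokens[0].trim());
        record.name = tokens[1].trim();
        record.description = tokens[2].trim();
        return record;
    }

    public static void main(String[] args) {
        System.out.println(mapLine("1,john,developer"));
        // prints: Record [id=1, name=john, description=developer]
    }
}
```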
    	
    
    Creating the processor class.
    The processor reads each data element from the reader, processes it (here, uppercasing the text fields), and hands the processed record on to the writer.
    	package com.codeusingjava.partitioning.step;
    
    	import java.util.Random;
    	
    	import org.slf4j.Logger;
    	import org.slf4j.LoggerFactory;
    	import org.springframework.batch.item.ItemProcessor;
    	
    	import com.codeusingjava.partitioning.model.Record;
    	
    	
    	public class Processor implements ItemProcessor<Record, Record> {
    	
    		private static final Logger log = LoggerFactory.getLogger(Processor.class);
    	
    		private String threadName;
    		
    		@Override
    		public Record process(Record record) throws Exception {
    			Random r = new Random();
    			
    			final String name = record.getName().toUpperCase();
    			final String description = record.getDescription().toUpperCase();
    			final Record fixedRecord = new Record(r.nextInt(), name, description);
    	
    			log.info(threadName + " processing : "+ "Converting (" + record + ") into (" + fixedRecord + ")");
    			
    			return fixedRecord;
    		}
    	
    		public String getThreadName() {
    			return threadName;
    		}
    	
    		public void setThreadName(String threadName) {
    			this.threadName = threadName;
    		}
    	}
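The transformation applied by process() is easy to check in isolation: the text fields are simply uppercased (the random id only gives the copied record a new key). A tiny standalone check (class name made up; an explicit locale is used here, unlike the tutorial code):

```java
import java.util.Locale;

public class ProcessorDemo {

    // the text transformation Processor.process() applies to name and description
    static String transform(String value) {
        return value.toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(transform("john"));      // JOHN
        System.out.println(transform("developer")); // DEVELOPER
    }
}
```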
    	
    
    The writer class is configured as follows:- It receives each chunk of processed records and inserts them into the database through the RecordDao.
    package com.codeusingjava.partitioning.step;
    
    import java.util.List;
    
    import org.springframework.batch.item.ItemWriter;
    
    import com.codeusingjava.partitioning.dao.RecordDao;
    import com.codeusingjava.partitioning.model.Record;
    
    
    public class Writer implements ItemWriter<Record> {
    
    	private final RecordDao recordDao;
    
    	public Writer(RecordDao recordDao) {
    		this.recordDao = recordDao;
    	}
    
    	@Override
    	public void write(List<? extends Record> customers) throws Exception {
    		recordDao.insert(customers);
    	}
    
    }
    
    The controller class is defined as follows:- A request to the root URL launches the batch job, using the current timestamp as a job parameter so that the job can be run repeatedly.
    	package com.codeusingjava.partitioning.controller;
    
    	import org.slf4j.Logger;
    	import org.slf4j.LoggerFactory;
    	import org.springframework.batch.core.Job;
    	import org.springframework.batch.core.JobParameters;
    	import org.springframework.batch.core.JobParametersBuilder;
    	import org.springframework.batch.core.launch.JobLauncher;
    	import org.springframework.beans.factory.annotation.Autowired;
    	import org.springframework.web.bind.annotation.RequestMapping;
    	import org.springframework.web.bind.annotation.RestController;
    	
    	@RestController
    	public class WebController {
    		@Autowired
    		JobLauncher jobLauncher;
    	
    		@Autowired
    		Job job;
    	
    		@RequestMapping("/")
    		public String handle() throws Exception {
    			Logger logger = LoggerFactory.getLogger(this.getClass());
    			try {
    				JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis())
    						.toJobParameters();
    				jobLauncher.run(job, jobParameters);
    			} catch (Exception e) {
    				logger.info(e.getMessage());
    			}
    			return "Done! Check Console for more details";
    		}
    	}
    
Run the following Spring Boot class to start our application. @ImportResource loads the bean definitions from batchjob.xml.
    	package com.codeusingjava.partitioning;
    
    	import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    	import org.springframework.boot.SpringApplication;
    	import org.springframework.boot.autoconfigure.SpringBootApplication;
    	import org.springframework.context.annotation.ImportResource;
    	
    	@EnableBatchProcessing
    	@ImportResource("classpath:batchjob.xml")
    	@SpringBootApplication
    	public class SpringBatchPartitioningApplication {
    	
    		public static void main(String[] args) {
    			SpringApplication.run(SpringBatchPartitioningApplication.class, args);
    		}
    	}
    
If we now run the application, we get the console output showing the partitioned steps executing on separate threads (screenshots omitted).
The csv records inside the database are as follows (screenshot of the record table omitted).

Downloads-

Spring Boot + Spring Batch Partitioning Example