Batch Processing Large Data Sets With Spring Boot
Batch processing is an efficient mode of processing large volumes of data, where data is collected, processed, and then batch results are produced. Batch processing can be applied in many use cases. One common use case is transforming a large set of flat, CSV, or JSON files into a structured format that is ready for further processing.
In this article, I am going to demonstrate batch processing using one of the projects of Spring, which is Spring Batch. Spring Batch provides functions for processing large volumes of data in batch jobs. This includes logging, transaction management, job restart (if a job is not completed), step skipping, job processing statistics, and resource management.
Let us look at how Spring Batch works in a nutshell.
A Step is an object that encapsulates a sequential phase of a job and holds all the necessary information to define and control processing. It delegates all the information to a Job to carry out its task.
Spring Batch uses a chunk-oriented mode of processing: data is read one item at a time, and chunks are created that will be written out within a transaction. An item is read by the ItemReader and passed on to the ItemProcessor; then it is written out by the ItemWriter once the item is ready. The JobRepository is used to store the step execution periodically during item processing.
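To make the chunk-oriented loop concrete, here is a minimal, framework-free sketch. The Reader/Processor/Writer interfaces below are simplified stand-ins that only mirror the shape of Spring Batch's ItemReader, ItemProcessor, and ItemWriter; the driver loop is purely illustrative (a real Step also wraps each chunk write in a transaction and stores step execution state).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ChunkLoopSketch {

    // Simplified stand-ins for Spring Batch's ItemReader/ItemProcessor/ItemWriter.
    interface Reader<T> { T read(); }              // returns null when input is exhausted
    interface Processor<I, O> { O process(I item); }
    interface Writer<T> { void write(List<? extends T> chunk); }

    // Read items one at a time, process each, and write them out in chunks.
    static <I, O> void run(Reader<I> reader, Processor<I, O> processor,
                           Writer<O> writer, int chunkSize) {
        List<O> chunk = new ArrayList<>();
        I item;
        while ((item = reader.read()) != null) {
            chunk.add(processor.process(item));
            if (chunk.size() == chunkSize) {       // a real Step commits a transaction here
                writer.write(chunk);
                chunk.clear();
            }
        }
        if (!chunk.isEmpty()) {
            writer.write(chunk);                   // flush the final partial chunk
        }
    }

    // Demo: 5 string items, parsed to doubles, written in chunks of 2.
    static List<List<Double>> demo() {
        Iterator<String> input = List.of("1.0", "2.0", "3.0", "4.0", "5.0").iterator();
        List<List<Double>> written = new ArrayList<>();
        run(() -> input.hasNext() ? input.next() : null,
            Double::parseDouble,
            chunk -> written.add(new ArrayList<>(chunk)),
            2);
        return written;
    }

    public static void main(String[] args) {
        // 5 items with chunk size 2 → chunks of sizes 2, 2, 1
        System.out.println(demo());
    }
}
```

With five items and a chunk size of two, the writer is invoked three times: two full chunks and one final partial chunk.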
Let's get into coding.
Setting Up the Project
Create a sample Spring Boot application. Here is my sample project structure.
In this article, I will be using sample data which represents the voltage drop of a discharging capacitor. We will read this data from a CSV file and write it out to an in-memory database, which is H2.
Add the required dependencies to pom.xml.
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
</dependency>

The CSV file Volts.csv contains two fields, volt and time. Let us create a JPA entity called Voltage. Note that this entity is just for the example; it is not production-ready code.
package com.techshard.batch.dao.entity;

import javax.persistence.*;
import javax.validation.constraints.NotNull;
import java.math.BigDecimal;

@Entity
public class Voltage {

    @Id
    @Column(name = "ID", nullable = false)
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private long id;

    @NotNull
    @Column(name = "volt", precision = 10, scale = 4, nullable = false)
    private BigDecimal volt;

    @NotNull
    @Column(name = "time", nullable = false)
    private double time;

    public Voltage() {
    }

    public Voltage(final BigDecimal volt, final double time) {
        this.volt = volt;
        this.time = time;
    }

    public long getId() {
        return id;
    }

    public BigDecimal getVolt() {
        return volt;
    }

    public void setVolt(final BigDecimal volt) {
        this.volt = volt;
    }

    public double getTime() {
        return time;
    }

    public void setTime(final double time) {
        this.time = time;
    }
}

Batch Configuration
Let's create a batch configuration class:
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
}

@EnableBatchProcessing enables Spring Batch features and provides a base configuration for setting up batch jobs in an @Configuration class.
We need to include two components in the above class.
@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
public StepBuilderFactory stepBuilderFactory;

JobBuilderFactory creates a job builder. Using StepBuilderFactory, Spring Batch will create a step builder and will initialize its job repository and transaction manager.
Configuring ItemReader
We will now define an ItemReader for our model Voltage, which will be used for reading data from the CSV file.
@Bean
public FlatFileItemReader<Voltage> reader() {
    return new FlatFileItemReaderBuilder<Voltage>()
            .name("voltItemReader")
            .resource(new ClassPathResource("Volts.csv"))
            .delimited()
            .names(new String[]{"volt", "time"})
            .lineMapper(lineMapper())
            .fieldSetMapper(new BeanWrapperFieldSetMapper<Voltage>() {{
                setTargetType(Voltage.class);
            }})
            .build();
}

Here, we are creating a FlatFileItemReaderBuilder of model Voltage.
- name - Name of the ItemReader.
- resource - Specifies the path of the resource file to be read.
- delimited - Builds a delimited tokenizer.
- names - Passes the fields that are to be read.
- lineMapper - Interface to map lines from the file to a domain object.
- fieldSetMapper - Interface to map data obtained from a FieldSet to an object.
Note that we have passed a custom lineMapper() above. Let us define that bean.
@Bean
public LineMapper<Voltage> lineMapper() {
    final DefaultLineMapper<Voltage> defaultLineMapper = new DefaultLineMapper<>();

    final DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
    lineTokenizer.setDelimiter(";");
    lineTokenizer.setStrict(false);
    lineTokenizer.setNames(new String[] {"volt", "time"});

    final VoltageFieldSetMapper fieldSetMapper = new VoltageFieldSetMapper();

    defaultLineMapper.setLineTokenizer(lineTokenizer);
    defaultLineMapper.setFieldSetMapper(fieldSetMapper);
    return defaultLineMapper;
}

In the custom lineMapper, we can specify the delimiter to be read from the CSV file; it is also used for reading string values into database-specific datatypes. The VoltageFieldSetMapper is defined as follows:
package com.techshard.batch.configuration;

import com.techshard.batch.dao.entity.Voltage;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.stereotype.Component;

@Component
public class VoltageFieldSetMapper implements FieldSetMapper<Voltage> {

    @Override
    public Voltage mapFieldSet(FieldSet fieldSet) {
        final Voltage voltage = new Voltage();
        voltage.setVolt(fieldSet.readBigDecimal("volt"));
        voltage.setTime(fieldSet.readDouble("time"));
        return voltage;
    }
}

Configuring ItemProcessor
We will define the processor in Batch configuration as follows:
@Bean
public VoltageProcessor processor() {
    return new VoltageProcessor();
}

We have defined a custom processor, VoltageProcessor. Once the data is read, this processor is used for processing the data, such as data conversion, applying business logic, and so on. This is just an example; this custom processor may not always be required. It can be defined depending on your application requirements.
package com.techshard.batch.configuration;

import com.techshard.batch.dao.entity.Voltage;
import org.springframework.batch.item.ItemProcessor;

import java.math.BigDecimal;

public class VoltageProcessor implements ItemProcessor<Voltage, Voltage> {

    @Override
    public Voltage process(final Voltage voltage) {
        final BigDecimal volt = voltage.getVolt();
        final double time = voltage.getTime();

        final Voltage processedVoltage = new Voltage();
        processedVoltage.setVolt(volt);
        processedVoltage.setTime(time);
        return processedVoltage;
    }
}

ItemWriter
Once the data is processed, it needs to be stored in a database as per our requirement. We will define a JdbcBatchItemWriter to insert data into a database table. There is also the JPA-specific JpaItemWriter, which can be used with an EntityManager.
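For reference, such a JPA-based writer could be configured roughly as follows. This is a hedged sketch, not part of the article's project: it assumes Spring Boot has auto-configured an EntityManagerFactory, and the bean name jpaWriter is made up for illustration.

```java
@Bean
public JpaItemWriter<Voltage> jpaWriter(EntityManagerFactory entityManagerFactory) {
    // Alternative to JdbcBatchItemWriter: persists Voltage entities through JPA
    // instead of a hand-written INSERT statement.
    JpaItemWriter<Voltage> writer = new JpaItemWriter<>();
    writer.setEntityManagerFactory(entityManagerFactory);
    return writer;
}
```

In this article we stick with the JDBC writer, which avoids the JPA persistence context overhead for simple inserts.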
@Bean
public JdbcBatchItemWriter<Voltage> writer(final DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<Voltage>()
            .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
            .sql("INSERT INTO voltage (volt, time) VALUES (:volt, :time)")
            .dataSource(dataSource)
            .build();
}

Job and Step Configuration
We will now define a Step, which will contain a reader, a processor, and a writer. For this, we need the StepBuilderFactory; the resulting Step will be injected into our Job() method.
@Bean
public Step step1(JdbcBatchItemWriter<Voltage> writer) {
    return stepBuilderFactory.get("step1")
            .<Voltage, Voltage> chunk(10)
            .reader(reader())
            .processor(processor())
            .writer(writer)
            .build();
}

Here, step1 is just a name of the Step, which we can define. We can also specify the chunk size in the Step configuration.
Finally, a Job is defined as follows:
@Bean
public Job importVoltageJob(NotificationListener listener, Step step1) {
    return jobBuilderFactory.get("importVoltageJob")
            .incrementer(new RunIdIncrementer())
            .listener(listener)
            .flow(step1)
            .end()
            .build();
}

Note that we have passed a NotificationListener that extends Spring Batch's JobExecutionListenerSupport. It can log results before or after job execution. Here, we have only defined afterJob(). JobExecutionListenerSupport also provides beforeJob() to log any information before the job execution.
package com.techshard.batch.configuration;

import com.techshard.batch.dao.entity.Voltage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class NotificationListener extends JobExecutionListenerSupport {

    private static final Logger LOGGER = LoggerFactory.getLogger(NotificationListener.class);

    private final JdbcTemplate jdbcTemplate;

    @Autowired
    public NotificationListener(final JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void afterJob(final JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            LOGGER.info("!!! JOB FINISHED! Time to verify the results");

            jdbcTemplate.query("SELECT volt, time FROM voltage",
                    (rs, row) -> new Voltage(
                            rs.getBigDecimal(1),
                            rs.getDouble(2))
            ).forEach(voltage -> LOGGER.info("Found <" + voltage + "> in the database."));
        }
    }
}

Before we run the application, we will enable the H2 (in-memory) console in application.properties.
spring.datasource.url=jdbc:h2:mem:batchdb
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=password
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect
spring.h2.console.enabled=true

Additionally, I have also configured an Aspect using Spring AOP to measure the time taken by the batch execution.
package com.techshard.batch;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Aspect
@Component
public class TracePerformanceAspect {

    private final Logger logger = LoggerFactory.getLogger(TracePerformanceAspect.class);

    @Around("execution(* com.techshard..*.*(..))")
    public Object logTracePerformanceAspect(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();

        // Get intercepted method details
        String className = methodSignature.getDeclaringType().getSimpleName();
        String methodName = methodSignature.getName();

        long start = System.currentTimeMillis();
        Object result = joinPoint.proceed();
        long end = System.currentTimeMillis();

        // Log method execution time
        logger.info("Execution time of " + className + "." + methodName + " :: " + (end - start) + " ms");

        return result;
    }
}

Running the Application
Run the Spring Boot application. Once the application is started, log in to the H2 console using the link http://localhost:8080/h2-console/. You will then see a login screen.
Once we log in, we will be able to see the table Voltage and all the tables created by Spring Batch. In these tables, we will find all the details about job execution, such as job name, status, ID, and so on.
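As an example, the job execution metadata can also be inspected programmatically. The sketch below assumes an autowired JdbcTemplate (as in NotificationListener); BATCH_JOB_EXECUTION is one of the standard metadata tables that Spring Batch creates.

```java
// Query Spring Batch's own metadata table for past job executions.
jdbcTemplate.query(
        "SELECT job_execution_id, status, start_time, end_time FROM BATCH_JOB_EXECUTION",
        (rs, row) -> String.format("execution=%d status=%s started=%s",
                rs.getLong(1), rs.getString(2), rs.getTimestamp(3))
).forEach(line -> LOGGER.info(line));
```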
Conclusion
This article just scratched the surface of Spring Batch in general. The example used in this article is not production-ready code. You can define job configuration depending on your project requirements. I hope you enjoyed this article. Let me know if you have any comments or suggestions.
The complete code can be found on my GitHub repository.
Source: https://dzone.com/articles/batch-processing-large-data-sets-with-spring-boot