Hello everyone,
Greetings today!
Today we will talk about the Spring Batch decider, which is mainly used to execute steps conditionally.
JobExecutionDecider: this is the interface we need to implement, overriding its decide() method. From decide() we can return different FlowExecutionStatus values, which the batch job configuration can then use to build a conditional flow.
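To make the idea concrete before the Spring code, here is a minimal plain-Java sketch of the pattern. It does not use the Spring Batch API: `SimpleDecider`, `salesDecider`, and `nextStep` are hypothetical stand-ins, and the transition map only imitates the role that `.on(...).to(...)` plays in a real job configuration.

```java
import java.util.Map;
import java.util.function.BooleanSupplier;

// Plain-Java model of the decider idea; none of these types come from Spring Batch.
class DeciderSketch {

    // Hypothetical stand-in for JobExecutionDecider: it only returns a status name.
    interface SimpleDecider {
        String decide();
    }

    // Builds a decider that reports "SALES_SUCCESS" when the check finds sales, else "COMPLETED".
    static SimpleDecider salesDecider(BooleanSupplier anySalesToday) {
        return () -> anySalesToday.getAsBoolean() ? "SALES_SUCCESS" : "COMPLETED";
    }

    // Looks up the next step for the status the decider returned; unknown statuses map to "END".
    static String nextStep(SimpleDecider decider, Map<String, String> transitions) {
        return transitions.getOrDefault(decider.decide(), "END");
    }

    public static void main(String[] args) {
        Map<String, String> transitions = Map.of(
                "SALES_SUCCESS", "salesSuccessCSVStep",
                "COMPLETED", "salesUnsuccessfulReportFlow");
        System.out.println(nextStep(salesDecider(() -> true), transitions));
        System.out.println(nextStep(salesDecider(() -> false), transitions));
    }
}
```

The point of the sketch is that the decider itself knows nothing about steps. It only names an outcome, and the configuration decides what that outcome leads to.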
Let's see how we can implement a conditional flow with the example below.
Create a project with the required dependencies using Spring Initializr, then open the project in your IDE.
package com.practice.model;

import lombok.Getter;
import lombok.Setter;

import javax.persistence.*;
import java.time.LocalDateTime;

@Entity
@Table(name = "sales_enquiry")
@Getter
@Setter
public class SalesEnquiry {

    @Id
    @Column
    @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "id_sequence")
    @SequenceGenerator(name = "id_sequence", sequenceName = "id_sequence")
    private Long id;

    @Column
    private String customerName;

    @Column
    private String customerContactNo;

    @Column
    private Boolean productSold;

    @Column
    private String productName;

    @Column
    private LocalDateTime enquiryDate;
}
package com.practice.dao;

import com.practice.model.SalesEnquiry;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import java.time.LocalDateTime;
import java.util.List;

@Repository
public interface SalesEnquiryRepository extends JpaRepository<SalesEnquiry, Long> {

    // Derived query: enquiries within the date range that have the given sold flag.
    List<SalesEnquiry> findByEnquiryDateBetweenAndProductSold(
            LocalDateTime startDate,
            LocalDateTime endDate,
            Boolean productSold);
}
package com.practice.dto;

import lombok.*;

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class SalesEnquiryDTO {

    private Long id;
    private String customerName;
    private String customerContactNo;
    private String productName;
}
package com.practice.processor;

import com.practice.dto.SalesEnquiryDTO;
import com.practice.model.SalesEnquiry;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Component
public class SalesReportProcessor implements ItemProcessor<SalesEnquiry, SalesEnquiryDTO> {

    // Copies the fields needed for the report from the entity into the DTO.
    @Override
    public SalesEnquiryDTO process(SalesEnquiry salesEnquiry) throws Exception {
        return SalesEnquiryDTO.builder()
                .id(salesEnquiry.getId())
                .customerContactNo(salesEnquiry.getCustomerContactNo())
                .customerName(salesEnquiry.getCustomerName())
                .productName(salesEnquiry.getProductName())
                .build();
    }
}
- SalesSuccessFlowDecider: checks whether any sale was successful today. If so, it returns the FlowExecutionStatus SALES_SUCCESS; otherwise it returns COMPLETED.
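package com.practice.decider;

import com.practice.dao.SalesEnquiryRepository;
import com.practice.model.SalesEnquiry;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.util.List;

@Component
public class SalesSuccessFlowDecider implements JobExecutionDecider {

    @Autowired
    private SalesEnquiryRepository salesEnquiryRepository;

    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        // Enquiries dated today that ended in a sale.
        List<SalesEnquiry> salesSuccessEnquiryList =
                salesEnquiryRepository.findByEnquiryDateBetweenAndProductSold(
                        LocalDate.now().atStartOfDay(),
                        LocalDate.now().plusDays(1).atStartOfDay(),
                        Boolean.TRUE);
        if (!salesSuccessEnquiryList.isEmpty()) {
            return new FlowExecutionStatus("SALES_SUCCESS");
        }
        return new FlowExecutionStatus("COMPLETED");
    }
}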
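The decider looks for enquiries between the start of today and the start of tomorrow. The sketch below shows that window with plain `java.time`, using a fixed date so the result is reproducible. `DayWindow` and `isWithinDay` are illustrative names, not part of the project. The sketch uses a half-open range (start included, end excluded), which matches the `>=` and `<` comparison in the reader SQL later in this post. As far as I know, a derived `Between` query in Spring Data JPA translates to SQL `BETWEEN`, which includes both ends, so an enquiry stamped exactly at midnight could be counted on two days.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;

// Illustrative helper, not part of the original project.
class DayWindow {

    // True when the timestamp falls in [start of day, start of next day).
    static boolean isWithinDay(LocalDateTime timestamp, LocalDate day) {
        LocalDateTime start = day.atStartOfDay();
        LocalDateTime end = day.plusDays(1).atStartOfDay();
        return !timestamp.isBefore(start) && timestamp.isBefore(end);
    }

    public static void main(String[] args) {
        LocalDate day = LocalDate.of(2024, 1, 15); // fixed date chosen only for the example
        System.out.println(isWithinDay(LocalDateTime.of(2024, 1, 15, 0, 0), day));   // start of day is included
        System.out.println(isWithinDay(LocalDateTime.of(2024, 1, 15, 23, 59), day)); // late evening is included
        System.out.println(isWithinDay(LocalDateTime.of(2024, 1, 16, 0, 0), day));   // next midnight is excluded
    }
}
```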
- SalesUnsuccessfulDecider: checks whether any of today's sales enquiries did not result in a sale. If so, it returns the status SALES_UN_SUCCESSFUL; otherwise it returns COMPLETED, as shown below.
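package com.practice.decider;

import com.practice.dao.SalesEnquiryRepository;
import com.practice.model.SalesEnquiry;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.util.List;

@Component
public class SalesUnsuccessfulDecider implements JobExecutionDecider {

    @Autowired
    private SalesEnquiryRepository salesEnquiryRepository;

    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        // Enquiries dated today that did not end in a sale.
        List<SalesEnquiry> salesUnsuccessfulEnquiryList =
                salesEnquiryRepository.findByEnquiryDateBetweenAndProductSold(
                        LocalDate.now().atStartOfDay(),
                        LocalDate.now().plusDays(1).atStartOfDay(),
                        Boolean.FALSE);
        if (!salesUnsuccessfulEnquiryList.isEmpty()) {
            return new FlowExecutionStatus("SALES_UN_SUCCESSFUL");
        }
        return new FlowExecutionStatus("COMPLETED");
    }
}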
@Bean
public Flow salesSuccessReportFlow() {
    return new FlowBuilder<SimpleFlow>("salesSuccessReportFlow")
            .start(salesSuccessFlowDecider).on("SALES_SUCCESS")
            .to(salesSuccessCSVStep()).next(salesUnsuccessfulReportFlow())
            .from(salesSuccessFlowDecider).on("COMPLETED")
            .to(salesUnsuccessfulReportFlow())
            .build();
}

@Bean
public Flow salesUnsuccessfulReportFlow() {
    return new FlowBuilder<SimpleFlow>("salesUnsuccessfulReportFlow")
            .start(salesUnsuccessfulDecider).on("SALES_UN_SUCCESSFUL")
            .to(salesUnsuccessfulCSVStep())
            .from(salesUnsuccessfulDecider).on("COMPLETED")
            .end()
            .build();
}
- If any sales enquiry was converted to a sale today, the decider returns SALES_SUCCESS and the flow goes to salesSuccessCSVStep(), which generates a CSV file of today's sales. After the file is generated, the flow continues to salesUnsuccessfulReportFlow().
- If no sale was made today, the decider returns COMPLETED and the flow goes directly to salesUnsuccessfulReportFlow().
- That flow checks whether any of today's enquiries did not result in a sale. If so, it gets the status SALES_UN_SUCCESSFUL and runs salesUnsuccessfulCSVStep() to generate a CSV of those records; otherwise it gets COMPLETED, which ends the flow.
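The three cases above can be traced with a small plain-Java sketch. It only imitates the routing of the two chained flows and is not Spring Batch code: `FlowTrace` and `stepsExecuted` are hypothetical names, and the two booleans stand in for what the deciders would find in the database.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java trace of the two chained flows; it imitates the routing only.
class FlowTrace {

    // Returns the names of the steps that would run for the given decider outcomes.
    static List<String> stepsExecuted(boolean anySold, boolean anyUnsold) {
        List<String> steps = new ArrayList<>();
        // salesSuccessReportFlow: SALES_SUCCESS runs the CSV step and then the second flow;
        // COMPLETED goes straight to the second flow.
        if (anySold) {
            steps.add("salesSuccessCSVStep");
        }
        // salesUnsuccessfulReportFlow: SALES_UN_SUCCESSFUL runs the CSV step; COMPLETED ends the flow.
        if (anyUnsold) {
            steps.add("salesUnsuccessfulCSVStep");
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(stepsExecuted(true, true));   // both reports
        System.out.println(stepsExecuted(false, true));  // only the unsuccessful report
        System.out.println(stepsExecuted(true, false));  // only the success report
        System.out.println(stepsExecuted(false, false)); // no step runs
    }
}
```

Note that the second decider is always consulted, whichever branch the first one takes.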
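package com.practice.config;

import com.practice.decider.SalesSuccessFlowDecider;
import com.practice.decider.SalesUnsuccessfulDecider;
import com.practice.dto.SalesEnquiryDTO;
import com.practice.model.SalesEnquiry;
import com.practice.processor.SalesReportProcessor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.BeanPropertyRowMapper;

import javax.sql.DataSource;

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private SalesSuccessFlowDecider salesSuccessFlowDecider;

    @Autowired
    private SalesUnsuccessfulDecider salesUnsuccessfulDecider;

    @Autowired
    private DataSource dataSource;

    @Autowired
    private SalesReportProcessor salesReportProcessor;
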
    @Bean(name = "salesEnquiryReportJob")
    public Job salesReportJob() {
        return jobBuilderFactory.get("salesEnquiryReportJob")
                .incrementer(new RunIdIncrementer())
                .start(salesSuccessReportFlow())
                .end()
                .build();
    }

    @Bean
    public Flow salesSuccessReportFlow() {
        return new FlowBuilder<SimpleFlow>("salesSuccessReportFlow")
                .start(salesSuccessFlowDecider).on("SALES_SUCCESS")
                .to(salesSuccessCSVStep())
                .next(salesUnsuccessfulReportFlow())
                .from(salesSuccessFlowDecider).on("COMPLETED")
                .to(salesUnsuccessfulReportFlow())
                .build();
    }

    @Bean
    public Flow salesUnsuccessfulReportFlow() {
        return new FlowBuilder<SimpleFlow>("salesUnsuccessfulReportFlow")
                .start(salesUnsuccessfulDecider).on("SALES_UN_SUCCESSFUL")
                .to(salesUnsuccessfulCSVStep())
                .from(salesUnsuccessfulDecider).on("COMPLETED")
                .end()
                .build();
    }

    @Bean
    public Step salesSuccessCSVStep() {
        return stepBuilderFactory.get("salesSuccessCSVStep")
                .<SalesEnquiry, SalesEnquiryDTO>chunk(250)
                .reader(salesSuccessReader())
                .processor(salesReportProcessor)
                .writer(salesSuccessWriter())
                .build();
    }

    // Reads today's enquiries that ended in a sale.
    @Bean
    public JdbcCursorItemReader<SalesEnquiry> salesSuccessReader() {
        JdbcCursorItemReader<SalesEnquiry> jdbcCursorItemReader = new JdbcCursorItemReader<>();
        // The query is concatenated because a Java string literal cannot span lines.
        jdbcCursorItemReader.setSql(
                "Select * from sales_enquiry where ENQUIRY_DATE >= trunc(sysdate) "
                        + "and ENQUIRY_DATE < trunc(sysdate) + 1 and PRODUCT_SOLD = 1");
        jdbcCursorItemReader.setDataSource(dataSource);
        jdbcCursorItemReader.setRowMapper(new BeanPropertyRowMapper<>(SalesEnquiry.class));
        return jdbcCursorItemReader;
    }

    @Bean
    public FlatFileItemWriter<SalesEnquiryDTO> salesSuccessWriter() {
        FlatFileItemWriter<SalesEnquiryDTO> salesSuccessWriter = new FlatFileItemWriter<>();
        salesSuccessWriter.setHeaderCallback(
                x -> x.write("Id,Customer Name,Mobile No,Product Name"));
        salesSuccessWriter.setResource(
                new FileSystemResource("F://SalesEnquiryReport//sales_success_report.csv"));
        salesSuccessWriter.setLineAggregator(salesEnquiryLineAggregator());
        return salesSuccessWriter;
    }

    @Bean
    public FlatFileItemWriter<SalesEnquiryDTO> salesUnSuccessWriter() {
        FlatFileItemWriter<SalesEnquiryDTO> salesUnsuccessWriter = new FlatFileItemWriter<>();
        salesUnsuccessWriter.setHeaderCallback(
                x -> x.write("Id,Customer Name,Mobile No,Product Name"));
        salesUnsuccessWriter.setResource(
                new FileSystemResource("F://SalesEnquiryReport//sales_unsuccessful_report.csv"));
        salesUnsuccessWriter.setLineAggregator(salesEnquiryLineAggregator());
        return salesUnsuccessWriter;
    }

    // Joins the listed DTO properties into one comma-separated line.
    @Bean
    public DelimitedLineAggregator<SalesEnquiryDTO> salesEnquiryLineAggregator() {
        DelimitedLineAggregator<SalesEnquiryDTO> delimitedLineAggregator =
                new DelimitedLineAggregator<>();
        delimitedLineAggregator.setDelimiter(",");
        BeanWrapperFieldExtractor<SalesEnquiryDTO> beanWrapperFieldExtractor =
                new BeanWrapperFieldExtractor<>();
        beanWrapperFieldExtractor.setNames(
                new String[] {"id", "customerName", "customerContactNo", "productName"});
        delimitedLineAggregator.setFieldExtractor(beanWrapperFieldExtractor);
        return delimitedLineAggregator;
    }

    @Bean
    public Step salesUnsuccessfulCSVStep() {
        return stepBuilderFactory.get("salesUnsuccessfulCSVStep")
                .<SalesEnquiry, SalesEnquiryDTO>chunk(250)
                .reader(salesUnsuccessfulReport())
                .processor(salesReportProcessor)
                .writer(salesUnSuccessWriter())
                .build();
    }

    // Reads today's enquiries that did not end in a sale.
    @Bean
    public JdbcCursorItemReader<SalesEnquiry> salesUnsuccessfulReport() {
        JdbcCursorItemReader<SalesEnquiry> jdbcCursorItemReader = new JdbcCursorItemReader<>();
        // The query is concatenated because a Java string literal cannot span lines.
        jdbcCursorItemReader.setSql(
                "Select * from sales_enquiry where ENQUIRY_DATE >= trunc(sysdate) "
                        + "and ENQUIRY_DATE < trunc(sysdate) + 1 and PRODUCT_SOLD = 0");
        jdbcCursorItemReader.setDataSource(dataSource);
        jdbcCursorItemReader.setRowMapper(new BeanPropertyRowMapper<>(SalesEnquiry.class));
        return jdbcCursorItemReader;
    }
}
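The salesEnquiryLineAggregator() bean turns each DTO into one comma-separated line. The plain-Java sketch below imitates that joining step and adds simple quoting for values that contain a comma, a double quote, or a line break, which matters if a customer name such as `Rao, Anita` appears in the data. `CsvLine`, `toLine`, and `escape` are hypothetical names. Whether the real aggregator quotes values depends on the Spring Batch version and configuration, so treat the quoting here as something you may want to add, not as what the bean above does.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Illustrative CSV line builder; it does not use Spring Batch.
class CsvLine {

    // Quotes a value when it contains a comma, a double quote, or a line break.
    // Rendering null as an empty field is a choice made for this sketch.
    static String escape(Object value) {
        String text = value == null ? "" : value.toString();
        if (text.contains(",") || text.contains("\"") || text.contains("\n")) {
            return "\"" + text.replace("\"", "\"\"") + "\"";
        }
        return text;
    }

    // Joins the field values with commas, in the order given.
    static String toLine(Object... fields) {
        return Arrays.stream(fields)
                .map(CsvLine::escape)
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // Sample values invented for the example.
        System.out.println(toLine(1L, "Rao, Anita", "9876543210", "Laptop"));
        System.out.println(toLine(2L, "Meera", "9123456780", "Phone"));
    }
}
```

Without the quoting, the first sample row would be read back as five columns instead of four.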
spring.datasource.initialize=true
spring.datasource.url=jdbc:oracle:thin:@localhost:1521:orcl
spring.datasource.password=add-your-password
spring.datasource.driver-class-name=oracle.jdbc.driver.OracleDriver
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.Oracle10gDialect
spring.jpa.hibernate.ddl-auto=update
spring.datasource.username=sys as sysdba
spring.datasource.initialization-mode=always
Other Spring Batch posts:
Spring Batch Components And Architecture
Spring Batch Tasklet With Spring Boot + How to create Tasklet in Spring Batch
Spring Batch Example -CSV To Database with Spring Boot & Oracle
Reading And Writing Multiple Files In Spring Batch Using MultiResourceItemReader & ItemReader
Spring Batch - Skip Limit, Skip Policy & No Skip
If you have any doubts, let me know.