基于H2实现 Spring Batch应用

基于H2实现 Spring Batch应用

上文我们已经学习了Spring batch的入门教程,但没有使用数据库,仅使用内存存储spring batch元信息及执行信息。本文我们学习如何配置数据库运行,为了简化使用h2数据库。

需求说明

使用数据库保存元信息,可以随时跟踪执行进度,重新执行失败记录。这里使用H2数据库存储。
从csv文件中读取信息,每条信息判断是数据库中是否存在,如果存在更新余额,否则插入新的记录。

假设原始数据如下:

1,John Wick,101,134
2,Neo,102,445
3,Jack Bauer,103,344
4,Pavan Solapure,101,-34

每列数据分别表示用户编号、用户名称、部门编号和金额。

引入依赖

implementation 'org.springframework.boot:spring-boot-starter-batch'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.boot:spring-boot-starter-web'
runtimeOnly 'com.h2database:h2'

compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'

testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.batch:spring-batch-test'

配置application.properties

# Enabling H2 Console
spring.h2.console.enabled=true

# Datasource
spring.datasource.url=jdbc:h2:mem:testdb
spring.datasource.username=sa
spring.datasource.password=
spring.datasource.driver-class-name=org.h2.Driver

# disable spring batch auto run job
spring.batch.job.enabled=false

input.file=classpath:person.csv

配置spring.h2.console.enabled=true为了在浏览器中查看h2数据。下面配置h2数据源。

spring.batch.job.enabled=false 禁止spring boot 自动运行job。input.file配置输入文件名称。

定义实体类及repository

定义输入的实体类:

@Data
@Entity
public class Person {

    @Id
    private Long userId;

    private String name;

    private String dept;

    private BigDecimal account;
}

定义PersonRepository:

public interface PersonRepository extends CrudRepository<Person, Long> {

}

实现Reader

读取csv文件,这里使用FlatFileItemReader,并转换为Person输出。

public class Reader extends FlatFileItemReader<Person> {

    public Reader(Resource resource) {
        super();
        setResource(resource);

        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setNames("userId", "name", "dept", "amount");
        lineTokenizer.setDelimiter(",");
        lineTokenizer.setStrict(false);

        BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(Person.class);

        DefaultLineMapper<Person> defaultLineMapper = new DefaultLineMapper<>();
        defaultLineMapper.setLineTokenizer(lineTokenizer);
        defaultLineMapper.setFieldSetMapper(fieldSetMapper);
        setLineMapper(defaultLineMapper);
    }
}

实现Writer

写所有person对象至数据库:

@Component
public class Writer implements ItemWriter<Person> {

    @Autowired
    private PersonRepository repo;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void write(List<? extends Person> users) {
        repo.saveAll(users);
    }
}

实现Processor

处理过程为,首先查询是否存在对应用户信息,存在则更新acount,否则不处理。

@Component
public class Processor implements ItemProcessor<Person, Person> {

    private final PersonRepository userRepo;
    public Processor(PersonRepository userRepo) {this.userRepo = userRepo;}

    @Override
    public Person process(Person user) {
        Optional<Person> userFromDb = userRepo.findById(user.getUserId());
        userFromDb.ifPresent(person -> user.setAccount(user.getAccount().add(person.getAccount())));
        return user;
    }
}

配置job

@Component
@Slf4j
public class AccountKeeperJob extends JobExecutionListenerSupport {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final Processor processor;
    private final Writer writer;

    @Value("${input.file}")
    Resource resource;

    @Autowired
    public AccountKeeperJob(JobBuilderFactory jobBuilderFactory,
                            StepBuilderFactory stepBuilderFactory, Processor processor, Writer writer) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
        this.processor = processor;
        this.writer = writer;
    }

    @Bean(name = "accountJob")
    public Job accountKeeperJob() {

        Step step = stepBuilderFactory.get("step-1")
            .<Person, Person> chunk(1)
            .reader(new Reader(resource))
            .processor(processor)
            .writer(writer)
            .build();

        return jobBuilderFactory.get("accounting-job")
                                .incrementer(new RunIdIncrementer())
                                .listener(this)
                                .start(step)
                                .build();
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("BATCH JOB COMPLETED SUCCESSFULLY");
        }
    }
}

我们通过reader、processor、writer创建step,有步骤创建job。并增加监听器,执行完毕打印日志。

通过web请求执行job

@RestController
public class JobInvokerController {

    private final JobLauncher jobLauncher;
    private final Job accountKeeperJob;

    public JobInvokerController(JobLauncher jobLauncher, @Qualifier("accountJob") Job accountKeeperJob) {
        this.jobLauncher = jobLauncher;
        this.accountKeeperJob = accountKeeperJob;
    }

    @RequestMapping("/run-batch-job")
    public String handle() throws Exception {

        JobParameters jobParameters = new JobParametersBuilder()
            .addString("source", "Spring Boot")
            .toJobParameters();
        jobLauncher.run(accountKeeperJob, jobParameters);

        return "Batch job has been invoked";
    }
}

前面通过spring.batch.job.enabled=false配置禁用自动运行job。这里提供请求方式执行job。

配置启用SpringBatch

最后在启动类中增加@EnableBatchProcessing,生成必要基础bean,详细内容见入门篇

@SpringBootApplication
@EnableBatchProcessing
public class BatchWebApplication {

    public static void main(String[] args) {
        SpringApplication.run(BatchWebApplication.class, args);
    }

}

系统启动后,会自动创建spring batch数据库表。执行成功后可以查看执行记录。

通过h2查看运行结果

当程序首次运行后,通过该链接 http://localhost:8080/h2-console 查看h2数据库。如下图:
基于H2实现 Spring Batch应用
一旦通过rest请求方式执行job之后,再次查看H2,结果如下:
基于H2实现 Spring Batch应用

总结

本文在入门篇的基础上,使用h2数据库序列化spring batch元信息及执行信息,通过web方式执行job。