本文由 简悦 SimpRead 转码, 原文地址 www.toutiao.com
Spring Batch 是一个轻量级的,完全面向 Spring 的批处理框架,可以应用于企业级大量的数据处理系统。
环境:Springboot2.3.12RELEASE + Spring Batch4.2.7
Spring Batch 是一个轻量级的,完全面向 Spring 的批处理框架,可以应用于企业级大量的数据处理系统。Spring Batch 以 POJO 和大家熟知的 Spring 框架为基础,使开发者更容易的访问和利用企业级服务。Spring Batch 可以提供大量的,可重复的数据处理功能,包括日志记录 / 跟踪,事务管理,作业处理统计工作重新启动、跳过,和资源管理等重要功能。
业务场景:
- 定期提交批处理。
- 并行批处理:作业的并行处理
- 分阶段、企业消息驱动的处理
- 大规模并行批处理
- 故障后手动或计划重新启动
- 相关步骤的顺序处理(扩展到工作流驱动的批处理)
- 部分处理:跳过记录(例如,回滚时)
- 整批事务,适用于小批量或现有存储过程 / 脚本的情况
技术目标:
- 批处理开发人员使用 Spring 编程模型:专注于业务逻辑,让框架负责基础设施。
- 基础架构、批处理执行环境和批处理应用程序之间的关注点清晰分离。
- 提供通用的核心执行服务,作为所有项目都可以实现的接口。
- 提供可 “开箱即用” 的核心执行接口的简单和默认实现。
- 通过在所有层中利用 spring 框架,可以轻松配置、定制和扩展服务。
- 所有现有的核心服务都应该易于替换或扩展,而不会对基础架构层造成任何影响。
- 提供一个简单的部署模型,使用 Maven 构建的架构 JAR 与应用程序完全分离。
Spring Batch 的结构:
此分层体系结构突出了三个主要的高级组件:应用程序、核心和基础架构。该应用程序包含开发人员使用 SpringBatch 编写的所有批处理作业和自定义代码。批处理核心包含启动和控制批处理作业所需的核心运行时类。它包括 JobLauncher、Job 和 Step 的实现。应用程序和核心都构建在公共基础架构之上。此基础结构包含公共读写器和服务(如 RetryTemplate),应用程序开发人员(读写器,如 ItemReader 和 ItemWriter)和核心框架本身(retry,它是自己的库)都使用这些服务。
下面介绍开发流程
本例完成 读取文件内容,经过处理后,将数据保存到数据库中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
<version>6.0.7.Final</version>
</dependency>
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
spring:
datasource:
driverClassName: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/batch?serverTimezone=GMT%2B8
username: root
password: *******
type: com.zaxxer.hikari.HikariDataSource
hikari:
minimumIdle: 10
maximumPoolSize: 200
autoCommit: true
idleTimeout: 30000
poolName: MasterDatabookHikariCP
maxLifetime: 1800000
connectionTimeout: 30000
connectionTestQuery: SELECT 1
---
spring:
jpa:
generateDdl: false
hibernate:
ddlAuto: update
openInView: true
show-sql: true
---
spring:
batch:
job:
enabled: false #是否自动执行任务
initialize-schema: always #自动为我们创建数据库脚本
|
1
2
3
4
|
@Configuration
@EnableBatchProcessing
public class BatchConfig extends DefaultBatchConfigurer{
}
|
接着上一步的配置类 BatchConfig 重写对应方法
1
2
3
4
5
6
7
|
@Override
protected JobLauncher createJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(createJobRepository());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
|
接着上一步的配置类 BatchConfig 重写对应方法
1
2
3
4
5
6
7
8
9
10
11
|
@Resource
private PlatformTransactionManager transactionManager ;
@Override
protected JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDatabaseType("mysql");
factory.setTransactionManager(transactionManager);
factory.setDataSource(dataSource);
factory.afterPropertiesSet();
return factory.getObject();
}
|
1
2
3
4
5
6
7
8
9
|
@Bean
public Job myJob(JobBuilderFactory builder, @Qualifier("myStep")Step step){
return builder.get("myJob")
.incrementer(new RunIdIncrementer())
.flow(step)
.end()
.listener(jobExecutionListener)
.build();
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
@Bean
public ItemReader<Person> reader(){
FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
reader.setResource(new ClassPathResource("cvs/persons.cvs"));
reader.setLineMapper(new DefaultLineMapper<Person>() {
// 代码块
{
setLineTokenizer(new DelimitedLineTokenizer(",") {
{
setNames("id", "name");
}
}) ;
setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {
{
setTargetType(Person.class) ;
}
});
}
});
return reader;
}
|
1
2
3
4
5
6
7
8
9
10
11
12
|
@Bean
public ItemProcessor<Person, Person2> processorPerson(){
return new ItemProcessor<Person, Person2>() {
@Override
public Person2 process(Person item) throws Exception {
Person2 p = new Person2() ;
p.setId(item.getId()) ;
p.setName(item.getName() + ", pk");
return p ;
}
} ;
}
|
1
2
3
4
5
6
7
8
9
10
11
12
|
@Resource
private Validator<Person> validator ;
@Resource
private EntityManagerFactory entityManagerFactory ;
@Bean
public ItemWriter<Person2> writerPerson(){
JpaItemWriter<Person2> writer = null ;
JpaItemWriterBuilder<Person2> builder = new JpaItemWriterBuilder<>() ;
builder.entityManagerFactory(entityManagerFactory) ;
writer = builder.build() ;
return writer;
}
|
1
2
3
4
5
6
7
8
9
10
11
12
|
@Bean
public Step myStep(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer, ItemProcessor<Person, Person> processor){
return stepBuilderFactory
.get("myStep")
.<Person, Person>chunk(2) // Chunk的机制(即每次读取一条数据,再处理一条数据,累积到一定数量后再一次性交给writer进行写入操作)
.reader(reader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2)
.listener(new MyReadListener())
.processor(processor)
.writer(writer).faultTolerant().skip(Exception.class).skipLimit(2)
.listener(new MyWriteListener())
.build();
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public class MyReadListener implements ItemReadListener<Person> {
private Logger logger = LoggerFactory.getLogger(MyReadListener.class);
@Override
public void beforeRead() {
}
@Override
public void afterRead(Person item) {
System.out.println("reader after: " + Thread.currentThread().getName()) ;
}
@Override
public void onReadError(Exception ex) {
logger.info("读取数据错误:{}", ex);
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
@Component
public class MyWriteListener implements ItemWriteListener<Person> {
private Logger logger = LoggerFactory.getLogger(MyWriteListener.class);
@Override
public void beforeWrite(List<? extends Person> items) {
}
@Override
public void afterWrite(List<? extends Person> items) {
System.out.println("writer after: " + Thread.currentThread().getName()) ;
}
@Override
public void onWriteError(Exception exception, List<? extends Person> items) {
try {
logger.info(format("%s%n", exception.getMessage()));
for (Person item : items) {
logger.info(format("Failed writing BlogInfo : %s", item.toString()));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
|
person.cvs 文件内容
实体类:
1
2
3
4
5
6
7
8
9
|
@Entity
@Table(name = "t_person")
public class Person {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id ;
private String name ;
}
|
启动任务执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
@RestController
@RequestMapping("/demo")
public class DemoController {
@Resource
@Qualifier("myJob")
private Job job ;
@Resource
private JobLauncher launcher ;
@GetMapping("/index")
public Object index() {
JobParameters jobParameters = new JobParametersBuilder().toJobParameters() ;
try {
launcher.run(job, jobParameters) ;
} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
| JobParametersInvalidException e) {
e.printStackTrace();
}
return "success" ;
}
}
|
启动服务,自动为我们创建了表
执行任务
查看表情况
完毕!!!
公众:Springboot 实战案例锦集