絕了!Spring Batch 百萬(wàn)數(shù)據(jù)分區(qū)處理,僅需五秒搞定
環(huán)境:SpringBoot3.4.2
1. 簡(jiǎn)介
Spring Batch默認(rèn)是單線程的。為了實(shí)現(xiàn)并行處理,我們需要對(duì)批處理作業(yè)的步驟進(jìn)行分區(qū)。

如上圖所示,作業(yè)在左側(cè)以一系列步驟(Step)實(shí)例的順序運(yùn)行,其中一個(gè)步驟實(shí)例被標(biāo)記為管理者(manager)。圖中的所有工作節(jié)點(diǎn)(workers)均為同一步驟(Step)的相同實(shí)例,實(shí)際上這些工作節(jié)點(diǎn)完全可以替代管理者的角色,且對(duì)作業(yè)的最終結(jié)果不會(huì)產(chǎn)生任何影響。工作節(jié)點(diǎn)通常為遠(yuǎn)程服務(wù),但也可以是本地執(zhí)行線程。
Spring Batch允許將輸入數(shù)據(jù)從Manager步驟傳遞到Worker步驟,以便每個(gè)Worker都確切知道要做什么。JobRepository確保在作業(yè)的單個(gè)執(zhí)行過(guò)程中,每個(gè)Worker只被執(zhí)行一次。
分區(qū)使用多個(gè)線程來(lái)處理一系列數(shù)據(jù)集。數(shù)據(jù)集的范圍可以通過(guò)編程方式定義。根據(jù)用例,我們可以決定在分區(qū)中創(chuàng)建多少個(gè)線程來(lái)使用。線程的數(shù)量純粹基于需求/要求。
當(dāng)我們需要從源系統(tǒng)中讀取數(shù)百萬(wàn)條記錄,并且不能僅依賴(lài)單個(gè)線程來(lái)處理所有記錄(這可能會(huì)很耗時(shí))時(shí),分區(qū)就非常有用。我們希望使用多個(gè)線程來(lái)讀取和處理數(shù)據(jù),以有效地利用系統(tǒng)資源。
接下來(lái),我們將通過(guò)Spring Batch分區(qū)功能實(shí)現(xiàn)百萬(wàn)數(shù)據(jù)的讀取處理。
2.實(shí)戰(zhàn)案例
依賴(lài)管理
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>開(kāi)啟批處理:
@Configuration
@EnableBatchProcessing
public class AppConfig {}準(zhǔn)備數(shù)據(jù)如下:

數(shù)據(jù)總是為:

2.1 自定義分區(qū)
分區(qū)器(Partitioner)是用于為分區(qū)步驟創(chuàng)建輸入?yún)?shù)的核心策略接口,其輸入?yún)?shù)形式為ExecutionContext實(shí)例。通常目標(biāo)是生成一組互不重疊的輸入值集合,例如一組無(wú)重疊的主鍵范圍或唯一文件名。
在此示例中,我們通過(guò)查詢(xún)表獲取最大ID值和最小ID值(假設(shè)ID是連續(xù)的,如果你的ID是UUID,那么可以通過(guò)時(shí)間字段來(lái)劃分),并基于此在所有記錄間創(chuàng)建分區(qū)。
對(duì)于分區(qū)器,我們將網(wǎng)格大?。╣ridSize)設(shè)置為線程數(shù)量??筛鶕?jù)實(shí)際需求使用自定義值。
public class IdRangePartitioner implements Partitioner {
private final JdbcClient jdbcClient;
public IdRangePartitioner(JdbcClient jdbcClient) {
this.jdbcClient = jdbcClient;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
int min = this.jdbcClient.sql("SELECT MIN(id) FROM o_user").query(Integer.class).single() ;
int max = this.jdbcClient.sql("SELECT MAX(id) FROM o_user").query(Integer.class).single() ;
int targetSize = (max - min) / gridSize + 1;
Map<String, ExecutionContext> result = new HashMap<>();
int number = 0;
int start = min;
int end = start + targetSize - 1;
while (start <= max) {
ExecutionContext value = new ExecutionContext();
result.put("partition" + number, value);
if (end >= max) {
end = max;
}
// 這里設(shè)置的值每一個(gè)Worker,都將會(huì)在具體的ItemReader中通過(guò)SpEL動(dòng)態(tài)獲取
value.putInt("minValue", start);
value.putInt("maxValue", end);
start += targetSize;
end += targetSize;
number++;
}
return result ;
}
}2.2 配置Job
接下來(lái),我們需要配置job執(zhí)行所需要的bean(Reader,Writer,Step)。
@Configuration
public class JobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
private final DataSource dataSource ;
public JobConfig(JobRepository jobRepository, PlatformTransactionManager transactionManager,
DataSource dataSource) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
this.dataSource = dataSource ;
}
@Bean
IdRangePartitioner partitioner(JdbcClient jdbcClient) {
IdRangePartitioner columnRangePartitioner = new IdRangePartitioner(jdbcClient);
return columnRangePartitioner;
}
@Bean
@StepScope
JdbcPagingItemReader<User> pagingItemReader(
@Value("#{stepExecutionContext['minValue']}") Long minValue,
@Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
System.out.println("reading " + minValue + " to " + maxValue);
Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("id", Order.ASCENDING);
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("id, name, age, phone, sex");
queryProvider.setFromClause("from o_user");
queryProvider.setWhereClause("where id >= " + minValue + " and id < " + maxValue);
queryProvider.setSortKeys(sortKeys);
JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<>();
reader.setDataSource(this.dataSource);
reader.setFetchSize(1000);
reader.setRowMapper(new UserRowMapper());
reader.setQueryProvider(queryProvider);
return reader;
}
@Bean
@StepScope
JdbcBatchItemWriter<User> userItemWriter() {
JdbcBatchItemWriter<User> itemWriter = new JdbcBatchItemWriter<>();
itemWriter.setDataSource(dataSource);
itemWriter.setSql("insert into n_user values (:id, :name, :age, :phone, :sex)");
itemWriter.setItemSqlParameterSourceProvider
(new BeanPropertyItemSqlParameterSourceProvider<>());
return itemWriter;
}
// 主Step
@Bean
Step stepMaster(IdRangePartitioner partitioner) {
return new StepBuilder("stepMaster", jobRepository)
.partitioner(slaveStep().getName(), partitioner)
.step(slaveStep())
// 配置線程數(shù)
.gridSize(50)
.taskExecutor(new SimpleAsyncTaskExecutor())
.build();
}
@Bean
Step slaveStep() {
return new StepBuilder("slaveStep", jobRepository)
.<User, User>chunk(1000, transactionManager)
.reader(pagingItemReader(null, null))
.writer(userItemWriter())
.build();
}
@Bean
Job job(@Qualifier("stepMaster") Step stepMaster) {
return new JobBuilder("job", jobRepository)
.start(stepMaster)
.build();
}
}說(shuō)明:
- stepMaster中設(shè)置了任務(wù)執(zhí)行器SimpleAsyncTaskExecutor,使用該執(zhí)行器我們也可以非常方便的設(shè)置使用虛擬線程執(zhí)行(setVirtualThreads(true))。
- 步驟(Step)中使用的資源(如數(shù)據(jù)源DataSource)可能存在并發(fā)限制。
- IdRangePartitioner:用于為分區(qū)步驟創(chuàng)建輸入?yún)?shù)的核心策略接口,輸入?yún)?shù)形式為ExecutionContext實(shí)例。
- JdbcPagingItemReader:此Bean使用分頁(yè)方式讀取數(shù)據(jù),并根據(jù)范圍接受最小值(minValue)和最大值(maxValue),獲取該范圍內(nèi)的數(shù)據(jù)。同時(shí)我們?cè)O(shè)置每次讀取1000條。
- JdbcBatchItemWriter:此Bean將數(shù)據(jù)寫(xiě)入另一個(gè)表。
- Step:這是批處理作業(yè)中配置的步驟,負(fù)責(zé)數(shù)據(jù)的讀取和寫(xiě)入操作。
- Job:表示批處理作業(yè)的批處理領(lǐng)域?qū)ο蟆?/li>
2.3 其它輔助類(lèi)
User實(shí)體對(duì)象
public class User {
private Integer id ;
private String name ;
private Integer age ;
private String phone ;
private String sex ;
// getters, setters
}表數(shù)據(jù)到對(duì)象的映射Mapper類(lèi)
public class UserRowMapper implements RowMapper<User> {
@Override
public User mapRow(ResultSet rs, int rowNum) throws SQLException {
User user = new User();
user.setId(rs.getInt("id"));
user.setName(rs.getString("name"));
user.setPhone(rs.getString("phone"));
user.setSex(rs.getString("sex"));
user.setAge(rs.getInt("age"));
return user;
}
}2.4 測(cè)試
@Component
public class TaskRunner implements CommandLineRunner {
private final JobLauncher jobLauncher;
private final Job job;
public TaskRunner(JobLauncher jobLauncher, Job job) {
this.jobLauncher = jobLauncher;
this.job = job;
}
@Override
public void run(String... args) throws Exception {
long start = System.currentTimeMillis() ;
JobParameters jobParameters = new JobParametersBuilder()
.addString("JobId", String.valueOf(System.currentTimeMillis()))
.toJobParameters();
JobExecution execution = jobLauncher.run(job, jobParameters);
System.err.println("STATUS :: " + execution.getStatus());
}
}啟動(dòng)服務(wù)后,控制臺(tái)首先輸出的是分區(qū)信息:

當(dāng)完成以后輸出結(jié)果:
































