성능 평가 Async Step vs Multi-Thread Step vs Partition Step vs Parallel Step
Async Step
- single thread 기반과 async Processing 기반을 비교하면 아래와 같다.
- single thread
- async thread
- ItemProcessor와 ItemWriter를 Async로 실행한다.
- java.util.concurrent에서 제공하는 Future를 기반으로 asynchronous를 실행한다.
- Async를 사용하기 위해 spring-batch-integration 디펜던시를 추가해 줘야한다.
- ItemProcessor는 아래와 같이 구현할 수 있다.
private AsyncItemProcessor<User, User> itemProcessor() {
ItemProcessor<User, User> itemProcessor = user -> {
if (user.availableLevelUp()){
return user;
}
return null;
};
AsyncItemProcessor<User, User> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(itemProcessor);
asyncItemProcessor.setTaskExecutor(this.taskExecutor);
return asyncItemProcessor;
}
- itemProcessor를 구현하고 asyncItemProcessor.setDelegate 메소드에 넣어주면 된다.
- async 로 구현된 itemProcessor를 사용하는 Step은 반환값의 타입을 Future 로 바꿔줘야 된다.
- ItemWriter는 아래와 같이 구현할 수 있다.
private AsyncItemWriter<User> itemWriter() {
ItemWriter<User> itemWriter = users -> users.forEach(x -> {
x.levelUp();
userRepository.save(x);
});
AsyncItemWriter<User> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(itemWriter);
return asyncItemWriter;
}
- itemWriter를 구현해주고 asyncItemWriter.setDelegate 메서드에 넣어주면 된다.
Multi-Thread Step
- Chunk 단위로 멀티 스레딩 처리한다.
- Thread-Safe 한 ItemReader 가 필수적으로 필요하다.
- Thread-Safe 한 paging 기반 ItemReader가 사용하는 것이 좋다.
- 구현은 아래와 같이 할 수 있다.
@Bean(JOB_NAME + "_userLevelUpStep")
public Step userLevelUpStep() throws Exception {
return this.stepBuilderFactory.get(JOB_NAME + "_userLevelUpStep")
.<User, User>chunk(CHUNK)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.taskExecutor(this.taskExecutor)
.throttleLimit(8) // 몇 개의 쓰레드로 chunk 를 처리할지 결정
.build();
}
- throttleLimit 은 몇 개의 스레드로 chunk를 처리할지 결정하는데 기본값은 4이다.
Partition Step
- 하나의 Master 기준으로 여러 Slave Step을 생성해 Step 기준으로 Multi-Thread 처리하는 방식이다.
- 구현은 아래와 같다.
@Bean
@StepScope
JpaPagingItemReader<? extends User> itemReader(@Value("#{stepExecutionContext[minId]}") Long minId, @Value("#{stepExecutionContext[maxId]}") Long maxId) throws Exception {
Map<String, Object> parameters = new HashMap<>();
parameters.put("minId",minId);
parameters.put("maxId",maxId);
JpaPagingItemReader<User> itemReader = new JpaPagingItemReaderBuilder<User>()
.queryString("select u from User u where u.id between :minId and :maxId")
.parameterValues(parameters)
.entityManagerFactory(entityManagerFactory)
.pageSize(CHUNK)
.name(JOB_NAME + "_userItemReader")
.build();
itemReader.afterPropertiesSet();
return itemReader;
}
- itemReader에서 stepExecutionContext를 사용하려면 @StepScope 애노테이션을 붙여줘야 한다. -> 그러기 위해서는 @Bean 애노테이션으로 설정되어야 한다.
- #{stepExecutionContext [minId]} 을 통해서 stepExecutionContext에 저장된 minId를 가져올 수 있다.
- master와 slave 설정은 아래와 같이 할 수 있다.
@Bean(JOB_NAME + "_userLevelUpStep.manager")
public Step userLevelUpManagerStep() throws Exception {
return this.stepBuilderFactory.get(JOB_NAME + "_userLevelUpStep.manager")
.partitioner(JOB_NAME + "_userLevelUpStep", new UserLevelUpPartitioner(userRepository))
.step(userLevelUpStep())
.partitionHandler(taskExecutorPartitionHandler())
.build();
}
- 위의 경우 userLevelUpMangerStep 이 master step이 되고,
- step(userLevelUpStep())에 설정된 스탭이 slave step이 된다.
- partitionHandler를 설정해 줘야 하는데 아래와 같이 설정해 줄 수 있다.
@Bean(JOB_NAME + "_taskExecutorPartitionHandler")
PartitionHandler taskExecutorPartitionHandler() throws Exception {
TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
handler.setStep(userLevelUpStep());
handler.setTaskExecutor(this.taskExecutor);
handler.setGridSize(8);
return handler;
}
handler.setStep(userLevelUpStep());
에서 slave step을 설정해 준다.handler.setTaskExecutor(this.taskExecutor);
또한 설정해 준다.- 마지막으로
handler.setGridSize(8);
를 설정해 줌으로써 partitional에서 사용할 grid size를 설정해 준다. -> 총 8개의 파티션으로 나눠서 진행한다는 의미이다.
Parallel Step
- Job에 포함된 N개의 Step을 병렬로 동시에 처리하는 방식이다.
- Multi- Thread 방식은 chunk 단위로 동시에 실행했으므로 차이가 있다.
- partition step 방식과는 Step을 병렬로 동시에 처리한다는 공통점이 있지만, partition step 방식의 경우 1개의 마스터 스탭을 여러 개의 슬래이브 스탭으로 나눠서 처리한다는 차이가 있다.
- 구현은 아래와 같이 할 수 있다.
@Bean(JOB_NAME + "_saveUserFlow")
public Flow saveUserFlow() {
TaskletStep saveUserStep = this.stepBuilderFactory.get(JOB_NAME + "_saveUserStep")
.tasklet(new SaveUserTasklet(userRepository))
.build();
return new FlowBuilder<SimpleFlow>(JOB_NAME + "_saveUserFlow")
.start(saveUserStep)
.build();
}
- 위와 같이 tasklet을 Flow로 감싸서 리턴해야 한다.
private Step orderStatisticsStep(@Value("#{jobParameters[date]}") String date) throws Exception {
return this.stepBuilderFactory.get(JOB_NAME + "_orderStatisticsStep")
.<OrderStatistics, OrderStatistics>chunk(CHUNK)
.reader(orderStatisticsItemReader(date))
.writer(orderStatisticsItemWriter(date))
.build();
}
- orderStaticsStep에 설정된 @Bean, @StepScope를 제거해 준다.
private Flow orderStatisticsFlow(String date) throws Exception {
return new FlowBuilder<SimpleFlow>(JOB_NAME + "_orderStatisticsStep")
.start(new JobParametersDecide("date"))
.on(JobParametersDecide.CONTINUE.getName())
.to(this.orderStatisticsStep(date))
.build();
}
- start, on , to 메서드를 실행한 후,
- orderStatisticsStep 메서드를 Flow로 감싸서 리턴한다.
@Bean(JOB_NAME + "_splitFlow")
@JobScope
public Flow splitFlow(@Value("#{jobParameters[date]}") String date) throws Exception {
Flow userLevelUpFlow = new FlowBuilder<SimpleFlow>(JOB_NAME + "_userLevelUpFlow")
.start(userLevelUpStep())
.build();
return new FlowBuilder<SimpleFlow>(JOB_NAME + "_splitFlow")
.split(this.taskExecutor)
.add(userLevelUpFlow, orderStatisticsFlow(date))
.build();
}
- 위와 같이 split(this.taskExecutor)에서 userLevelUpStep과 orderStaticsStep을 Flow로 감싸서 묶었다.
- 여기서 핵심은 splitFlow인데, splitFlow가 userLevelUpStep와 orderStaticsStep를 병렬로 처리한다.
'Spring Batch' 카테고리의 다른 글
Job, Step (0) | 2022.04.02 |
---|---|
Spring Batch 메타 테이블 (0) | 2022.04.02 |
JobExecutionDecider (0) | 2022.02.27 |
Skip, Retry (0) | 2022.02.27 |
StepListener (0) | 2022.02.27 |