itemWriter를 구현해주고 asyncItemWriter.setDelegate 메서드에 넣어주면 된다.
Multi-Thread Step
Chunk 단위로 멀티 스레딩 처리한다.
Thread-Safe 한 ItemReader 가 필수적으로 필요하다.
Thread-Safe 한 paging 기반 ItemReader가 사용하는 것이 좋다.
구현은 아래와 같이 할 수 있다.
@Bean(JOB_NAME + "_userLevelUpStep")
public Step userLevelUpStep() throws Exception {
return this.stepBuilderFactory.get(JOB_NAME + "_userLevelUpStep")
.<User, User>chunk(CHUNK)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.taskExecutor(this.taskExecutor)
.throttleLimit(8) // 몇 개의 쓰레드로 chunk 를 처리할지 결정
.build();
}
throttleLimit 은 몇 개의 스레드로 chunk를 처리할지 결정하는데 기본값은 4이다.
Partition Step
하나의 Master 기준으로 여러 Slave Step을 생성해 Step 기준으로 Multi-Thread 처리하는 방식이다.
구현은 아래와 같다.
@Bean
@StepScope
JpaPagingItemReader<? extends User> itemReader(@Value("#{stepExecutionContext[minId]}") Long minId, @Value("#{stepExecutionContext[maxId]}") Long maxId) throws Exception {
Map<String, Object> parameters = new HashMap<>();
parameters.put("minId",minId);
parameters.put("maxId",maxId);
JpaPagingItemReader<User> itemReader = new JpaPagingItemReaderBuilder<User>()
.queryString("select u from User u where u.id between :minId and :maxId")
.parameterValues(parameters)
.entityManagerFactory(entityManagerFactory)
.pageSize(CHUNK)
.name(JOB_NAME + "_userItemReader")
.build();
itemReader.afterPropertiesSet();
return itemReader;
}
itemReader에서 stepExecutionContext를 사용하려면 @StepScope 애노테이션을 붙여줘야 한다. -> 그러기 위해서는 @Bean 애노테이션으로 설정되어야 한다.
#{stepExecutionContext [minId]} 을 통해서 stepExecutionContext에 저장된 minId를 가져올 수 있다.