Spring Batch
                
              성능 평가 Async Step vs Multi-Thread Step vs Partition Step vs Parallel Step
                채마스
                 2022. 2. 27. 01:31
              
                          
            성능 평가 Async Step vs Multi-Thread Step vs Partition Step vs Parallel Step
Async Step
- single thread 기반과 async Processing 기반을 비교하면 아래와 같다.
- single thread
 

- async thread

- ItemProcessor와 ItemWriter를 Async로 실행한다.
- java.util.concurrent에서 제공하는 Future를 기반으로 asynchronous를 실행한다.
- Async를 사용하기 위해 spring-batch-integration 디펜던시를 추가해 줘야한다.
- ItemProcessor는 아래와 같이 구현할 수 있다.
private AsyncItemProcessor<User, User> itemProcessor() {
    ItemProcessor<User, User> itemProcessor =  user -> {
        if (user.availableLevelUp()){
            return user;
        }
        return null;
    };
    AsyncItemProcessor<User, User> asyncItemProcessor = new AsyncItemProcessor<>();
    asyncItemProcessor.setDelegate(itemProcessor);
    asyncItemProcessor.setTaskExecutor(this.taskExecutor);
    return asyncItemProcessor;
}- itemProcessor를 구현하고 asyncItemProcessor.setDelegate 메소드에 넣어주면 된다.
- async 로 구현된 itemProcessor를 사용하는 Step은 반환값의 타입을 Future 로 바꿔줘야 된다.- ItemWriter는 아래와 같이 구현할 수 있다.
private AsyncItemWriter<User> itemWriter() {
    ItemWriter<User> itemWriter = users -> users.forEach(x -> {
            x.levelUp();
            userRepository.save(x);
        });
    AsyncItemWriter<User> asyncItemWriter = new AsyncItemWriter<>();
    asyncItemWriter.setDelegate(itemWriter);
    return asyncItemWriter;
}- itemWriter를 구현해주고 asyncItemWriter.setDelegate 메서드에 넣어주면 된다.
Multi-Thread Step

- Chunk 단위로 멀티 스레딩 처리한다.
- Thread-Safe 한 ItemReader 가 필수적으로 필요하다.
- Thread-Safe 한 paging 기반 ItemReader가 사용하는 것이 좋다.
 
- 구현은 아래와 같이 할 수 있다.
@Bean(JOB_NAME + "_userLevelUpStep")
public Step userLevelUpStep() throws Exception {
    return this.stepBuilderFactory.get(JOB_NAME + "_userLevelUpStep")
            .<User, User>chunk(CHUNK)
            .reader(itemReader())
            .processor(itemProcessor())
            .writer(itemWriter())
            .taskExecutor(this.taskExecutor)
            .throttleLimit(8) // 몇 개의 쓰레드로 chunk 를 처리할지 결정
            .build();
}- throttleLimit 은 몇 개의 스레드로 chunk를 처리할지 결정하는데 기본값은 4이다.
Partition Step

- 하나의 Master 기준으로 여러 Slave Step을 생성해 Step 기준으로 Multi-Thread 처리하는 방식이다.
- 구현은 아래와 같다.
@Bean
@StepScope
JpaPagingItemReader<? extends User> itemReader(@Value("#{stepExecutionContext[minId]}") Long   minId, @Value("#{stepExecutionContext[maxId]}") Long maxId) throws Exception {
    Map<String, Object> parameters = new HashMap<>();
    parameters.put("minId",minId);
    parameters.put("maxId",maxId);
    JpaPagingItemReader<User> itemReader = new JpaPagingItemReaderBuilder<User>()
            .queryString("select u from User u where u.id between :minId and :maxId")
            .parameterValues(parameters)
            .entityManagerFactory(entityManagerFactory)
            .pageSize(CHUNK)
            .name(JOB_NAME + "_userItemReader")
            .build();
    itemReader.afterPropertiesSet();
    return itemReader;
}- itemReader에서 stepExecutionContext를 사용하려면 @StepScope 애노테이션을 붙여줘야 한다. -> 그러기 위해서는 @Bean 애노테이션으로 설정되어야 한다.
- #{stepExecutionContext [minId]} 을 통해서 stepExecutionContext에 저장된 minId를 가져올 수 있다.
- master와 slave 설정은 아래와 같이 할 수 있다.
@Bean(JOB_NAME + "_userLevelUpStep.manager")
public Step userLevelUpManagerStep() throws Exception {
    return this.stepBuilderFactory.get(JOB_NAME + "_userLevelUpStep.manager")
            .partitioner(JOB_NAME + "_userLevelUpStep", new UserLevelUpPartitioner(userRepository))
            .step(userLevelUpStep())
            .partitionHandler(taskExecutorPartitionHandler())
            .build();
}- 위의 경우 userLevelUpMangerStep 이 master step이 되고,
- step(userLevelUpStep())에 설정된 스탭이 slave step이 된다.
- partitionHandler를 설정해 줘야 하는데 아래와 같이 설정해 줄 수 있다.
@Bean(JOB_NAME + "_taskExecutorPartitionHandler")
PartitionHandler taskExecutorPartitionHandler() throws Exception {
    TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
    handler.setStep(userLevelUpStep());
    handler.setTaskExecutor(this.taskExecutor);
    handler.setGridSize(8);
    return handler;
}- handler.setStep(userLevelUpStep());에서 slave step을 설정해 준다.
- handler.setTaskExecutor(this.taskExecutor);또한 설정해 준다.
- 마지막으로 handler.setGridSize(8);를 설정해 줌으로써 partitional에서 사용할 grid size를 설정해 준다. -> 총 8개의 파티션으로 나눠서 진행한다는 의미이다.
Parallel Step
- Job에 포함된 N개의 Step을 병렬로 동시에 처리하는 방식이다.
- Multi- Thread 방식은 chunk 단위로 동시에 실행했으므로 차이가 있다.
- partition step 방식과는 Step을 병렬로 동시에 처리한다는 공통점이 있지만, partition step 방식의 경우 1개의 마스터 스탭을 여러 개의 슬래이브 스탭으로 나눠서 처리한다는 차이가 있다.
- 구현은 아래와 같이 할 수 있다.
@Bean(JOB_NAME + "_saveUserFlow")
public Flow saveUserFlow() {
    TaskletStep saveUserStep = this.stepBuilderFactory.get(JOB_NAME + "_saveUserStep")
            .tasklet(new SaveUserTasklet(userRepository))
            .build();
    return new FlowBuilder<SimpleFlow>(JOB_NAME + "_saveUserFlow")
            .start(saveUserStep)
            .build();
}- 위와 같이 tasklet을 Flow로 감싸서 리턴해야 한다.
private Step orderStatisticsStep(@Value("#{jobParameters[date]}") String date) throws Exception {
  return this.stepBuilderFactory.get(JOB_NAME + "_orderStatisticsStep")
          .<OrderStatistics, OrderStatistics>chunk(CHUNK)
          .reader(orderStatisticsItemReader(date))
          .writer(orderStatisticsItemWriter(date))
          .build();
}- orderStaticsStep에 설정된 @Bean, @StepScope를 제거해 준다.
private Flow orderStatisticsFlow(String date) throws Exception {
  return new FlowBuilder<SimpleFlow>(JOB_NAME + "_orderStatisticsStep")
          .start(new JobParametersDecide("date"))
          .on(JobParametersDecide.CONTINUE.getName())
          .to(this.orderStatisticsStep(date))
          .build();
}- start, on , to 메서드를 실행한 후,
- orderStatisticsStep 메서드를 Flow로 감싸서 리턴한다.
@Bean(JOB_NAME + "_splitFlow")
@JobScope
public Flow splitFlow(@Value("#{jobParameters[date]}") String date) throws Exception {
  Flow userLevelUpFlow = new FlowBuilder<SimpleFlow>(JOB_NAME + "_userLevelUpFlow")
          .start(userLevelUpStep())
          .build();
  return new FlowBuilder<SimpleFlow>(JOB_NAME + "_splitFlow")
          .split(this.taskExecutor)
          .add(userLevelUpFlow, orderStatisticsFlow(date))
          .build();
}- 위와 같이 split(this.taskExecutor)에서 userLevelUpStep과 orderStaticsStep을 Flow로 감싸서 묶었다.
- 여기서 핵심은 splitFlow인데, splitFlow가 userLevelUpStep와 orderStaticsStep를 병렬로 처리한다.