모르지 않다는 것은 아는것과 다르다.

Spring Batch

성능 평가 Async Step vs Multi-Thread Step vs Partition Step vs Parallel Step

채마스 2022. 2. 27. 01:31

성능 평가 Async Step vs Multi-Thread Step vs Partition Step vs Parallel Step

 

Async Step

  • single thread 기반과 async Processing 기반을 비교하면 아래와 같다.
    • single thread

  • async thread

  • ItemProcessor와 ItemWriter를 Async로 실행한다.
  • java.util.concurrent에서 제공하는 Future를 기반으로 asynchronous를 실행한다.
  • Async를 사용하기 위해 spring-batch-integration 디펜던시를 추가해 줘야한다.
  • ItemProcessor는 아래와 같이 구현할 수 있다.
private AsyncItemProcessor<User, User> itemProcessor() {
    ItemProcessor<User, User> itemProcessor =  user -> {
        if (user.availableLevelUp()){
            return user;
        }

        return null;
    };

    AsyncItemProcessor<User, User> asyncItemProcessor = new AsyncItemProcessor<>();
    asyncItemProcessor.setDelegate(itemProcessor);
    asyncItemProcessor.setTaskExecutor(this.taskExecutor);

    return asyncItemProcessor;
}
- itemProcessor를 구현하고 asyncItemProcessor.setDelegate 메소드에 넣어주면 된다.
- async 로 구현된 itemProcessor를 사용하는 Step은 반환값의 타입을 Future 로 바꿔줘야 된다.
  • ItemWriter는 아래와 같이 구현할 수 있다.
private AsyncItemWriter<User> itemWriter() {
    ItemWriter<User> itemWriter = users -> users.forEach(x -> {
            x.levelUp();
            userRepository.save(x);
        });

    AsyncItemWriter<User> asyncItemWriter = new AsyncItemWriter<>();
    asyncItemWriter.setDelegate(itemWriter);

    return asyncItemWriter;
}
  • itemWriter를 구현해주고 asyncItemWriter.setDelegate 메서드에 넣어주면 된다.



Multi-Thread Step

 

  • Chunk 단위로 멀티 스레딩 처리한다.
  • Thread-Safe 한 ItemReader 가 필수적으로 필요하다.
    • Thread-Safe 한 paging 기반 ItemReader가 사용하는 것이 좋다.
  • 구현은 아래와 같이 할 수 있다.
@Bean(JOB_NAME + "_userLevelUpStep")
public Step userLevelUpStep() throws Exception {
    return this.stepBuilderFactory.get(JOB_NAME + "_userLevelUpStep")
            .<User, User>chunk(CHUNK)
            .reader(itemReader())
            .processor(itemProcessor())
            .writer(itemWriter())
            .taskExecutor(this.taskExecutor)
            .throttleLimit(8) // 몇 개의 쓰레드로 chunk 를 처리할지 결정
            .build();
}
  • throttleLimit 은 몇 개의 스레드로 chunk를 처리할지 결정하는데 기본값은 4이다.




Partition Step

 

  • 하나의 Master 기준으로 여러 Slave Step을 생성해 Step 기준으로 Multi-Thread 처리하는 방식이다.
  • 구현은 아래와 같다.
@Bean
@StepScope
JpaPagingItemReader<? extends User> itemReader(@Value("#{stepExecutionContext[minId]}") Long   minId, @Value("#{stepExecutionContext[maxId]}") Long maxId) throws Exception {

    Map<String, Object> parameters = new HashMap<>();
    parameters.put("minId",minId);
    parameters.put("maxId",maxId);

    JpaPagingItemReader<User> itemReader = new JpaPagingItemReaderBuilder<User>()
            .queryString("select u from User u where u.id between :minId and :maxId")
            .parameterValues(parameters)
            .entityManagerFactory(entityManagerFactory)
            .pageSize(CHUNK)
            .name(JOB_NAME + "_userItemReader")
            .build();

    itemReader.afterPropertiesSet();
    return itemReader;
}
  • itemReader에서 stepExecutionContext를 사용하려면 @StepScope 애노테이션을 붙여줘야 한다. -> 그러기 위해서는 @Bean 애노테이션으로 설정되어야 한다.
  • #{stepExecutionContext [minId]} 을 통해서 stepExecutionContext에 저장된 minId를 가져올 수 있다.
  • master와 slave 설정은 아래와 같이 할 수 있다.
@Bean(JOB_NAME + "_userLevelUpStep.manager")
public Step userLevelUpManagerStep() throws Exception {
    return this.stepBuilderFactory.get(JOB_NAME + "_userLevelUpStep.manager")
            .partitioner(JOB_NAME + "_userLevelUpStep", new UserLevelUpPartitioner(userRepository))
            .step(userLevelUpStep())
            .partitionHandler(taskExecutorPartitionHandler())
            .build();
}
  • 위의 경우 userLevelUpMangerStep 이 master step이 되고,
  • step(userLevelUpStep())에 설정된 스탭이 slave step이 된다.
  • partitionHandler를 설정해 줘야 하는데 아래와 같이 설정해 줄 수 있다.
@Bean(JOB_NAME + "_taskExecutorPartitionHandler")
PartitionHandler taskExecutorPartitionHandler() throws Exception {
    TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();

    handler.setStep(userLevelUpStep());
    handler.setTaskExecutor(this.taskExecutor);
    handler.setGridSize(8);

    return handler;
}
  • handler.setStep(userLevelUpStep()); 에서 slave step을 설정해 준다.
  • handler.setTaskExecutor(this.taskExecutor); 또한 설정해 준다.
  • 마지막으로 handler.setGridSize(8); 를 설정해 줌으로써 partitional에서 사용할 grid size를 설정해 준다. -> 총 8개의 파티션으로 나눠서 진행한다는 의미이다.




Parallel Step

  • Job에 포함된 N개의 Step을 병렬로 동시에 처리하는 방식이다.
  • Multi- Thread 방식은 chunk 단위로 동시에 실행했으므로 차이가 있다.
  • partition step 방식과는 Step을 병렬로 동시에 처리한다는 공통점이 있지만, partition step 방식의 경우 1개의 마스터 스탭을 여러 개의 슬래이브 스탭으로 나눠서 처리한다는 차이가 있다.
  • 구현은 아래와 같이 할 수 있다.
@Bean(JOB_NAME + "_saveUserFlow")
public Flow saveUserFlow() {
    TaskletStep saveUserStep = this.stepBuilderFactory.get(JOB_NAME + "_saveUserStep")
            .tasklet(new SaveUserTasklet(userRepository))
            .build();

    return new FlowBuilder<SimpleFlow>(JOB_NAME + "_saveUserFlow")
            .start(saveUserStep)
            .build();
}
  • 위와 같이 tasklet을 Flow로 감싸서 리턴해야 한다.
private Step orderStatisticsStep(@Value("#{jobParameters[date]}") String date) throws Exception {
  return this.stepBuilderFactory.get(JOB_NAME + "_orderStatisticsStep")
          .<OrderStatistics, OrderStatistics>chunk(CHUNK)
          .reader(orderStatisticsItemReader(date))
          .writer(orderStatisticsItemWriter(date))
          .build();
}
  • orderStaticsStep에 설정된 @Bean, @StepScope를 제거해 준다.
private Flow orderStatisticsFlow(String date) throws Exception {
  return new FlowBuilder<SimpleFlow>(JOB_NAME + "_orderStatisticsStep")
          .start(new JobParametersDecide("date"))
          .on(JobParametersDecide.CONTINUE.getName())
          .to(this.orderStatisticsStep(date))
          .build();
}
  • start, on , to 메서드를 실행한 후,
  • orderStatisticsStep 메서드를 Flow로 감싸서 리턴한다.
@Bean(JOB_NAME + "_splitFlow")
@JobScope
public Flow splitFlow(@Value("#{jobParameters[date]}") String date) throws Exception {
  Flow userLevelUpFlow = new FlowBuilder<SimpleFlow>(JOB_NAME + "_userLevelUpFlow")
          .start(userLevelUpStep())
          .build();

  return new FlowBuilder<SimpleFlow>(JOB_NAME + "_splitFlow")
          .split(this.taskExecutor)
          .add(userLevelUpFlow, orderStatisticsFlow(date))
          .build();
}
  • 위와 같이 split(this.taskExecutor)에서 userLevelUpStep과 orderStaticsStep을 Flow로 감싸서 묶었다.
  • 여기서 핵심은 splitFlow인데, splitFlow가 userLevelUpStep와 orderStaticsStep를 병렬로 처리한다.

'Spring Batch' 카테고리의 다른 글

Job, Step  (0) 2022.04.02
Spring Batch 메타 테이블  (0) 2022.04.02
JobExecutionDecider  (0) 2022.02.27
Skip, Retry  (0) 2022.02.27
StepListener  (0) 2022.02.27