모르지 않다는 것은 아는것과 다르다.

Spring Batch

Job, Step

채마스 2022. 4. 2. 11:12

Job

  • 전체 배치 프로세스를 캡슐화한 도메인이다.
  • Step의 순서를 정의한다.
  • JobParameters를 받는다.
@Bean
public Job helloJob() {
    return jobBuilderFactory.get("helloJob")
            .incrementer(new RunIdIncrementer())
            .start(this.helloStep()) // job 실행 시 최초로 실행될 step 설정
            .build();
}
  • job을 하나 생성한다. -> job은 batch의 실행단위를 말한다.
  • job의 실행단위를 구분할 수 있는 incrementer -> RunIdIncrementer는 job이 실행될 때마다 Id를 자동으로 생성해 준다.
  • job의 name을 "helloJob"으로 지정했는데 name은 spring batch를 실행할 수 있는 key이기도 하다.
  • start() 메서드는 Job 실행 시 최초로 실행될 step을 결정한다.

 

Step

  • step은 job의 실행단위로 생각하면 된다.
  • Chunk 기반 스텝, Tasklet 스텝 2가지로 나뉜다.
@Bean
public Step helloStep() {
    return stepBuilderFactory.get("helloStep")
            .tasklet(((contribution, chunkContext) -> { // tasklet 이라는 step 의 실행 단위 설정
                log.info("hello spring batch");
                return RepeatStatus.FINISHED;
            })).build();
}

 

 

Chunk 기반 Step

  • chunk 기반으로 하나의 트랜잭션에서 데이터를 처리한다.
  • commitInterval 만큼 데이터를 읽고 트랜잭션 경계 내에서 chunkSize 만큼 write 한다.
  • chunkSize: 한 트랜잭션에서 쓸 아이템의 수다.
  • commitInterval: reader가 한 번에 읽을 아이템의 수다.
  • chunkSize >= commitInterval 하지만 보통 같게 맞춰서 사용하는 것이 좋다.

코드 예시

@Bean
public Job sampleJob(JobRepository jobRepository, Step sampleStep) {
    return this.jobBuilderFactory.get("sampleJob")
            .repository(jobRepository)
            .start(sampleStep)
            .build();
}

@Bean
public Step sampleStep(PlatformTransactionManager trasactionManager) {
    return this.stepBuilderFactory.get("sampleStep")
            .<String, String>(10)
            .reader(itemReader())
            .writer(itemWriter())
            .build();
}
  • 먼저 sampleJob이라는 이름으로 job을 생성하고 sampleStep을 넣어준다.
  • 그다음, sampleStep에서 ItemReader, ItemProcessor, ItemWriter 구현체를 설정한다.
  • ItemProcessor는 생략할 수 있다.

 

 

Tasklet 기반 Step

  • 하나의 트랜잭션에서 데이터를 처리한다.
  • 단순한 처리를 할 때 사용한다.

코드 예시

@Bean
public Step sampleStap() {
    return this.stepBuilderFactory.get("sampleStep")
            .tasklet(myTasklet())
            .build();
}
  • 위와 같이 Tasklet 구현체를 설정한다.
    • 내부에 단순한 읽기, 쓰기, 처리 로직을 모두 넣는다.
  • RepeatStatus (반복 상태)를 설정한다.
    • ex) RepeatStatus.FINISHED

 

Job, Step 데이터 저장 방식

@Bean
public Job shareJob() {
    return jobBuilderFactory.get("shareJob")
            .incrementer(new RunIdIncrementer())
            .start(this.shareStep())
            .next(this.shareStep2())
            .build();
}
  • shareStep()을 실행한 뒤, shareStep2()를 실행한다.
@Bean
public Step shareStep() {
    return stepBuilderFactory.get("shareStep")
        .tasklet((contribution, chunkContext) -> {
            StepExecution stepExecution = contribution.getStepExecution();
            ExecutionContext stepExecutionContext = stepExecution.getExecutionContext();
            stepExecutionContext.putString("stepKey", "step execution context");

            JobExecution jobExecution = stepExecution.getJobExecution();
            JobInstance jobInstance = jobExecution.getJobInstance();
            ExecutionContext jobExecutionContext = jobExecution.getExecutionContext();
            jobExecutionContext.putString("jobKey", "job execution context");
            JobParameters jobParameters = jobExecution.getJobParameters();

            log.info("jobName : {}, stepName : {}, parameter : {}",
                    jobInstance.getJobName(),
                    stepExecution.getStepName(),
                    jobParameters.getLong("run.id"));

            return RepeatStatus.FINISHED;
        })
        .build();
}
  • contribution을 통해서 stepExecution을 꺼내고, stepExecution을 통해서 stepExcutionContext를 꺼낸다.
  • STEP_EXECUTION_CONTEXT 테이블에 "stepKey"라는 키로 "step execution context"를 저장했다.
  • stepExecution을 통해서 jobExecution을 꺼내고, jobExecution을 통해서 jobExecutionContext를 꺼낸다.
  • JOB_EXECUTION_CONTEXT 테이블에 "jobKey"라는 키로 "job execution context"를 저장했다.
  • jobExecutuon을 통해서 jobParameter를 꺼내서 결과를 확인해 본다.
  • 위의 결과로 jobName : shareJob, stepName : shareStep, parameter : 1 이 나온 것을 확인할 수 있다.
@Bean
public Step shareStep2() {
    return stepBuilderFactory.get("shareStep2")
        .tasklet((contribution, chunkContext) -> {
            StepExecution stepExecution = contribution.getStepExecution();
            ExecutionContext stepExecutionContext = stepExecution.getExecutionContext();

            JobExecution jobExecution = stepExecution.getJobExecution();
            ExecutionContext jobExecutionContext = jobExecution.getExecutionContext();

            log.info("jobKey : {}, stepkey : {}",
                    jobExecutionContext.getString("jobKey","emptyJobKey"),
                    stepExecutionContext.getString("stepKey","emptyStepKey"));

            return RepeatStatus.FINISHED;
        })
        .build();
}
  • 위의 결과는 jobKey : job execution context, stepKey : emptyStepKey인 것을 확인할 수 있다.
  • 그 이유는 jobExecutionContext는 job 이 관리하는 step 내에서 공유가능하고, stepExecutionContext는 해당 step 내에서만 공유가 가능하기 때문이다.

 

 

References

  • 황지연 님의 스프링 배치

 

'Spring Batch' 카테고리의 다른 글

ExecutionContext  (0) 2022.04.02
JobParameterValidator  (0) 2022.04.02
Spring Batch 메타 테이블  (0) 2022.04.02
성능 평가 Async Step vs Multi-Thread Step vs Partition Step vs Parallel Step  (0) 2022.02.27
JobExecutionDecider  (0) 2022.02.27