모르지 않다는 것은 아는것과 다르다.

Spring Batch

대용량 작업 분산처리하기(With Spring Batch)

채마스 2023. 6. 4. 15:24

개요

  • 만약 1억 건 이상의 데이터를 가공해야 하는 작업이 있다면 어떻게 처리하면 좋을까?
  • 병렬처리를 한다고 해도 단일 서버에서 1억 건을 처리하는 것은 무리가 있다.
  • 그렇기 때문에 여러 서버에서 분산처리하는 것이 바람직하다.
  • 이번 글에서는 대용량 작업을 Redis Streams를 활용해서 분산처리할 수 있는 방법에 대해서 정리해보려고 한다.

 

필수 사전 지식

 

먼저 Redis Streams에서 사용할 명령어를 간단하게 정리해 보자.

 

Redis Streams

Consumer Group 생성

  • Redis Streams에서 컨슈머 그룹을 생성하는 명령어는 아래와 같다. 
  • XGROUP CREATE [stream 명] [group 명] [id]
  • 예시는 아래와 같다.
    XGROUP CREATE job-execution-stream job-execution-group $ MKSTREAM
  • 위 예시는 'job-execution-stream'이라는 이름의 스트림에 'job-execution-group'이라는 새 컨슈머 그룹을 생성하고, 이 그룹이 스트림의 최신 메시지부터 읽기 시작하도록 설정한다는 의미다.
  • '$'의 의미는 이미 존재하는 모든 메시지는 무시하고 새로운 생성되는 메시지만 처리하겠다는 의미다.

Consumer Group 조회

XINFO GROUPS [stream 명]

Consumer Group 삭제

XGROUP DESTROY [stream 명] [group 명]

### Consumer Group에서 엔트리 조회
- 컨슈머 그룹에서 앤트리를 조회하는 명령어는 아래와 같다.

```ruby
XREADGROUP GROUP [group 명] [consumer 명] [COUNT(생략가능)] [BLOCK TIME(생략가능)] STREAMS [stream 명]
  • 예시는 아래와 같다.
    XREADGROUP GROUP job-execution-group instance-9001 BLOCK 0 STREAMS job-execution-stream >
  • 위 예시는 'job-execution-group' 컨슈머 그룹에 속한 'instance-9001' 컨슈머가 'job-execution-stream' 스트림에서 새 메시지를 무한 대기하며 읽는 방식이다.
  • '>'의 의미는 가장 오래된 엔트리부터 조회하겠다는 의미다.
  • 이 방식을 사용하면 컨슈머는 새로운 메시지가 스트림에 도착하는 즉시 그 메시지를 읽을 수 있다.

엔트리 추가

  • 엔트리를 추가하는 명령어는 아래와 같다.
  • XADD [stream 명] * [KEY] [VALUE] [KEY] [VALUE] ...
  • '*' 라는 의미는 엔트리 Key(ID)를 자동으로 생성하겠다는 의미다.
XADD job-execution-stream * jobName heavyJob startId 1 endId 1000000
  • 'job-execution-stream' 스트림에 새로운 메시지를 추가하며, 이 메시지에는 'jobName', 'startId', 'endId'라는 세 개의 필드가 포함되고 각각 'heavyJob', '1', '1000000'이라는 값을 가지게 된다.
  • 'job-execution-stream' 라는 스트림이 없다면 자동으로 스트림을 생성하고 엔트리를 추가한다.

 

이제 본격적으로 구현에 들어가 보자.

 

분산처리가 가능한 형태의 Spring Batch Job Configuration 구성

  • 분산처리가 가능한 형태로 Job Configuration을 작성해 보자.

import io.springbatch.example.common.redis.listener.RedissonLockJobExecutionListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.PagingQueryProvider;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Slf4j
@RequiredArgsConstructor
@Configuration
public class HeavyJobJobConfiguration {
    private static final int CHUNK_SIZE = 10000;
    private static final int PAGE_SIZE = 10000;
    private static final int FETCH_SIZE = 10000;
    private static final int THREAD_POOL_CORE_SIZE = 5;
    private static final int THREAD_POOL_MAX_SIZE = 10;
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;
    private final JdbcTemplate jdbcTemplate;
    private final RedissonLockJobExecutionListener redissonLockJobExecutionListener;
    private Long minId;
    private Long startId;
    private Long endId;

    @Bean(name = "heavyJob")
    public Job heavyJob() throws Exception {
        return jobBuilderFactory.get("heavyJob")
                .start(heavyStep(null, null))
                .listener(redissonLockJobExecutionListener) // Redis 분삭락 반환을 위한 Listener
                .listener(new JobExecutionListener() {
                    @Override
                    public void beforeJob(JobExecution jobExecution) {
                        minId = jdbcTemplate.queryForObject("SELECT MIN(" + "employee_id" + ") from " + "employee", Long.class) - 1;
                    }

                    @Override
                    public void afterJob(JobExecution jobExecution) {

                    }
                })
                .build();
    }

    @Bean(name = "heavyStep")
    @JobScope
    public Step heavyStep(
            @Value("#{jobParameters[startId]}") Long startId,
            @Value("#{jobParameters[endId]}") Long endId
    ) throws Exception {

        this.startId = minId + startId;
        this.endId = minId + endId;

        return stepBuilderFactory.get("heavyStep")
                .<EmployeeDto, EmployeeDto>chunk(CHUNK_SIZE)
                .reader(heavyItemReader())
                .processor(heavyItemProcessor())
                .writer(heavyItemWriter())
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(THREAD_POOL_CORE_SIZE);
        executor.setMaxPoolSize(THREAD_POOL_MAX_SIZE);
        executor.setThreadNamePrefix("multi_threaded_pool-");
        executor.initialize();
        return executor;
    }


    @Bean(name = "heavyItemReader")
    public JdbcPagingItemReader<EmployeeDto> heavyItemReader() throws Exception {
        return new JdbcPagingItemReaderBuilder<EmployeeDto>()
                .name("multiThreadItemReader")
                .dataSource(dataSource)
                .pageSize(PAGE_SIZE)
                .fetchSize(FETCH_SIZE)
                .rowMapper(new BeanPropertyRowMapper<>(EmployeeDto.class))
                .queryProvider(createQueryProvider(startId, endId))
                .build();
    }


    public PagingQueryProvider createQueryProvider(Long startId, Long endId) throws Exception {
        SqlPagingQueryProviderFactoryBean queryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
        queryProviderFactoryBean.setDataSource(dataSource);
        queryProviderFactoryBean.setSelectClause("*");
        queryProviderFactoryBean.setFromClause("from employee");
        queryProviderFactoryBean.setWhereClause("where employee_id >= " + startId + " and employee_id <= " + endId);

        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("employee_id", Order.ASCENDING);
        queryProviderFactoryBean.setSortKeys(sortKeys);

        return queryProviderFactoryBean.getObject();
    }


    @Bean(name = "heavyItemProcessor")
    public ItemProcessor<EmployeeDto, EmployeeDto> heavyItemProcessor() {
        return item -> {
            item.setEmployeeName("Processed " + item.getEmployeeName());
            return item;
        };
    }

    @Bean(name = "heavyItemWriter")
    public ItemWriter<EmployeeDto> heavyItemWriter() {
        return new JdbcBatchItemWriterBuilder<EmployeeDto>()
                .sql("INSERT INTO temp_employee (employee_id, begin_date, employee_name, end_date, employee_code, last_modified_date, created_date) " +
                        "VALUES (:employeeId, :beginDate, :employeeName, :endDate, :employeeCode, :lastModifiedDate, :createdDate)")
                .dataSource(dataSource)
                .beanMapped()
                .build();
    }

}
  • 위의 Job Configuration은 Multi-threaded 가 적용된 Job이다.
  • 자세한 구현 방식은 Scaling and Parallel Processing(With Spring Batch)에서 알아보았으니 생략하고, 위 코드에서는 Step을 구현한 부분에 집중하자.
  • jobParameters에서 startId, endId를 전달받는 것을 확인할 수 있다.
  • startId, endId는 createQueryProvider()에서 데이터를 조회하는 데 사용된다.
  • Job을 정의하는 부분을 보면 RedissonLockJobExecutionListener를 사용한 것을 확인할 수 있는데, 이 부분은 맨 마지막에 설명한다. (우선 넘어가도록 하자.)

 

Redis Streams를 사용해서 대용량 작업 분산 처리하기

Redis Streams를 사용해서 대용량 작업 분산처리 크게 아래와 같이 3단계로 나눠서 생각해보자.

  1. 파드 생성 시 엔트리 수신 대기
  2. 엔트리 추가
  3. Group에서 엔트리 조회

1. 파드 생성 시 엔트리 수신 대기

  • 위 과정은 애플리케이션이 구동되는 시점에 이루어진다.
  • 아래와 같이 RedisConfig를 구성하면 위 그림과 같은 형태가 된다고 생각하면 된다.
import io.springbatch.example.common.redis.streams.listener.BatchJobStreamListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.StreamInfo;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.RedisCommands;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStreamCommands;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;

@Slf4j
@Configuration
@EnableCaching
@RequiredArgsConstructor
public class RedisConfig {

    @Value("${spring.cache.redis.host}")
    private String host;

    @Value("${spring.cache.redis.port}")
    private int port;

    @Value("${spring.cache.redis.password}")
    private String password;

    @Value("${stream.name:job-execution-stream}")
    private String streamName;

    @Value("${consumer.group.name:job-execution-group}")
    private String consumerGroupName;

    @Value("${consumer.name:instance}")
    private String consumerName;
    private final BatchJobStreamListener batchJobStreamListener;

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }

    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        LettuceConnectionFactory lettuceConnectionFactory = new LettuceConnectionFactory(host, port);
        lettuceConnectionFactory.setPassword(password);
        return lettuceConnectionFactory;
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory());
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        return redisTemplate;
    }

    @Bean
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer(RedisConnectionFactory factory) {
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1))
                .build();

        return StreamMessageListenerContainer.create(factory, options);
    }

    @Bean
    public Subscription subscription(RedisConnectionFactory factory,
                                     StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer) {

        StringRedisTemplate redisTemplate = new StringRedisTemplate(factory);
        redisTemplate.execute((RedisCallback<Void>) connection -> {
            byte[] streamKeyBytes = streamName.getBytes(StandardCharsets.UTF_8);

            try {
                connection.xGroupCreate(streamKeyBytes, consumerGroupName, ReadOffset.from("0"));
            } catch (RedisSystemException e) {
                log.info("consumer group: {} already exists", consumerGroupName);
            } catch (Exception e) {
                // 예외 처리 로직
                log.error("컨슈머 그룹 생성 중 에러", e);
            }
            return null;
        });

        Subscription subscription = listenerContainer.receiveAutoAck(
                Consumer.from(consumerGroupName, consumerName),
                StreamOffset.create(streamName, ReadOffset.lastConsumed()),
                batchJobStreamListener);

        listenerContainer.start();
        return subscription;
    }

}
  • Subscription 빈을 등록하는 부분을 보면 컨슈머 그룹을 생성하고 있다.
  • 만약 파드 3개를 띄우고 위와 같이 RedisConfig가 구성되어 있다고 가정하면 아래의 명령어가 실행된 상태가 된다.
  • batchJobStreamListener에 대한 내용은 3단계에서 자세히 설명한다.
  • 실제로는 Pod로 구성했지만 로컬에서는 아래와 같이 구성해서 테스트해 볼 수 있다.

  • 위와 같이 포트와 컨슈머명을 같이 넘겼다.
  • 로컬에서는 저렇게 instance-1, instance-2, instance-3 3개의 애플리케이션을 구동시켜 두면 된다.
XGROUP CREATE job-execution-stream job-execution-group $ MKSTREAM

XREADGROUP GROUP job-execution-group instance-1 block 0 streams job-execution-stream >
XREADGROUP GROUP job-execution-group instance-2 block 0 streams job-execution-stream >
XREADGROUP GROUP job-execution-group instance-3 block 0 streams job-execution-stream >
  • job-execution-group라는 컨슈머 그룹이 생성되었고 파드 3개가 해당 컨슈머 그룹으로부터 엔트리를 조회하기 위해서 대기하고 있는 상태다.

 

2. 엔트리 추가

  • 위 사진처럼 떠있는 파드 중 하나에서 대용량 작업을 받는다.
  • 그 뒤에 해당 작업을 파드 수만큼 분할해서 Redis Streams에 엔트리를 추가한다.
  • 예시 코드는 아래와 같다.

import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

@RestController
@RequiredArgsConstructor
public class RedisStreamTestController {

    private final StringRedisTemplate redisTemplate;


    @GetMapping("/redis-streams")
    public void jobMaster(@RequestParam int rowSize, int replicas, String jobName) {
        int partitionSize = rowSize / replicas; // 각 분할의 크기
        int remainingSize = rowSize % replicas; // 남는 데이터 크기

        int startId = 1;
        int endId = 0;

        for (int i = 0; i < replicas; i++) {
            startId = endId + 1;
            endId = startId + partitionSize - 1;

            if (i < remainingSize) {
                endId++;
            }

            Map<String, String> idMap = new HashMap<>();
            idMap.put("jobName", jobName);
            idMap.put("startId", Integer.toString(startId));
            idMap.put("endId", Integer.toString(endId));

            redisTemplate.opsForStream().add("job-execution-stream", idMap);
        }
    }
}
  • 위와 같이 대용량 작업을 받아서 파드 수만큼 분할해서 Redis Streams에 엔트리를 추가한 것을 확인할 수 있다.

 

3. Group에서 엔트리 조회

  • 1단계에서 RedisConfig를 구성 때에 batchJobStreamListener를 사용하였다.
  • batchJobStreamListener의 구현은 아래와 같다.
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;

@Component
@Slf4j
@RequiredArgsConstructor
public class BatchJobStreamListener implements StreamListener<String, MapRecord<String, String, String>> {

    private final JobLauncher jobLauncher;
    private final RedissonClient redissonClient;
    private final JobRegistry jobRegistry;
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");


    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        RLock lock = redissonClient.getLock("batchStreamLock");

        try {
            // 락 획득
            lock.lock();
            log.info("Lock 획득! {}", LocalDateTime.now().format(FORMATTER));

            Map<String, String> param = message.getValue();
            String jobName = param.get("jobName");

            jobRegistry.getJobNames().stream()
                    .filter(jobBeanName -> jobBeanName.equals(jobName))
                    .findFirst()
                    .map(jobBeanName -> {
                        try {
                            return jobRegistry.getJob(jobBeanName);
                        } catch (NoSuchJobException e) {
                            log.error("Job not found: {}", jobBeanName, e);
                            return null;
                        }
                    })
                    .ifPresent(job -> runJob(job, param));


        } catch (Exception e) {
            lock.unlock();
        }

    }

    private void runJob(Job job, Map<String, String> param) {
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("startId", Long.parseLong(param.get("startId")))
                .addLong("endId", Long.parseLong(param.get("endId")))
                .addDate("execute_time", new Date())
                .toJobParameters();

        try {
            log.info("Running job: {} with parameters: {}", job.getName(), jobParameters);
            jobLauncher.run(job, jobParameters);
        } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
            log.error("Error executing job: {}", job.getName(), e);
        }
    }
}
  • StreamListener를 implements 하여 BatchJobStreamListener를 구성하였다.
  • StreamListener는 Spring Data Redis에서 제공해 주는 인터페이스다.
  • StreamListener는 메시지 스트리밍을 위한 리스너 인터페이스로, 스트림에서 메시지를 수신하고 처리하는 데 사용된다.

 

주목할 점은 RedissonClient를 사용해서 락을 획득한다는 점이다. 이제 이 부분에 대해서 알아보자.

 

분산처리 시 Redis의 분산락을 획득해야 하는 이유

  • Spring Batch에서 기본적으로 제공해 주는 테이블에 대한 Isolation Level은 ISOLATION_SERIALIZABLE다.
  • 위 내용은 AbstractJobRepositoryFactoryBean에서 확인할 수 있다.

  • 위와 같이 DEFAULT_ISOLATION_LEVEL가 ISOLATION_SERIALIZABLE로 설정된 것을 확인할 수 있다.
  • Redis는 In-Memory 데이터베이스기 때문에 속도가 엄청 빠르다.
  • 그렇기 때문에 RDBMS의 ISOLATION_SERIALIZABLE와 싱크를 맞춰주기 위해서는 분산락을 걸어 주어야 한다.
  • 아래 코드가 Redis의 분산락과 관련된 부분이다.

  • RedissonClient를 활용해서 작업을 실행하기 전에 분산락을 걸어 두었다.
  • 그 뒤에 JobListener에서 작업 실행 전에 분산락을 반환했다.
  • 이렇게 되면 Spring Batch에서 제공해 주는 테이블에 대한 Isolation Level(ISOLATION_SERIALIZABLE)과 싱크를 맞출 수 있다.
  • 더 자세한 내용은 이번 글의 범위를 벗어나는 것 같으니 이 정도로만 정리하자.

'Spring Batch' 카테고리의 다른 글

Scaling and Parallel Processing(With Spring Batch)  (0) 2023.05.20
ExecutionContext  (0) 2022.04.02
JobParameterValidator  (0) 2022.04.02
Job, Step  (0) 2022.04.02
Spring Batch 메타 테이블  (0) 2022.04.02