개요
- 만약 1억 건 이상의 데이터를 가공해야 하는 작업이 있다면 어떻게 처리하면 좋을까?
- 병렬처리를 한다고 해도 단일 서버에서 1억 건을 처리하는 것은 무리가 있다.
- 그렇기 때문에 여러 서버에서 분산처리하는 것이 바람직하다.
- 이번 글에서는 대용량 작업을 Redis Streams를 활용해서 분산처리할 수 있는 방법에 대해서 정리해보려고 한다.
필수 사전 지식
- Scaling and Parallel Processing(With Spring Batch)
- Redis Streams
먼저 Redis Streams에서 사용할 명령어를 간단하게 정리해 보자.
Redis Streams
Consumer Group 생성
- Redis Streams에서 컨슈머 그룹을 생성하는 명령어는 아래와 같다.
XGROUP CREATE [stream 명] [group 명] [id]
- 예시는 아래와 같다.
XGROUP CREATE job-execution-stream job-execution-group $ MKSTREAM
- 위 예시는 'job-execution-stream'이라는 이름의 스트림에 'job-execution-group'이라는 새 컨슈머 그룹을 생성하고, 이 그룹이 스트림의 최신 메시지부터 읽기 시작하도록 설정한다는 의미다.
- '$'의 의미는 이미 존재하는 모든 메시지는 무시하고 새로운 생성되는 메시지만 처리하겠다는 의미다.
Consumer Group 조회
XINFO GROUPS [stream 명]
Consumer Group 삭제
XGROUP DESTROY [stream 명] [group 명]
### Consumer Group에서 엔트리 조회
- 컨슈머 그룹에서 앤트리를 조회하는 명령어는 아래와 같다.
```ruby
XREADGROUP GROUP [group 명] [consumer 명] [COUNT(생략가능)] [BLOCK TIME(생략가능)] STREAMS [stream 명]
- 예시는 아래와 같다.
XREADGROUP GROUP job-execution-group instance-9001 BLOCK 0 STREAMS job-execution-stream >
- 위 예시는 'job-execution-group' 컨슈머 그룹에 속한 'instance-9001' 컨슈머가 'job-execution-stream' 스트림에서 새 메시지를 무한 대기하며 읽는 방식이다.
- '>'의 의미는 가장 오래된 엔트리부터 조회하겠다는 의미다.
- 이 방식을 사용하면 컨슈머는 새로운 메시지가 스트림에 도착하는 즉시 그 메시지를 읽을 수 있다.
엔트리 추가
- 엔트리를 추가하는 명령어는 아래와 같다.
XADD [stream 명] * [KEY] [VALUE] [KEY] [VALUE] ...
- '*' 라는 의미는 엔트리 Key(ID)를 자동으로 생성하겠다는 의미다.
XADD job-execution-stream * jobName heavyJob startId 1 endId 1000000
- 'job-execution-stream' 스트림에 새로운 메시지를 추가하며, 이 메시지에는 'jobName', 'startId', 'endId'라는 세 개의 필드가 포함되고 각각 'heavyJob', '1', '1000000'이라는 값을 가지게 된다.
- 'job-execution-stream' 라는 스트림이 없다면 자동으로 스트림을 생성하고 엔트리를 추가한다.
이제 본격적으로 구현에 들어가 보자.
분산처리가 가능한 형태의 Spring Batch Job Configuration 구성
- 분산처리가 가능한 형태로 Job Configuration을 작성해 보자.
import io.springbatch.example.common.redis.listener.RedissonLockJobExecutionListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.PagingQueryProvider;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@RequiredArgsConstructor
@Configuration
public class HeavyJobJobConfiguration {
private static final int CHUNK_SIZE = 10000;
private static final int PAGE_SIZE = 10000;
private static final int FETCH_SIZE = 10000;
private static final int THREAD_POOL_CORE_SIZE = 5;
private static final int THREAD_POOL_MAX_SIZE = 10;
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
private final JdbcTemplate jdbcTemplate;
private final RedissonLockJobExecutionListener redissonLockJobExecutionListener;
private Long minId;
private Long startId;
private Long endId;
@Bean(name = "heavyJob")
public Job heavyJob() throws Exception {
return jobBuilderFactory.get("heavyJob")
.start(heavyStep(null, null))
.listener(redissonLockJobExecutionListener) // Redis 분삭락 반환을 위한 Listener
.listener(new JobExecutionListener() {
@Override
public void beforeJob(JobExecution jobExecution) {
minId = jdbcTemplate.queryForObject("SELECT MIN(" + "employee_id" + ") from " + "employee", Long.class) - 1;
}
@Override
public void afterJob(JobExecution jobExecution) {
}
})
.build();
}
@Bean(name = "heavyStep")
@JobScope
public Step heavyStep(
@Value("#{jobParameters[startId]}") Long startId,
@Value("#{jobParameters[endId]}") Long endId
) throws Exception {
this.startId = minId + startId;
this.endId = minId + endId;
return stepBuilderFactory.get("heavyStep")
.<EmployeeDto, EmployeeDto>chunk(CHUNK_SIZE)
.reader(heavyItemReader())
.processor(heavyItemProcessor())
.writer(heavyItemWriter())
.taskExecutor(taskExecutor())
.build();
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(THREAD_POOL_CORE_SIZE);
executor.setMaxPoolSize(THREAD_POOL_MAX_SIZE);
executor.setThreadNamePrefix("multi_threaded_pool-");
executor.initialize();
return executor;
}
@Bean(name = "heavyItemReader")
public JdbcPagingItemReader<EmployeeDto> heavyItemReader() throws Exception {
return new JdbcPagingItemReaderBuilder<EmployeeDto>()
.name("multiThreadItemReader")
.dataSource(dataSource)
.pageSize(PAGE_SIZE)
.fetchSize(FETCH_SIZE)
.rowMapper(new BeanPropertyRowMapper<>(EmployeeDto.class))
.queryProvider(createQueryProvider(startId, endId))
.build();
}
public PagingQueryProvider createQueryProvider(Long startId, Long endId) throws Exception {
SqlPagingQueryProviderFactoryBean queryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
queryProviderFactoryBean.setDataSource(dataSource);
queryProviderFactoryBean.setSelectClause("*");
queryProviderFactoryBean.setFromClause("from employee");
queryProviderFactoryBean.setWhereClause("where employee_id >= " + startId + " and employee_id <= " + endId);
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("employee_id", Order.ASCENDING);
queryProviderFactoryBean.setSortKeys(sortKeys);
return queryProviderFactoryBean.getObject();
}
@Bean(name = "heavyItemProcessor")
public ItemProcessor<EmployeeDto, EmployeeDto> heavyItemProcessor() {
return item -> {
item.setEmployeeName("Processed " + item.getEmployeeName());
return item;
};
}
@Bean(name = "heavyItemWriter")
public ItemWriter<EmployeeDto> heavyItemWriter() {
return new JdbcBatchItemWriterBuilder<EmployeeDto>()
.sql("INSERT INTO temp_employee (employee_id, begin_date, employee_name, end_date, employee_code, last_modified_date, created_date) " +
"VALUES (:employeeId, :beginDate, :employeeName, :endDate, :employeeCode, :lastModifiedDate, :createdDate)")
.dataSource(dataSource)
.beanMapped()
.build();
}
}
- 위의 Job Configuration은 Multi-threaded 가 적용된 Job이다.
- 자세한 구현 방식은 Scaling and Parallel Processing(With Spring Batch)에서 알아보았으니 생략하고, 위 코드에서는 Step을 구현한 부분에 집중하자.
- jobParameters에서 startId, endId를 전달받는 것을 확인할 수 있다.
- startId, endId는 createQueryProvider()에서 데이터를 조회하는 데 사용된다.
- Job을 정의하는 부분을 보면 RedissonLockJobExecutionListener를 사용한 것을 확인할 수 있는데, 이 부분은 맨 마지막에 설명한다. (우선 넘어가도록 하자.)
Redis Streams를 사용해서 대용량 작업 분산 처리하기
Redis Streams를 사용해서 대용량 작업 분산처리 크게 아래와 같이 3단계로 나눠서 생각해보자.
- 파드 생성 시 엔트리 수신 대기
- 엔트리 추가
- Group에서 엔트리 조회
1. 파드 생성 시 엔트리 수신 대기
- 위 과정은 애플리케이션이 구동되는 시점에 이루어진다.
- 아래와 같이 RedisConfig를 구성하면 위 그림과 같은 형태가 된다고 생각하면 된다.
import io.springbatch.example.common.redis.streams.listener.BatchJobStreamListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.StreamInfo;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.RedisCommands;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStreamCommands;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
@Slf4j
@Configuration
@EnableCaching
@RequiredArgsConstructor
public class RedisConfig {
@Value("${spring.cache.redis.host}")
private String host;
@Value("${spring.cache.redis.port}")
private int port;
@Value("${spring.cache.redis.password}")
private String password;
@Value("${stream.name:job-execution-stream}")
private String streamName;
@Value("${consumer.group.name:job-execution-group}")
private String consumerGroupName;
@Value("${consumer.name:instance}")
private String consumerName;
private final BatchJobStreamListener batchJobStreamListener;
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
@Bean
public RedisConnectionFactory redisConnectionFactory() {
LettuceConnectionFactory lettuceConnectionFactory = new LettuceConnectionFactory(host, port);
lettuceConnectionFactory.setPassword(password);
return lettuceConnectionFactory;
}
@Bean
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory());
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
return redisTemplate;
}
@Bean
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer(RedisConnectionFactory factory) {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
@Bean
public Subscription subscription(RedisConnectionFactory factory,
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer) {
StringRedisTemplate redisTemplate = new StringRedisTemplate(factory);
redisTemplate.execute((RedisCallback<Void>) connection -> {
byte[] streamKeyBytes = streamName.getBytes(StandardCharsets.UTF_8);
try {
connection.xGroupCreate(streamKeyBytes, consumerGroupName, ReadOffset.from("0"));
} catch (RedisSystemException e) {
log.info("consumer group: {} already exists", consumerGroupName);
} catch (Exception e) {
// 예외 처리 로직
log.error("컨슈머 그룹 생성 중 에러", e);
}
return null;
});
Subscription subscription = listenerContainer.receiveAutoAck(
Consumer.from(consumerGroupName, consumerName),
StreamOffset.create(streamName, ReadOffset.lastConsumed()),
batchJobStreamListener);
listenerContainer.start();
return subscription;
}
}
- Subscription 빈을 등록하는 부분을 보면 컨슈머 그룹을 생성하고 있다.
- 만약 파드 3개를 띄우고 위와 같이 RedisConfig가 구성되어 있다고 가정하면 아래의 명령어가 실행된 상태가 된다.
- batchJobStreamListener에 대한 내용은 3단계에서 자세히 설명한다.
- 실제로는 Pod로 구성했지만 로컬에서는 아래와 같이 구성해서 테스트해 볼 수 있다.
- 위와 같이 포트와 컨슈머명을 같이 넘겼다.
- 로컬에서는 저렇게 instance-1, instance-2, instance-3 3개의 애플리케이션을 구동시켜 두면 된다.
XGROUP CREATE job-execution-stream job-execution-group $ MKSTREAM
XREADGROUP GROUP job-execution-group instance-1 block 0 streams job-execution-stream >
XREADGROUP GROUP job-execution-group instance-2 block 0 streams job-execution-stream >
XREADGROUP GROUP job-execution-group instance-3 block 0 streams job-execution-stream >
- job-execution-group라는 컨슈머 그룹이 생성되었고 파드 3개가 해당 컨슈머 그룹으로부터 엔트리를 조회하기 위해서 대기하고 있는 상태다.
2. 엔트리 추가
- 위 사진처럼 떠있는 파드 중 하나에서 대용량 작업을 받는다.
- 그 뒤에 해당 작업을 파드 수만큼 분할해서 Redis Streams에 엔트리를 추가한다.
- 예시 코드는 아래와 같다.
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
@RequiredArgsConstructor
public class RedisStreamTestController {
private final StringRedisTemplate redisTemplate;
@GetMapping("/redis-streams")
public void jobMaster(@RequestParam int rowSize, int replicas, String jobName) {
int partitionSize = rowSize / replicas; // 각 분할의 크기
int remainingSize = rowSize % replicas; // 남는 데이터 크기
int startId = 1;
int endId = 0;
for (int i = 0; i < replicas; i++) {
startId = endId + 1;
endId = startId + partitionSize - 1;
if (i < remainingSize) {
endId++;
}
Map<String, String> idMap = new HashMap<>();
idMap.put("jobName", jobName);
idMap.put("startId", Integer.toString(startId));
idMap.put("endId", Integer.toString(endId));
redisTemplate.opsForStream().add("job-execution-stream", idMap);
}
}
}
- 위와 같이 대용량 작업을 받아서 파드 수만큼 분할해서 Redis Streams에 엔트리를 추가한 것을 확인할 수 있다.
3. Group에서 엔트리 조회
- 1단계에서 RedisConfig를 구성 때에 batchJobStreamListener를 사용하였다.
- batchJobStreamListener의 구현은 아래와 같다.
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
@Component
@Slf4j
@RequiredArgsConstructor
public class BatchJobStreamListener implements StreamListener<String, MapRecord<String, String, String>> {
private final JobLauncher jobLauncher;
private final RedissonClient redissonClient;
private final JobRegistry jobRegistry;
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
@Override
public void onMessage(MapRecord<String, String, String> message) {
RLock lock = redissonClient.getLock("batchStreamLock");
try {
// 락 획득
lock.lock();
log.info("Lock 획득! {}", LocalDateTime.now().format(FORMATTER));
Map<String, String> param = message.getValue();
String jobName = param.get("jobName");
jobRegistry.getJobNames().stream()
.filter(jobBeanName -> jobBeanName.equals(jobName))
.findFirst()
.map(jobBeanName -> {
try {
return jobRegistry.getJob(jobBeanName);
} catch (NoSuchJobException e) {
log.error("Job not found: {}", jobBeanName, e);
return null;
}
})
.ifPresent(job -> runJob(job, param));
} catch (Exception e) {
lock.unlock();
}
}
private void runJob(Job job, Map<String, String> param) {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("startId", Long.parseLong(param.get("startId")))
.addLong("endId", Long.parseLong(param.get("endId")))
.addDate("execute_time", new Date())
.toJobParameters();
try {
log.info("Running job: {} with parameters: {}", job.getName(), jobParameters);
jobLauncher.run(job, jobParameters);
} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
log.error("Error executing job: {}", job.getName(), e);
}
}
}
- StreamListener를 implements 하여 BatchJobStreamListener를 구성하였다.
- StreamListener는 Spring Data Redis에서 제공해 주는 인터페이스다.
- StreamListener는 메시지 스트리밍을 위한 리스너 인터페이스로, 스트림에서 메시지를 수신하고 처리하는 데 사용된다.
주목할 점은 RedissonClient를 사용해서 락을 획득한다는 점이다. 이제 이 부분에 대해서 알아보자.
분산처리 시 Redis의 분산락을 획득해야 하는 이유
- Spring Batch에서 기본적으로 제공해 주는 테이블에 대한 Isolation Level은 ISOLATION_SERIALIZABLE다.
- 위 내용은 AbstractJobRepositoryFactoryBean에서 확인할 수 있다.
- 위와 같이 DEFAULT_ISOLATION_LEVEL가 ISOLATION_SERIALIZABLE로 설정된 것을 확인할 수 있다.
- Redis는 In-Memory 데이터베이스기 때문에 속도가 엄청 빠르다.
- 그렇기 때문에 RDBMS의 ISOLATION_SERIALIZABLE와 싱크를 맞춰주기 위해서는 분산락을 걸어 주어야 한다.
- 아래 코드가 Redis의 분산락과 관련된 부분이다.
- RedissonClient를 활용해서 작업을 실행하기 전에 분산락을 걸어 두었다.
- 그 뒤에 JobListener에서 작업 실행 전에 분산락을 반환했다.
- 이렇게 되면 Spring Batch에서 제공해 주는 테이블에 대한 Isolation Level(ISOLATION_SERIALIZABLE)과 싱크를 맞출 수 있다.
- 더 자세한 내용은 이번 글의 범위를 벗어나는 것 같으니 이 정도로만 정리하자.
'Spring Batch' 카테고리의 다른 글
Scaling and Parallel Processing(With Spring Batch) (0) | 2023.05.20 |
---|---|
ExecutionContext (0) | 2022.04.02 |
JobParameterValidator (0) | 2022.04.02 |
Job, Step (0) | 2022.04.02 |
Spring Batch 메타 테이블 (0) | 2022.04.02 |