Spring Batch
                
              ItemReader
                채마스
                 2022. 2. 27. 01:20
              
                          
            Custom ItemReader
- ItemReader 를 커스터마이징 아래와 같은 코드 방식으로 커스터마이징 할 수 있다.
    public class CustomItemReader<T> implements ItemReader {
    private final List<T> items;
    public CustomItemReader(List<T> items) {
        this.items = new ArrayList<>(items);
    }
    @Override
    public Object read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        if(!items.isEmpty()) {
            return items.remove(0);
        }
        return null; //chunk 반복의 끝 null 리턴
    }
}- 위와 같이 ItemReader를 구현한다.
- items 리스트에 객체가 있을때까지 제거하면서 반환한다.
    @Bean
    public Job itemReaderJob() throws Exception {
        return this.jobBuilderFactory.get("itemReaderJob")
                .incrementer(new RunIdIncrementer())
                .start(this.customItemReaderStep())
                .next(this.jpaStep())
                .build();
    }
    @Bean
    public Step customItemReaderStep() {
        return this.stepBuilderFactory.get("customItemReaderStep")
                .<Person, Person>chunk(10)
                .reader(new CustomItemReader<>(getItems()))
                .writer(itemWriter())
                .build();
    }
    private ItemWriter<Person> itemWriter() {
        return items -> log.info(items.stream()
                .map(Person::getName)
                .collect(Collectors.joining(",")));
    }
    private List<Person> getItems() {
        List<Person> items = new ArrayList<>();
        for(int i = 0; i <10; i++) {
            items.add(new Person(i+1,"test name","test age", "test address"));
        }
        return items;
    }
- reader() 의 변수로 CustomItemReader<>를 대입해서 ItemReader를 커스터마이징 할 수 있다.
CSV 파일 읽기
private FlatFileItemReader<Person> csvFileItemReader() throws Exception{
    DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<>(); //csv 파일을 라인별로 앍을 수 있는 라인맵퍼
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); //person 필드명을 설정하는 tokenizer
    tokenizer.setNames("id", "name", "age", "address"); //person에 필드명 설정
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(fieldSet -> {
        int id = fieldSet.readInt("id");
        String name = fieldSet.readString("name");
        String age = fieldSet.readString("age");
        String address = fieldSet.readString("address");
        return new Person(id, name, age, address);
    });
    FlatFileItemReader<Person> itemReader = new FlatFileItemReaderBuilder<Person>()
            .name("csvFileItemReader")
            .encoding("UTF-8")
            .resource(new ClassPathResource("test.csv")) // resources 파일 밑에서 파일을 읽어옴
            .linesToSkip(1) //첫번째 줄 스킵한다는 의미
            .lineMapper(lineMapper)
            .build();
    itemReader.afterPropertiesSet(); //itemReader에서 필요한 필수 설정값이 잘 설정되었는지 체크함
    return itemReader;
}- csv 파일을 라인별로 읽기 위해서 DefaultLineMapper라는 라인맵퍼를 생성한다.
- 객체의 필드명을 설정하기 위해서 tokenizer 객체를 생성한다.
- ClassPathResource는 resources 폴더 밑에 파일을 읽을 수 있게 해준다.
- afterPropertiesSet()함수는 itemReader에서 필요한 필수 설정값이 잘 설정되었는지 체크하는 함수이다.
    @Bean
    public Step csvFileStep() throws Exception {
        return stepBuilderFactory.get("csvFileStep")
                .<Person, Person>chunk(10)
                .reader(this.csvFileItemReader())
                .writer(itemWriter())
                .build();
    }- 위와 같이 chunk 기반 스탭을 만들어주고 reader자리에 csvFileItemReader를 넣어주면 csv파일을 읽을 수 있다.
Cusor기반 조회, Paging 기반 조회 비교
- Cursor 기반 조회
- 배치 처리가 완료될 때 까지 DB Connection이 연결이다.
- DB Connection 빈도가 낮아 성능이 좋은 반면, 긴 Connection 유지 시간이 필요하다.
- 하나의 Connection에서 처리되기 때문에, Thread Safe 하지 않다.
- 모든 결과를 메모리에 할당하기 때문에, 더 많은 메모리를 사용해야한다.
 
- Paging 기반 조회
- 페이징 단위로 DB Connection을 연결
- DB Connection 빈도가 높아 비교적 성능이 낮은 반면, Connection 유지 시간이 짧다.
- 매번 Connection을 하기 때문에 Thread Safe하다.
- 페이징 단위의 결과만 메모리에 할당하기 때문에, 비교적 더 적은 메모리를 사용한다.
 
JDBC 데이터 읽기
- Cursor 기반으로 조회한 코드는 아래와 같다.
@Bean
public Step jdbcStep() throws Exception{
    return stepBuilderFactory.get("jdbcStep")
            .<Person, Person>chunk(10)
            .reader(jdbcCusorItemReader())
            .writer(itemWriter())
            .build();
}
private JdbcCursorItemReader<Person> jdbcCusorItemReader() throws Exception{
    JdbcCursorItemReader<Person> itemReader = new JdbcCursorItemReaderBuilder<Person>()
            .name("jdbcCursorItemReader")
            .dataSource(dataSource)
            .sql("select id, name, age, address from person")
            .rowMapper(((rs, rowNum) -> new Person(
                    rs.getInt(1), rs.getString(2), rs.getString(3), rs.getString(2)
            )))
            .build();
    itemReader.afterPropertiesSet();
    return itemReader;
}- 위와 같이 dataSource에서 데이터를 읽어와서 rowMapper를 통해서 객체를 생성할 수 있다.
- Step 에서는 reader()의 변수로 jdbcCusorItemReader()를 받아 jdbc 데이터를 읽을 수 있다.
JPA 데이터 읽기
- Cursor 기반으로 조회한 코드는 아래와 같다.
@Bean
public Step jpaStep() throws Exception {
    return stepBuilderFactory.get("jpaStep")
            .<Person,Person>chunk(10)
            .reader(this.jpaCursorItemReader())
            .writer(itemWriter())
            .build();
}
private JpaCursorItemReader<Person> jpaCursorItemReader() throws Exception {
    JpaCursorItemReader<Person> itemReader = new JpaCursorItemReaderBuilder<Person>()
            .name("jpaCursorItemReader")
            .entityManagerFactory(entityManagerFactory)
            .queryString("select p from Person p")
            .build();
    itemReader.afterPropertiesSet();
    return itemReader;
}- entityManagerFactory를 통해서 객체를 읽어 올 수 있다.
- Step 에서는 reader()의 변수로 jpaCursorItemReader()를 받아 데이터를 읽을 수 있다.