모르지 않다는 것은 아는것과 다르다.

Spring Batch

ItemReader

채마스 2022. 2. 27. 01:20

Custom ItemReader

  • ItemReader 를 커스터마이징 아래와 같은 코드 방식으로 커스터마이징 할 수 있다.
    public class CustomItemReader<T> implements ItemReader {

    private final List<T> items;

    public CustomItemReader(List<T> items) {
        this.items = new ArrayList<>(items);
    }

    @Override
    public Object read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        if(!items.isEmpty()) {
            return items.remove(0);
        }

        return null; //chunk 반복의 끝 null 리턴
    }
}
  • 위와 같이 ItemReader를 구현한다.
  • items 리스트에 객체가 있을때까지 제거하면서 반환한다.

    @Bean
    public Job itemReaderJob() throws Exception {
        return this.jobBuilderFactory.get("itemReaderJob")
                .incrementer(new RunIdIncrementer())
                .start(this.customItemReaderStep())
                .next(this.jpaStep())
                .build();
    }

    @Bean
    public Step customItemReaderStep() {
        return this.stepBuilderFactory.get("customItemReaderStep")
                .<Person, Person>chunk(10)
                .reader(new CustomItemReader<>(getItems()))
                .writer(itemWriter())
                .build();
    }

    private ItemWriter<Person> itemWriter() {
        return items -> log.info(items.stream()
                .map(Person::getName)
                .collect(Collectors.joining(",")));
    }

    private List<Person> getItems() {
        List<Person> items = new ArrayList<>();
        for(int i = 0; i <10; i++) {
            items.add(new Person(i+1,"test name","test age", "test address"));
        }
        return items;
    }
  • reader() 의 변수로 CustomItemReader<>를 대입해서 ItemReader를 커스터마이징 할 수 있다.




CSV 파일 읽기

private FlatFileItemReader<Person> csvFileItemReader() throws Exception{
    DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<>(); //csv 파일을 라인별로 앍을 수 있는 라인맵퍼
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); //person 필드명을 설정하는 tokenizer
    tokenizer.setNames("id", "name", "age", "address"); //person에 필드명 설정
    lineMapper.setLineTokenizer(tokenizer);

    lineMapper.setFieldSetMapper(fieldSet -> {
        int id = fieldSet.readInt("id");
        String name = fieldSet.readString("name");
        String age = fieldSet.readString("age");
        String address = fieldSet.readString("address");

        return new Person(id, name, age, address);
    });
    FlatFileItemReader<Person> itemReader = new FlatFileItemReaderBuilder<Person>()
            .name("csvFileItemReader")
            .encoding("UTF-8")
            .resource(new ClassPathResource("test.csv")) // resources 파일 밑에서 파일을 읽어옴
            .linesToSkip(1) //첫번째 줄 스킵한다는 의미
            .lineMapper(lineMapper)
            .build();

    itemReader.afterPropertiesSet(); //itemReader에서 필요한 필수 설정값이 잘 설정되었는지 체크함

    return itemReader;
}
  • csv 파일을 라인별로 읽기 위해서 DefaultLineMapper라는 라인맵퍼를 생성한다.
  • 객체의 필드명을 설정하기 위해서 tokenizer 객체를 생성한다.
  • ClassPathResource는 resources 폴더 밑에 파일을 읽을 수 있게 해준다.
  • afterPropertiesSet()함수는 itemReader에서 필요한 필수 설정값이 잘 설정되었는지 체크하는 함수이다.
    @Bean
    public Step csvFileStep() throws Exception {
        return stepBuilderFactory.get("csvFileStep")
                .<Person, Person>chunk(10)
                .reader(this.csvFileItemReader())
                .writer(itemWriter())
                .build();
    }
  • 위와 같이 chunk 기반 스탭을 만들어주고 reader자리에 csvFileItemReader를 넣어주면 csv파일을 읽을 수 있다.






Cusor기반 조회, Paging 기반 조회 비교

  • Cursor 기반 조회
    • 배치 처리가 완료될 때 까지 DB Connection이 연결이다.
    • DB Connection 빈도가 낮아 성능이 좋은 반면, 긴 Connection 유지 시간이 필요하다.
    • 하나의 Connection에서 처리되기 때문에, Thread Safe 하지 않다.
    • 모든 결과를 메모리에 할당하기 때문에, 더 많은 메모리를 사용해야한다.
  • Paging 기반 조회
    • 페이징 단위로 DB Connection을 연결
    • DB Connection 빈도가 높아 비교적 성능이 낮은 반면, Connection 유지 시간이 짧다.
    • 매번 Connection을 하기 때문에 Thread Safe하다.
    • 페이징 단위의 결과만 메모리에 할당하기 때문에, 비교적 더 적은 메모리를 사용한다.




JDBC 데이터 읽기

 

  • Cursor 기반으로 조회한 코드는 아래와 같다.
@Bean
public Step jdbcStep() throws Exception{
    return stepBuilderFactory.get("jdbcStep")
            .<Person, Person>chunk(10)
            .reader(jdbcCusorItemReader())
            .writer(itemWriter())
            .build();
}

private JdbcCursorItemReader<Person> jdbcCusorItemReader() throws Exception{
    JdbcCursorItemReader<Person> itemReader = new JdbcCursorItemReaderBuilder<Person>()
            .name("jdbcCursorItemReader")
            .dataSource(dataSource)
            .sql("select id, name, age, address from person")
            .rowMapper(((rs, rowNum) -> new Person(
                    rs.getInt(1), rs.getString(2), rs.getString(3), rs.getString(2)
            )))
            .build();
    itemReader.afterPropertiesSet();
    return itemReader;
}
  • 위와 같이 dataSource에서 데이터를 읽어와서 rowMapper를 통해서 객체를 생성할 수 있다.
  • Step 에서는 reader()의 변수로 jdbcCusorItemReader()를 받아 jdbc 데이터를 읽을 수 있다.




JPA 데이터 읽기

  • Cursor 기반으로 조회한 코드는 아래와 같다.
@Bean
public Step jpaStep() throws Exception {
    return stepBuilderFactory.get("jpaStep")
            .<Person,Person>chunk(10)
            .reader(this.jpaCursorItemReader())
            .writer(itemWriter())
            .build();
}

private JpaCursorItemReader<Person> jpaCursorItemReader() throws Exception {
    JpaCursorItemReader<Person> itemReader = new JpaCursorItemReaderBuilder<Person>()
            .name("jpaCursorItemReader")
            .entityManagerFactory(entityManagerFactory)
            .queryString("select p from Person p")
            .build();
    itemReader.afterPropertiesSet();
    return itemReader;
}
  • entityManagerFactory를 통해서 객체를 읽어 올 수 있다.
  • Step 에서는 reader()의 변수로 jpaCursorItemReader()를 받아 데이터를 읽을 수 있다.



'Spring Batch' 카테고리의 다른 글

JobExecutionListener  (0) 2022.02.27
ItemWriter  (0) 2022.02.27
ItemProcessor  (0) 2022.02.27
Tasklet, Chunk 비교  (0) 2022.02.27
Spring Batch  (0) 2022.02.27