개발/Spring Batch

Chunk Process

Debin 2023. 11. 10.
반응형

Chunk

  • Chunk란 여러 개의 아이템을 묶은 하나의 덩어리, 블록을 의미
  • 한번에 하나씩 아이템을 입력 받아 Chunk 단위의 덩어리로 만든 후 Chunk 단위로 트랜잭션을 처리함, 즉 Chunk 단위의 Commit과 Rollback이 이루어짐
  • 일반적으로 대용량 데이터를 한번에 처리하는 것이 아닌 청크 단위로 쪼개어서 더 이상 처리할 데이터가 없을 때까지 반복해서 입출력하는데 사용된다.
  • Chunk<I>는 ItemReader로 읽은 하나의 아이템을 Chunk에서 정한 개수만큼 반복해서 저장하는 타입
  • Chunk<O>는 ItemReader로부터 전달받은 Chunk<I>를 참조해서 ItemProcessor에서 적절하게 가공, 필터링한 다음 ItemWriter에 전달하는 타입

 

아키텍쳐

 

ChunkOrientedTasklet

  • ChunkOrientedTasklet은 스프링 배치에서 제공하는 Tasklet의 구현체로서 Chunk 지향 프로세싱을 담당하는 도메인 객체
  • ItemReader, ItemWriter, ItemProcessor를 사용해 Chunk 기반의 데이터 입출력 처리를 담당한다.
  • TaskletStep에 의해서 반복적으로 실행되며 ChunkOrientedTasklet이 실행될 때마다 매번 새로운 트랜잭션이 생성되어 처리가 이루어진다.
  • exception이 발생할 경우, 해당 chunk는 롤백 되며 이전에 커밋한 chunk는 완료된 상태가 유지된다.
  • 내부적으로 ItemReader를 핸들링하는 ChunkProvider와 ItemProcessor, ItemWriter를 핸들링하는 ChunkProcessor 타입의 구현체를 가진다.

 

구조

 

ChunkProvider

  • ItemReader를 사용해서 소스로부터 아이템을 Chunk Size만큼 읽어서 Chunk 단위로 만들어 제공하는 도메인 객체
  • Chunk<I>를 만들고 내부적으로 반복문을 사용해서 ItemReader.read()를 계속 호출하면서 Item을 Chunk에 쌓는다.
  • 외부로부터 ChunkProvider가 호출될 때 마다 새로운 Chunk가 생성된다.
  • 반복문 종료 시점
    • Chunk size만큼 item을 읽으면 반복문 종료되고 ChunkProcessor로 넘어감
    • ItemReader가 읽은 item이 null일 경우 반복문 종료 및 해당 Step 반복문까지 종료
  • 기본 구현체로서 SimplerChunkProvider와 FaultTolerantChunkProvider가 있다.

 

구조

 

ChunkProcessor

  • ItemProcessor를 사용해서 Item을 변형, 가공, 필터링하고 ItemWriter를 사용해서 Chunk 데이터를 저장, 출력한다.
  • Chunk<O>를 만들고 앞에서 넘어온 Chunk<I>의 item을 한 건씩 처리한 후 Chunk<O>에 저장한다.
  • 외부로부터 ChunkProcessor가 호출될 때마다 항상 새로운 Chunk가 생성된다.
  • ItemProcessor는 설정 시 선택사항으로서 만약 객체가 존재하지 않을 경우 ItemReader에서 읽은 item 그대로가 Chunk<O>에 저장된다.
  • ItemProcessor 처리가 완료되면 Chunk<O>에 있는 List<Item>을 ItemWriter에게 전달한다.
  • ItemWriter 처리가 완료되면 Chunk 트랜잭션이 종료하게 되고 Step 반복문에서 ChunkOrientedTasklet가 새롭게 실행된다.
  • ItemWriter는 Chunk size만큼 데이터를 Commit처리 하기 때문에 Chunk size는 곧Commit Interval이 된다.
  • 기본 구현체로서 SimpleChunkProcessor와 FaultTolerantChunkProcessor가 있다.

 

구조

 

ItemReader

  • 다양한 입력으로부터 데이터를 읽어서 제공하는 인터페이스
    • Flat 파일 - csv,txt (고정 위치로 정의된 데이터 필드나 특수문자로 구별된 데이터의 행)
    • XML, JSON
    • Database
    • JMS, RabbitMQ와 같은 Message Queuing 서비스
    • Custom Reader - 구현 시 멀티 스레드 환경에서 스레드에 안전하게 구현할 필요가 있다.
  • ChunkOrientedTasklet 실행 시 필수적 요소로 설정해야 한다.

 

코드

public interface ItemReader<T> {

   T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;

}

 

  • T read()
    • 입력 데이터를 읽고 다음 데이터로 이동한다.
    • 아이템 하나를 리턴하며 더 이상 아이템이 없는 경우 null 리턴
    • 아이템 하나는 파일의 한줄, DB의 한 row 혹은 XML 파일에서 하나의 엘리먼트가 될 수 있다.
    • 더 이상 처리해야 할 Item이 없어도 예외가 발생하지 않고 ItemProcessor와 같은 다음 단계로 넘어간다.

 

다수의 구현체들이 ItemReader와 ItemStream 인터페이스를 동시에 구현하고 있다.

  • 파일의 스트림을 열거나 종료, DB 커넥션을 열거나 종료, 입력 장치 초기화 등의 작업
  • ExecutionContext에 read와 관련된 여러가지 상태 정보를 저장해서 재시작 시 다시 참조 하도록 지원

 

일부를 제외하고 하위 클래스들은 기본적으로 스레드에 안전하지 않기 때문에 병렬 처리시 데이터 정합성을 위한 동기화 처리가 필요

 

ItemWriter

  • Chunk 단위로 데이터를 받아 일괄 출력 작업을 위한 인터페이스
    • 플랫 파일 - csv, txt
    • XML, Json
    • Database
    • JMS, RabbitMQ와 같은 Message Queuing 서비스
    • Mail Service
    • Custom Writer
  • 아이템 하나가 아닌 아이템 리스트를 전달 받는다.
  • ChunkOrientedTasklet 실행 시 필수적 요소로 설정해야 한다.

 

구조

public interface ItemWriter<T> {

   void write(List<? extends T> items) throws Exception;

}

 

  • void write(List<? extends T> items) throws Exception
    • 출력 데이터를 아이템 리스트로 받아 처리한다.
    • 출력이 완료되고 트랜잭션이 종료되면 새로운 Chunk 단위 프로세스로 이동한다.

 

다수의 구현체들이 ItemWriter와 ItemStream을 동시에 구현하고 있다.

  • 파일의 스트림을 열거나 종료, DB 커넥션을 열거나 종료, 출력 장치 초기화 등의 작업

 

보통 ItemReader 구현체와 1:1 대응 관계인 구현체들로 구성되어 있다.

 

ItemProcessor

  • 데이터를 출력하기 전에 데이터를 가공, 변형, 필터링하는 역할
  • ItemReader 및 ItemWriter와 분리되어 비즈니스 로직을 구현할 수 있다.
  • ItemReader로 부터 받은 아이템을 특정 타입으로 변환해서 ItemWriter에 넘겨줄 수 있다.
  • ItemReader로 부터 받은 아이템들 중 필터 과정을 거쳐 원하는 Item들만 ItemWriter에게 넘겨줄 수 있다.
    • ItemProcessor에서 process() 실행 결과 null을 반환하면 Chunk<O>에 저장되지 않기 때문에 결국 ItemWriter에 전달되지 않는다.
  • ChunkOrientedTasklet 실행 시 선택적 요소이기 때문에 청크 기반 프로세싱에서 ItemProcessor 단계가 반드시 필요한 것은 아니다.

 

구조

public interface ItemProcessor<I, O> {

	@Nullable
	O process(@NonNull I item) throws Exception;
}

 

  • O process(@NonNull I item) throws Exception
    • <I> 제네릭은 ItemReader에서 받을 데이터 타입 지정
    • <O> 제네릭은 ItemWriter에게 보낼 데이터 타입 지정
    • 아이템 하나씩 가공 처리하며 null 리턴할 경우 해당 아이템은 Chunk<O>에 저장되지 않음

 

ItemStream을 구현하지 않는다.

거의 대부분 Customizing해서 사용하기 때문에 기본적으로 제공되는 구현체가 적다.

 

ItemStream

  • ItemReader와 ItemWriter 처리 과정 중 상태를 저장하고 오류가 발생하면 해당 상태를 참조하여 실패한 곳에서 재시작 하도록 지원
  • 리소스를 열고(Open) 닫아야(close)하며 입출력 장치 초기화 등의 작업을 해야 하는 경우
  • ExecutionContext를 매개변수로 받아서 상태 정보를 업데이트(update)하다.
  • ItemReader 및 ItemWriter는 ItemStream을 구현해야 한다.

 

구조

 

 

이상으로 포스팅을 마칩니다.

 

참고자료

https://www.inflearn.com/course/%EC%8A%A4%ED%94%84%EB%A7%81-%EB%B0%B0%EC%B9%98/dashboard

 

반응형

댓글