기존의 ItemReader가 아닌, InfluxDB에서 데이터를 받아서 저장하는 커스텀 ItemReader를 직접 구현해야 하는 상황이라, Spring Batch의 실행 흐름과 Reader의 동작 방식을 확실히 이해하고 가기로 했습니다.
Spring Batch 실행 흐름 개요
Spring Batch의 실행 흐름은 다음과 같이 Job → Step → Tasklet 구조로 진행됩니다.
1. Job 실행 (`JobLauncher.run()`)
├──> Step 실행 (`TaskletStep.doExecute()`)
├──> 트랜잭션 시작 (`doInTransaction()`)
├──> Chunk 단위 데이터 처리 (`ChunkOrientedTasklet.execute()`)
├──> ItemReader.read() 호출 (데이터 읽기)
├──> ItemProcessor.process() 호출 (데이터 가공)
├──> ItemWriter.write() 호출 (데이터 저장)
├──> Chunk 실행 완료 시 트랜잭션 커밋 (`commit()`)
├──> ItemReader가 `null` 반환 시 Step 종료
2. 모든 Step이 종료되면 Job 종료 (`JobRepository.update()`)
1. Step 실행 (TaskletStep.doExecute())
Job이 실행되면 Step이 실행됩니다.
@Override
protected void doExecute(StepExecution stepExecution) throws Exception {
stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {
@Override
public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext) throws Exception {
return new TransactionTemplate(transactionManager, transactionAttribute)
.execute(new ChunkTransactionCallback(chunkContext, semaphore));
}
});
}
- stepOperations.iterate()를 통해 Step이 반복 실행됨.
- 내부적으로 트랜잭션을 시작 (TransactionTemplate.execute()).ChunkTransactionCallback 내부에서 ItemReader.read() → ItemProcessor.process() → ItemWriter.write()`가 실행됨.
2. 트랜잭션 내에서 Chunk 처리 (ChunkOrientedTasklet.execute())
Step이 실행되면 Chunk 단위로 반복 처리됩니다.
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
if (inputs == null) {
inputs = chunkProvider.provide(contribution);
if (buffering) {
chunkContext.setAttribute(INPUTS_KEY, inputs);
}
}
chunkProcessor.process(contribution, inputs);
return RepeatStatus.continueIf(!inputs.isEnd());
}
- transactionTemplate.execute() 내에서 Chunk 단위로 트랜잭션 실행.
- ChunkProvider.provide() 호출 → ItemReader.read()가 실행되어 데이터 읽기.
- ChunkProcessor.process() 호출 → ItemProcessor와 ItemWriter가 실행됨.
3. Chunk 단위로 데이터 제공 (ChunkProvider.provide())
Step이 실행되면 Chunk 단위로 반복 처리됩니다.
@Override
public Chunk<I> provide(final StepContribution contribution) throws Exception {
final Chunk<I> inputs = new Chunk<>();
repeatOperations.iterate(new RepeatCallback() {
@Override
public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
I item = null;
Timer.Sample sample = Timer.start(Metrics.globalRegistry);
String status = BatchMetrics.STATUS_SUCCESS;
try {
item = read(contribution, inputs); // 🔥 ItemReader.read() 실행됨!
} catch (SkipOverflowException e) {
status = BatchMetrics.STATUS_FAILURE;
return RepeatStatus.FINISHED;
} finally {
stopTimer(sample, contribution.getStepExecution(), status);
}
if (item == null) {
inputs.setEnd();
return RepeatStatus.FINISHED; // 🔥 Step 종료
}
inputs.add(item);
contribution.incrementReadCount();
return RepeatStatus.CONTINUABLE;
}
});
return inputs;
}
- repeatOperations.iterate()를 통해 Chunk 단위로 반복 실행됨.
- read(contribution, inputs);를 호출하여 ItemReader.read() 실행됨.
- inputs.setEnd()이 호출되면 Step 종료.
- 예외 발생 시 RepeatStatus.FINISHED를 반환하여 Step 중단.
4. ItemReader.read()가 실행되는 위치
Spring Batch에서 ItemReader.read()는 doExecute() 내부에서 직접 호출되지 않습니다.
대신 ChunkProvider가 실행되는 ChunkTransactionCallback 내부에서 실행됩니다.
@Override
public RepeatStatus doInTransaction(TransactionStatus status) {
try {
try {
result = tasklet.execute(contribution, chunkContext);
if (result == null) {
result = RepeatStatus.FINISHED;
}
}
catch (Exception e) {
if (transactionAttribute.rollbackOn(e)) {
chunkContext.setAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY, e);
throw e;
}
}
}
}
- Step 자체는 데이터를 직접 읽지 않고, Chunk 단위로 ItemReader.read()를 호출하여 데이터를 가져옴.
- ItemReader → ItemProcessor → ItemWriter가 실행되며 Chunk 단위로 트랜잭션이 관리됨.
5. 실제 코드
public class InputCheckReader implements ItemReader<aDto> {
private List<a> a;
private int currentIndex = 0;
@Override
public ControlPointInputCheckDto read() {
if (a == null) {
a = b.getAllByIsDeletedFalse(); // a 초기값 설정
}
while (currentIndex < a.size()) { // a 사이즈 만큼 반복
currentIndex++
return aDto.builder()
.createdAt(TimeUtil.now())
.createdBy("SYSTEM")
.build();
}
return null; // 읽을 데이터가 없을 떄 null 반환
}
}
- 대략 이런식으로 흘러간다.
- 리스트를 초기화 하고
- 리스트의 사이즈만큼 반복을 돌고 반환할게 없으면 null을 반환해 Batch를 종료한다.
null을 반환했을때 종료가 된다는 특징을 잘 활용하면 어떤 customReader도 잘 만들수 있다라고 생각합니다.
주의점
- temReader는 한 번에 하나의 데이터를 반환해야는걸 권장한다.
- List, Set 등의 Collection 객체를 반환하는 것은 피하는게 좋다.
- Collection을 반환하면 ItemProcessor와 ItemWriter가 리스트 전체를 하나의 객체로 인식할 가능성이 있음.
- Chunk Size(예: 100)를 초과하는 데이터가 한 번에 처리될 수 있어 예상과 다른 동작을 초래할 수 있음.
- 만약 한 번에 많은 데이터를 가져와야 한다면, Iterator 또는 Queue를 활용하여 하나씩 반환하는 방식으로 변환하는게 좋다.
@Component
public class CustomItemReader implements ItemReader<Data> {
private Iterator<Data> dataIterator;
@Autowired
private JdbcTemplate jdbcTemplate;
@Override
public Data read() throws Exception {
if (dataIterator == null || !dataIterator.hasNext()) {
List<Data> dataList = jdbcTemplate.query("SELECT * FROM my_table", new BeanPropertyRowMapper<>(Data.class));
dataIterator = dataList.iterator();
}
return dataIterator.hasNext() ? dataIterator.next() : null;
}
}
정리
- ItemReader.read()는 ChunkProvider.provide() 내부에서 반복적으로 호출되며 데이터를 가져오는 역할을 수행함.
- 데이터가 존재하는 경우, Chunk<I> 객체에 추가되며, RepeatStatus.CONTINUABLE을 반환하여 계속 처리함.
- ItemReader.read()가 null을 반환하면, inputs.setEnd()를 호출하여 Chunk 처리를 종료하고 Step 종료 신호를 보냄.
- Step 실행 흐름에 따라 tasklet.execute() 내부에서 직접 호출될 수도 있으며, 구현 방식에 따라 유연하게 동작 가능함.
'트러블슈팅 > 회사' 카테고리의 다른 글
코틀린에서 파이썬 코드 실행하기 (2) | 2024.12.28 |
---|---|
modbus 데이터 가져오기 (1) | 2024.12.28 |
Modbus protocol (2) | 2024.12.28 |
loki 적용하기 (0) | 2024.11.04 |
로그 수집 및 에러 알림 만들기 (0) | 2024.10.18 |