서론
그동안 글을 많이 못썼다.. 회사에서는 쿼리 튜닝하느라 바빴고.. 사이드 프로젝트는 애들이 각자 바빠서 작업을 못했기 때문.. 그러다 회사에서 상담신청 개선 작업을 진행하게 되면서 사용자 로그 및 1:1문의와 관련하여 개인정보 처리방침에 의거 사용자의 민감 정보에 대한 처리가 필요하다고 생각하여 작업을 진행하였다.
본론
기능 요구사항을 정리 하였을때, 내용은 크게 어렵지 않았다.
회사의 개인정보 처리방침에 따라
- 사용자 로그는 최대 3개월동안 저장 가능하였고,
- 1:1문의에서 사용되는 개인정보 (이메일, 휴대폰 번호)는 6개월동안 저장이 가능하였다.
위의 요구사항을 토대로 매일 새벽의 특정 시간에, 로그 테이블과 1:1 문의 테이블에서 데이터의 생성 시각을 확인하여 특정 일자가 지난 데이터들을 삭제시켜주거나 (delete) 공백 처리하는(update) 작업이 필요하였다.
- 매일 새벽의 특정 시간에 작업을 하기 위해 스케줄링이라는 기술을 통해 정기적으로 특정 업무를 실행시킬 필요가 있었고,
- 데이터의 생성 시각을 확인하여 특정 일자가 지난 데이터들을 삭제시키거나 공백을 처리하기 위해서는 단순히 delete 혹은 update 쿼리를 실행시키는 방법도 있지만, 데이터의 양이 많고 작업이 복잡한 경우 배치라는 기술을 이용해서 작업의 안정성을 높일수 있었다.
따라서 데이터의 양이 많은 사용자 로그쪽은 스케줄링 + 배치를 적용하여 주기적으로 데이터를 처리하고자 하였고,
데이터의 양이 적은 1:1 문의쪽은 스케줄링을 통해서 작업을 처리하고자 하였다.
💡 스케줄링이란?
• 일정한 시간 간격으로 반복적인 작업을 수행하는 도구
• 스케줄링을 이용하면 작업을 자동으로 수행하거나 주기적 혹은 일정 시간이 지난 후에 작업을 수행할 수 있어서 효율적인 작업 관리가 가능
💡 배치 프로그래밍이란?
• '대량의 데이터를 처리하는 작업을 자동화'하는 프로그램을 의미
💡스케줄링 + 배치 프로그래밍
- 설정한 일정 주기로 특정 배치 프로그램이 돌아가게끔 하는 것을 의미
회사 프레임워크는 SpringBoot를 사용하고 있었기 때문에
Spring에서 제공하는 Spring Scheduler와 Spring Batch를 사용하고자 하였다.
Spring Scheduler
스프링 부트에서 Scheduler를 사용하기 위해서는 스케줄러를 사용하고자 할 클래스에 다음과 같은 설정이 필요한데,
- @EnableScheduling , @Configuration 어노테이션 등록
- 해당 스케줄러의 빈을 등록하여 Spring Context에서 찾아 사용 할 수있도록 설정
- 작업 단위에 @Scheduled 어노테이션과 cron 옵션을 통해 주기와 반복 단위를 설정
내부적인 작동 원리를 간략하게 설명하자면
1. @EnableScheduling 어노테이션 내부의 클래스에서
2. @Scheduled 어노테이션이 부여된 메서드를 가진 빈을 스프링 컨텍스트에서 찾아서 작업을 진행 한다
정도로 이해 하면 좋을 것 같다.
spring scheduler 작동 원리 (velog.io)
Spring Batch
스프링 부트에서 Batch를 사용하기 위해서는 다음과 같은 설정이 필요하다.
- 메타 데이터 테이블 설정
- BatchConfig 파일 생성 및 설정
- @EnableBatchProcessing, @Configuration 어노테이션 등록
- Job 및 하위 속성
- Step
- tasklet
- reader, processor, writer
- Step
- 해당 Batch를 사용하는 클래스에서의 설정
- jobLauncher DI
- BatchConfig DI
- Batch를 작동할 메서드
- job에 인자를 넘겨줄 jobParamter 설정
- job을 실행시키는 run 메서드 설정
Spring Batch의 작업 단위
SpringBatch에서는 Job이라는 하나의 작업단위를 Step별로 쪼개어 단계별로 진행을 시킬 수 있고,
Step은 Tasklet 혹은 (Reader, Processor, Writer)의 세트로 나눌 수 있다. (실제 비즈니스 로직 처리 담당)
또한 Spring Batch는 연동된 DB에 메타 데이터라는 테이블을 참조하여
- 이전에 실행한 Job이 어떤 것들이 있는지
- 최근 실패한 Batch Parameter가 어떤것들이 있고, 성공한 Job은 어떤것들이 있는지
- 다시 실행한다면 어디서 부터 시작하면 될지
- 어떤 Job에 어떤 Step들이 있었고, Step들 중 성공한 Step과 실패한 Step들은 어떤것들이 있는지
등의 정보를 확인 후 작업을 실행하기 때문에 메타 데이터 테이블을 직접 만들어주거나,
properties를 통해 생성을 해주어야 한다.
따라서 나는 application.properties 설정을 통해 자동으로 메타 데이터 테이블을 생성해 주었다.
spring.batch.job.enabled=false
spring.batch.initialize-schema=always
properties 설정이 되면, 다음과 같은 메타 데이터 테이블이 생성되는것을 확인 할 수 있다.
메타 데이터 테이블 정보
- BATCH_STEP_EXECUTION
- STEP의 실행 이력에 대한 정보를 담고있는 테이블
- BATCH_STEP_EXECUTION_CONTENT
- BATCH_JOB_INSTANCE
- Job Instance가 생성된 정보를 저장하는 테이블
- Job Instance가 같아도, Job Parameter가 다르면 다른 인스턴스로 분류 -> Job Parameter에 따라 생성
- 이미 존재하는 (사용했던) Job Parameter로 실행시 , 오류가 생김 -> JobInstanceAlreadyCompleteException
- BATCH_JOB_EXECUTION
- JOB_EXECUTION은 자신의 부모 JOB_INSTANCE가 실행 이력(성공, 실패했던 정보)을 담고있는 테이블
- JOB_EXECUTION과 JOB_INSTANCE는 부모 - 자식 관계 - BATCH_JOB_EXECUTION_CONTEXT
- BATCH_JOB_EXECUTION_PARAMS
- BATCH_JOB_EXECUTION 생성 당시 입력받은 Job Parameter의 정보를(String, Long, Double등의 데이터) 담고있는 테이블
이제 Batch Config 클래스를 생성하여 배치의 실질적인 구현을 진행해야하는데,
생성 전 Batch의 기본적인 내용에 대해 확인 하고 작업을 진행하였다.
Job 구현
Job Flow
- Job의 실질적인 비즈니스 로직의 구현은 Step에서 진행
- 해당 Step들의 flow는 Job 생성시 구현
- start()
- 매개변수로 Step을 넣으면 SimpleJobBuilder를 반환
- on()
- FlowBuilder의 이너 클래스인 TransitionBuilder 반환
- 조건으로 설정 할 ExitStatus를 지정한다. * 일 경우 모든 Status가 지정된다.
- to()
- FlowBuilder 반환
- 다음으로 이동할 step 지정
- from()
- FlowBuilder를 반환
- 추가적인 이벤트 캐치를 위한 이벤트 리스너 역할을 한다.
- end()
- FlowBuilder를 반환하는 end()와 FlowBuilder를 종료(FlowJobBuilder 반환)하는 end()가 있다.
- TransitionBuilder타입의 end() -> FlowBuilder 반환
- FlowBuilder 타입의 end() -> FlowJobBuilder 반환 (종료)
- FlowBuilder를 반환하는 end()와 FlowBuilder를 종료(FlowJobBuilder 반환)하는 end()가 있다.
- build()
- 최종적으로 Job 클래스의 인스턴스를 반환한다.
- start()
JobParameter, Scope
- JobParameter를 사용하기 위해선 Spring Batch 전용 Scope를 선언해주어야 한다.
- JobParameter : 외부 혹은 내부에서 파라미터를 받아 여러 Batch 컴포넌트에서 사용 할 수 있도록 지원하는 파라미터
- Double, Long, Date, String의 타입이 있고, LocalDate, LocalDateTime은 없어 String으로 변환해서 사용해야
- Scope : JobParameter가 사용되는 범위
- @JobScope : Job 범위 내에서 사용되는 어노테이션 (Step에 선언)
- @StepScope : Step 범위 내에서 사용되는 어노테이션 (Tasklet, ItemReader, ItemWriter, ItemProcessor)
- 실행 시점 : Spring Batch가 Spring 컨테이너를 통해 지정된 Step, Job의 실행 시점에 @xxxScope로 선언된 해당 컴포넌트를 Spring Bean으로 생성한다.
- Late Binding이 가능해진다 -> JobParameter를 실행 시점이 아닌 비즈니스 로직 처리 단계(Service 등)에서 Job Parameter를 할당 할 수 있다.
- 동일한 컴포넌트를 병렬 혹은 동시에 사용 할 때 서로의 상태를 침범 할 일이 없어진다.
- @StepScope, @JobScope Bean을 생성 할 때에만 Job Parameters가 생성된다.
- JobParameter : 외부 혹은 내부에서 파라미터를 받아 여러 Batch 컴포넌트에서 사용 할 수 있도록 지원하는 파라미터
- Late Binding으로 인하여 실행 시점에 값이 결정되므로 null을 설정해주고 @Value를 통해 Job Parameter를 설정한다.
Step 구현
Step은 Tasklet 단위로 처리가 되고,
Tasklet중 ChunkOrientedTasklet을 통해 Chunk를 통해 처리하며,
이의 구성 요소는 ItemReader, ItemWriter, ItemProccesor가 있다.
또한 Chunk 단위로트랜잭션을 처리를 하는데 실패 할 경우 해당 Chunk 만큼만 롤백이 되고,
이전에 커밋된 트랜잭션 범위까지는 반영이 된다.
위와 같이 Chunk 단위로 데이터를 처리한다. (그림은 개별 item이 처리되는 것만 다루고 있음)
Reader와 Processor을 통해 1건씩 데이터를 읽고 가공한 뒤, Writer에서 Chunk 단위로 작업을 수행한다.
for(int i=0; i<totalSize; i+=chunkSize){ // chunkSize 단위로 묶어서 처리
List items = new Arraylist();
for(int j = 0; j < chunkSize; j++){
Object item = itemReader.read()
Object processedItem = itemProcessor.process(item);
items.add(processedItem);
}
itemWriter.write(items);
}
java code로 표현하자면 위와 같다.
Reader에는 PageSize를 설정해주는데, ChunkSize와는 결론적으로 말하자면 의미는 다르지만 값은 같게 설정하는것이 성능 향상에 좋다고 한다.
- PagingSize란 말그대로 데이터를 읽어올 때 한 번에 몇개의 데이터를 가져오는지 설정하는 값이다
- ChunkSize란 한 번에 처리 될 트랜잭션의 단위이다.
- PagingSize가 10이고 ChunkSize가 50이면, Page조회가 5번 이루어진 후 1번의 트랜잭션이 발생한다.
-> 5번의 쿼리조회가 발생하는데, ChunkSize와 같게두면 1번의 쿼리조회로 처리가 가능하다.
-> Spring Batch의 PagingItemReader의 주석에는 상당히 큰 페이지 크기를 설정하고 페이지 크기와 일치하는 커미트 간격을 사용하면 성능이 향상됩니다. 라고 작성이 되어있다.
- 성능상 이슈 외에도, JPA를 사용한다면 영속성 컨텍스트가 깨지는 문제도 발생한다고 한다.
Processor는 Reader를 통해 데이터 조회 후, Validation체크나 후처리가 필요한 경우 사용을 한다.
Writer의 경우, DB insert 혹은 외부 API 전송 등 커스텀한 write 행위가 필요한 경우, Writer를 커스텀하게 사용하는것이 좋다.
나의 경우, 회사에서 MyBatis를 사용하고 있었고 작업이 간단했기 때문에
다음과 같이 간단하게 Reader와 Writer만을 이용하여 구현을 할 수 있었다.
@Configuration
@RequiredArgsConstructor
@EnableBatchProcessing
public class BatchConfig {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final SqlSessionFactory sqlSessionFactory;
private static final int CHUNK_SIZE = 500;
private static final String MAPPER_ID = "com.example.mapper.exampleMapper";
private static final String SELECT_QUERY_ID = "findPreviousExampleByRegTime";
private static final String DELETE_QUERY_ID = "deleteById";
@Bean
public Job makeExampleJob() {
return jobBuilderFactory.get("exampleJob")
.start(makeExampleStep())
.build();
}
@Bean
@JobScope
public Step makeExampleStep() {
return stepBuilderFactory.get("exampleStep")
.<Example, Void>chunk(CHUNK_SIZE)
.reader(reader(null))
.writer(writer())
.build();
}
@Bean
@StepScope
// jobParameter로 받은 date
public MyBatisPagingItemReader<Example> reader(@Value("#{jobParameters[date]}") String date) {
log.info("date : "+ date);
return new MyBatisPagingItemReaderBuilder()
.pageSize(CHUNK_SIZE)
.sqlSessionFactory(sqlSessionFactory)
.queryId(String.join(".", MAPPER_ID, SELECT_QUERY_ID))
.parameterValues(Collections.singletonMap("date", date))
.build();
}
@Bean
@StepScope
public MyBatisBatchItemWriter<Void> writer() {
return new MyBatisBatchItemWriterBuilder()
.sqlSessionFactory(sqlSessionFactory)
.statementId(String.join(".", MAPPER_ID, DELETE_QUERY_ID))
.itemToParameterConverter(item -> {
Example example = (Example) item;
return Collections.singletonMap("seq", example.getSeq());
})
.build();
}
}
이렇게 Batch Config 파일을 작성까지 하였고
해당 Batch를 동작시킬 Scheduler에 Batch Config 파일을 Injection 시키고
배치를 실행시키는 JobLauncher도 Injection 시켜
JobParameter을 설정 후 job을 동작시키는 로직으로 구현을 하였다.
@Configuration
@Slf4j
@RequiredArgsConstructor
@EnableScheduling
public class BatchScheduler {
private final JobLauncher jobLauncher;
private final BatchConfig batchConfig;
// 매일 새벽 3시에 실행되도록 설정
@Scheduled(cron = "0 03 00 * * ?", zone = "Asia/Seoul")
public void logCommonBatch() {
// 3개월 전의 날짜
String date = LocalDate.now().minusMonths(3).format(DateTimeFormatter.ofPattern("yyyyMMdd"));
JobParameters jobParameters = new JobParametersBuilder()
.addString("date", date)
.toJobParameters();
try {
jobLauncher.run(batchConfig.makeExampleJob(), jobParameters);
} catch (JobExecutionAlreadyRunningException | JobInstanceAlreadyCompleteException
| JobParametersInvalidException | JobRestartException e) {
log.error(e.getMessage());
}
}
}
블로그 출처
'💻Spring' 카테고리의 다른 글
Swagger 3 (springdoc-openapi-ui) 에서 @AuthenticationPrincipal 파라미터 무시 전역 설정 (0) | 2024.01.05 |
---|---|
Spring WebClient를 이용한 Open API 통신 (0) | 2023.12.11 |
[REST API] GET 메서드에서의 데이터 전달 (0) | 2023.09.25 |
Spring Security 사용자 권한에 따른 API 접근 설정하기 (0) | 2023.09.14 |
Spring Security 401,403 Custom Response 적용하기 (0) | 2023.08.24 |