첫 번째 요리. Spring Boot를 활용한 반정형 데이터 수집 애플리케이션입니다.
이 애플리케이션은 요청받은 데이터를 여러 형태의 저장소(로컬 파일, DB 등)에 저장하는 역할을 합니다.
요구사항
- POST 요청을 통해 수집할 데이터를 받음
- 한 시간마다 수집한 데이터를 로컬에 파일로 저장(롤링)
- File I/O 는 느리고, 요청마다 파일에 데이터를 쓰면 버틸 수 없기 때문에 임시로 메모리에 쌓고 비동기로 파일에 Write
조리 과정에서 여러 차례의 설계 수정과 리팩터링이 있었습니다. 이제 각 단계별로 설명해드리도록 하겠습니다.
단계별 레시피(소스 코드)도 함께 있으니 clone 받아 따라 해 보세요!
1. 초기 개발
우선은 요구사항에만 집중해 최대한 단순한 형태로 설계했습니다. 각종 Validation과 응답 데이터도 생략했습니다.
구현
컨트롤러
클라이언트로부터 수집 요청을 받아 메모리 저장소에 데이터를 저장합니다.
@Tag(name = "데이터 수집")
@RestController
@RequestMapping("/data")
public class DataCollectController {
@PostMapping
@Operation(summary = "데이터 수집 요청")
public CommonResponse<DataCollectRespDto> collectRequest(@RequestBody DataCollectReqDto dataCollectReqDto) {
InMemoryRepository.save(dataCollectReqDto);
return CommonResponse.of(new DataCollectRespDto());
}
}
메모리 저장소
임시 데이터를 메모리상에 저장합니다. static으로 구성해 static 메모리 영역에 할당되도록 했습니다.
public class InMemoryRepository {
private static Set<CollectedDataDto> repository = new HashSet<>();
public static boolean save(DataCollectReqDto dataCollectReqDto) {
return repository.add(new CollectedDataDto()
.setProdType(dataCollectReqDto.getProdType())
.setDataType(dataCollectReqDto.getDataType())
.setData(dataCollectReqDto.getData()));
}
public static Set<CollectedDataDto> getData() {
return Set.copyOf(repository);
}
public static void removeData(Set<CollectedDataDto> deleteData) {
repository.removeAll(deleteData);
}
}
getData()의 경우, repository 인스턴스가 아닌 데이터를 반환하는 것을 의미한다고 생각해 Set.copyOf()를 사용해 복제본을 반환해주었습니다.
파일 쓰기가 완료되면, 쓰인 임시 데이터를 제거하기 위해 임시 데이터 Set을 파라미터로 해당 데이터만을 제거하도록 removeData()를 구현했습니다.
파일 생성 스케쥴러
1시간 주기로 메모리 저장소에 있는 임시 데이터를 파일로 생성하고 메모리 저장소에서 임시 데이터를 제거합니다.
@Slf4j
@Service
@RequiredArgsConstructor
public class FileWriteScheduler {
private final ObjectMapper objectMapper;
private static final String FILE_PATH = "/Users/nhn/Documents";
@Scheduled(cron = "0 0 * * * *")
public void fileWriteTask() {
Set<CollectedDataDto> sourceData = InMemoryRepository.getData();
String fileName = ZonedDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")) + ".txt";
Path path = Path.of(FILE_PATH, fileName);
if (!Files.exists(path)) {
try {
Files.createFile(path);
} catch (IOException e) {
e.printStackTrace();
}
}
try (AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
String collectData = objectMapper.writeValueAsString(sourceData);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put(collectData.getBytes(StandardCharsets.UTF_8));
Future<Integer> operation = fileChannel.write(byteBuffer, 0);
byteBuffer.clear();
while (!operation.isDone());
log.info("파일 생성 완료! " + path.toString());
InMemoryRepository.removeData(sourceData);
log.info("메모리 데이터 삭제 완료!");
} catch (IOException e) {
e.printStackTrace();
}
}
}
스케쥴링은 Spring Scheduler를 통해 매시 정각마다 테스트가 수행되도록 했고, 파일 생성과 데이터 쓰기는 비동기로 수행하기 위해 java.nio.Files와 java.nio.channels.AsynchronousFileChannel을 사용했습니다.
문제점
간단하게나마 요구사항을 만족하는 프로그램을 만들었지만, 문제가 있었습니다. 성능이 아주 처참했거든요. 테스트를 안 돌려봐도 알 수 있을 만큼...
🤦🏻 메모리 저장소(InMemoryRepository)를 static 변수를 사용해 구현할 경우 메모리 누수를 유발할 수 있었습니다.
InMemoryRepository에서는 static 변수에 Set 인스턴스를 할당해 임시 데이터를 저장했습니다.
자바의 메모리 공간은 크게 Static, Stack, Heap 영역으로 구분되는데, Static 영역의 경우 프로그램이 실행될 때 메모리에 한 번 할당되어 프로그램이 종료될 때까지 그 영역을 유지하며, GC에 의해 회수되지 않습니다.
때문에, GC는 HashSet 인스턴스(repository)를 회수하지 않게 되고, Heap 영역을 계속해서 차지하게 됩니다.
만약 임시로 수집되는 데이터가 매우 많아질 경우에는 더 많은 Heap 영역을 차지하게 될 것이고 끝내 OutOfMemoryError가 발생할 것입니다.
관련된 샘플 코드와 Heap size 측정 결과는 여기에서 확인하실 수 있습니다.
🤦🏻 메모리 저장소(InMemoryRepository)의 repository의 자료형에 Set은 부적절했습니다.
Set을 사용할 경우, Get → Remove 총 2개 단계를 거쳐야 하며 이는 데이터의 원자성을 해칠 수 있었습니다.
🤦🏻 메모리 저장소(InMemoryRepository)에서 데이터 조회 시 복사본을 반환하는 전략은 데이터 양에 따라 성능에 악영향을 주었습니다.
Set.copyOf 는 새로운 HashSet을 만들어 반환시켜주므로, 데이터 양이 많아지면 당연히 성능에 악영향을 주었습니다.
static <E> Set<E> copyOf(Collection<? extends E> coll) {
if (coll instanceof ImmutableCollections.AbstractImmutableSet) {
return (Set<E>)coll;
} else {
return (Set<E>)Set.of(new HashSet<>(coll).toArray());
}
}
🤦🏻 AsynchronousFileChannel을 통해 구현하고자 하는 내용은 java.nio.file.Files에 이미 구현되어있었습니다.
2. 설계 수정 및 피드백 반영
앞선 문제점 외에도 여러 가지 피드백을 받고 설계부터 수정했습니다. Service 레이어를 추가했고, 임시 데이터를 파일에 쓰는 작업을 스케쥴러로부터 분리해 추상화했습니다.
기존의 로직(파일 쓰기)을 수행하는 FileWriter는 1시간 주기로 새로운 파일을 생성하고, 데이터 쓰기 스케쥴러는 1초 주기로 Writer 구현체에 데이터 쓰기를 위임해 수행하도록 했습니다.
- 1차 설계 : 한 시간까지 메모리 저장소에 데이터를 모아두었다가 파일을 생성해 데이터를 씁니다.
- 2차 설계 : 한 시간마다 새로운 파일을 생성하며, 1초마다 메모리 저장소에서 데이터를 꺼내 파일에 씁니다.
이러한 변화로 트래픽이 많이 발생했을 때, 메모리 저장소가 너무 커져 생기는 성능 문제를 예방할 수 있게 되었습니다.
(1초마다 메모리 저장소는 비워질 테니까요)
구현
메모리 저장소
저장소를 Spring Bean으로 변경했습니다. 그리고 repository의 자료형을 LinkedBlockingQueue로 변경해 데이터를 가져오는 단계를 기존 2단계에서 1단계로 줄였고, 스레드 안정성을 보장하도록 했습니다.
/**
* <h1>메모리 임시 저장소</h1>
* 수집 요청 받은 데이터를 메모리에 임시로 저장하는 클래스
*/
@Component
public class InMemoryRepository {
private Queue<CollectedDataDto> repository = new LinkedBlockingQueue<>();
public boolean save(DataCollectReqDto dataCollectReqDto) {
return this.repository.add(new CollectedDataDto()
.setProdType(dataCollectReqDto.getProdType())
.setDataType(dataCollectReqDto.getDataType())
.setData(dataCollectReqDto.getData()));
}
/**
* 임시 저장된 모든 데이터를 조회합니다.
* @return 임시 저장된 모든 {@link CollectedDataDto 수집 데이터}
*/
public List<CollectedDataDto> getData() {
List<CollectedDataDto> rtn = new ArrayList<>();
while (!this.repository.isEmpty()) {
rtn.add(this.repository.poll());
}
return rtn;
}
}
자료형을 Queue로 변경했으니, 데이터를 poll 하면 바로 큐에서 제거될 것입니다.
Writer 인터페이스
데이터를 각각의 저장소에 쓰는 기능을 추상화해 각각의 저장소에 맞게 구현체에서 구현하도록 했습니다.
isWritable 메서드는 각 Writer 별 데이터 쓰기 작업이 가능한지 사전 체크용으로 사용됩니다.
/**
* 수집된 데이터를 저장하는 클래스를 추상화한 인터페이스
* <ul>
* <li>{@link io.hoon.datacollector.writers.FileWriter} 데이터를 로컬 파일로 저장하는 구현체</li>
* <li>{@link io.hoon.datacollector.writers.DBEachRowWriter} 데이터를 DB에 각각의 Row 로 저장하는 구현체</li>
* </ul>
*/
public interface Writer {
/**
* 현재 데이터 작성이 가능한지 여부를 반환합니다.
*
* @return 데이터 작성 가능 여부
*/
boolean isWritable();
/**
* 데이터를 작성합니다.
*
* @param dataCollection 작성할 데이터 컬렉션
* @throws Exception
*/
void write(Collection<CollectedDataDto> dataCollection) throws Exception;
}
FileWriter 클래스
기존의 로직(로컬 저장소에 파일로 데이터를 쓰기)을 수행하는 Writer 구현체 클래스를 작성했습니다.
1시간 주기로 파일을 생성하고, 쓰기(write) 메서드 호출 시 현재 파일 마지막 라인에 이어 쓰도록 했습니다.
/**
* <h1>로컬 파일 저장 클래스</h1>
* 전달받은 수집 데이터를 {@code .txt} 파일로 저장합니다. 데이터가 작성되는 파일은 1시간 마다 새로 생성됩니다.
*
* @see io.hoon.datacollector.writers.Writer
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class FileWriter implements Writer{
private static final String SEPARATOR = "\n";
private final ObjectMapper objectMapper;
@Value("${writer.file.root-path:/}")
private String fileRootPath;
private Path filePath;
@Override
public boolean isWritable() {
return this.filePath != null && Files.exists(this.filePath);
}
@Override
public void write(Collection<CollectedDataDto> dataCollection) throws IOException {
String valueAsString = objectMapper.writeValueAsString(dataCollection) + SEPARATOR;
Files.write(this.filePath, valueAsString.getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
}
/**
* 1시간 주기로 데이터를 작성할 파일을 생성합니다.
*/
@Scheduled(fixedDelay = 1000 * 60 * 60L)
public void createFileTask() throws IOException {
createFile();
}
/**
* 데이터를 작성할 파일을 생성합니다.
*
* @throws IOException
*/
private void createFile() throws IOException {
String fileName = createFileName();
Path path = Path.of(this.fileRootPath, fileName);
if (!Files.exists(path)) {
this.filePath = Files.createFile(path);
log.info("파일이 생성되었습니다.");
} else {
this.filePath = path;
log.info("이미 파일이 존재해 해당 파일을 사용합니다.");
}
}
private String createFileName() {
return ZonedDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HHmmss")) + ".txt";
}
}
앞선 문제점을 해결하고자, AsynchronousFileChannel 대신 nio.file.Files.write()를 사용했습니다.
Files는 Non-blocking I/O로 작업 스레드가 블록킹 되지 않기 때문에 성능에 도움이 됩니다.
자세한 내용은 여기를 참고해주세요!
데이터 쓰기 스케쥴러
1초마다 메모리 저장소에서 데이터를 가져와 Writer 인터페이스 구현체에 쓰기 작업을 위임했습니다.
/**
* <h1>데이터 작성 스케쥴러</h1>
* 1초마다 {@link io.hoon.datacollector.repository.InMemoryRepository 임시 저장소}에 저장된 {@link CollectedDataDto 수집 데이터}를 구현된 {@link Writer} 클래스를 통해 작성합니다.<br>
* 아래의 경우 데이터를 작성하지 않고 스케쥴이 종료됩니다.
* <ul>
* <li>구현된 {@link Writer} Bean 이 존재하지 않는 경우</li>
* <li>모든 {@link Writer} Bean 이 작성 가능한 상태가 아닌 경우</li>
* <li>작성할 {@link CollectedDataDto 수집 데이터}가 존재하지 않는 경우 </li>
* </ul>
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class DataWriteScheduler {
private final DataCollectorService dataCollectorService;
private final List<Writer> writers;
@Scheduled(fixedDelay = 1000L)
public void writeDataTask() throws Exception {
Optional<Writer> anyWritableWriter = writers.stream().filter(Writer::isWritable).findAny();
if (anyWritableWriter.isEmpty()) {
log.info("현재 작성 가능한 Writer 가 존재하지 않아 건너뜁니다.");
return;
}
List<CollectedDataDto> collectedData = dataCollectorService.getData();
if (collectedData.isEmpty()) {
log.debug("작성할 데이터가 존재하지 않아 건너뜁니다.");
return;
}
for (Writer writer : writers) {
if (!writer.isWritable()) {
continue;
}
writer.write(collectedData);
}
log.info("데이터를 작성했습니다.");
}
}
SchedulerConfig
Spring Scheduler의 스레드 풀 기본 사이즈가 1 이기 때문에 파일 생성 스케쥴러와 데이터 작성 스케쥴러가 동일 스레드에서 실행될 것입니다. 그러면 데이터 작성 테스크가 소요되는 시간이 커질 수록 파일 생성 스케쥴러는 지체될 수 있기 때문에 쓰레드 풀 사이즈를 조정해주었습니다.
/**
* <h1>스케쥴러 Thread 설정 클래스</h1>
* {@link io.hoon.datacollector.writers.FileWriter} 의 파일 생성 스케쥴과 {@link io.hoon.datacollector.scheduler.DataWriteScheduler} 의 데이터 작성 스케쥴이 동일 Thread 에서 동작할 경우,
* 데이터 작성 테스크가 시간이 오래 소요될 경우 파일 생성 테스크가 지체될 수 있다고 생각해 Spring Scheduler 의 Thread Pool Size 를 조정(기본값 1)
*/
@Configuration
public class SchedulerConfig implements SchedulingConfigurer {
private static final int POOL_SIZE = 5;
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(POOL_SIZE);
threadPoolTaskScheduler.setThreadNamePrefix("schedule-thread-pool-");
threadPoolTaskScheduler.initialize();
taskRegistrar.setTaskScheduler(threadPoolTaskScheduler);
}
}
문제점
많은 문제를 해결했지만 약간 부족한 부분이 여전히 있었습니다.
🤦🏻 Queue.add()의 경우 예외를 던지므로 응답 데이터에 결과를 반환하려면 Queue.offer()를 써야 했습니다.
Queue.add()는 데이터를 큐에 넣는 것을 실패했을 때 IllegalStateException을 던지고, 성공했을 때만 true를 반환했습니다.
Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions, returning true upon success and throwing an IllegalStateException if no space is currently available.
반면, Queue.offer()는 추가 결과를 성공/실패와 상관없이 boolean으로 리턴해줍니다.
return : true if the element was added to this queue, else false
🤦🏻 while문보다는 LinkedBlockingQueue.drainTo()를 사용하는 것이 나았습니다.
drainTo() 메서드는 큐를 블록킹하고 전체 요소를 poll 해주는 메서드로, polling을 반복하는 것보다 좋은 효율을 보여줍니다.
3. 피드백 반영
앞선 문제점을 해결했습니다. 1차적으로 개발이 완료되었다고 볼 수 있겠습니다. :)
구현
메모리 저장소
기존 Queue.add()를 Queue.offer()로 변경하고, while문을 drainTo()로 변경했습니다.
/**
* <h1>메모리 임시 저장소</h1>
* 수집 요청 받은 데이터를 메모리에 임시로 저장하는 클래스
*/
@Component
public class InMemoryRepository {
private Queue<CollectedDataDto> repository = new LinkedBlockingQueue<>();
public boolean save(DataCollectReqDto dataCollectReqDto) {
return this.repository.offer(new CollectedDataDto()
.setProdType(dataCollectReqDto.getProdType())
.setDataType(dataCollectReqDto.getDataType())
.setData(dataCollectReqDto.getData()));
}
/**
* 임시 저장된 모든 데이터를 조회합니다.
* @return 임시 저장된 모든 {@link CollectedDataDto 수집 데이터}
*/
public List<CollectedDataDto> getData() {
List<CollectedDataDto> rtn = new ArrayList<>();
((LinkedBlockingQueue) this.repository).drainTo(rtn);
return rtn;
}
}
4. 이벤트 Pub-Sub
한 단계 더 발전시켜서, 스케쥴러로 데이터 쓰기를 유발하기보다 데이터가 메모리 저장소에 저장될 때 이벤트를 발생시켜서 데이터를 쓰도록 변경했습니다. 기존 데이터 쓰기 스케쥴러는 지우기에는 아까워서 옵션으로 변경했습니다.
구현
데이터 수집 서비스
메모리 저장소에 임시 데이터 저장이 성공한 경우, DataCollectEvent를 발행하도록 했습니다.
/**
* <h1>데이터 수집 서비스</h1>
* {@link DataCollectReqDto 수집 요청 받은 데이터}를 가공해 {@link InMemoryRepository 임시 저장소}에 저장하고, {@link CollectedDataDto 수집 데이터}를 가져옵니다.
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class DataCollectorService {
private final InMemoryRepository inMemoryRepository;
private final ApplicationEventPublisher applicationEventPublisher;
/**
* 요청 받은 데이터를 임시 저장소에 저장하고 {@link DataCollectEvent 데이터 수집 이벤트}를 발행합니다.
*
* @param dataCollectReqDto 수집 요청 데이터
* @return 데이터 수집 결과
* @see io.hoon.datacollector.manager.DataWriteManager
*/
public DataCollectRespDto collectData(DataCollectReqDto dataCollectReqDto) {
// 데이터 임시 저장
boolean result = inMemoryRepository.save(dataCollectReqDto);
// 이벤트 발행
if (result) {
applicationEventPublisher.publishEvent(new DataCollectEvent());
log.info("이벤트를 발행했습니다.");
}
return new DataCollectRespDto()
.setProdType(dataCollectReqDto.getProdType())
.setDataType(dataCollectReqDto.getDataType())
.setSuccess(result);
}
public List<CollectedDataDto> getData() {
return inMemoryRepository.getData();
}
}
발행한 DataCollectEvent는 데이터 수집 이벤트 핸들러에서 감지하게 됩니다.
데이터 수집 이벤트 핸들러
DataCollectEvent를 감지해 데이터 쓰기 매니저에 데이터 쓰기 작업을 지시합니다.
/**
* <h1>데이터 수집 이벤트 핸들러</h1>
* {@link DataCollectEvent 데이터 수집 이벤트}가 발생했을때, {@link DataWriteManager 데이터 작성 매니저}의 데이터 작성 메서드를 호출합니다.
*
* @see DataCollectEvent
* @see DataWriteManager
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class DataCollectEventHandler {
private final DataWriteManager dataWriteManager;
/**
* 데이터 수집 이벤트를 핸들링합니다.
* Spring 의 ApplicationEventListener 가 비동기가 아니므로, @Async 애너테이션으로 비동기 처리를 해주도록 합니다.
* @param dataCollectEvent 데이터 수집 이벤트
* @throws Exception
*/
@Async
@EventListener
public void dataCollectEventListener(DataCollectEvent dataCollectEvent) throws Exception {
log.info("이벤트를 감지했습니다.");
dataWriteManager.writeDataTask();
}
}
이벤트 리스너 메서드는 Spring의 ApplicationEvenListener가 비동기가 아니기 때문에 @Async 애너테이션을 추가해 비동기 처리가 가능하도록 해주었습니다.
데이터 쓰기 매니저
Writer 구현체들에게 데이터 쓰기 작업을 요청합니다. 기존의 데이터 쓰기 스케쥴러 로직이 옮겨왔습니다.
/**
* <h1>데이터 작성 매니저</h1>
* {@link io.hoon.datacollector.repository.InMemoryRepository 임시 저장소}에 저장된 {@link CollectedDataDto 수집 데이터}를 구현된 {@link Writer} 클래스를 통해 작성합니다.<br>
* 아래의 경우 데이터를 작성하지 않고 종료됩니다.
* <ul>
* <li>구현된 {@link Writer} Bean 이 존재하지 않는 경우</li>
* <li>모든 {@link Writer} Bean 이 작성 가능한 상태가 아닌 경우</li>
* <li>작성할 {@link CollectedDataDto 수집 데이터}가 존재하지 않는 경우 </li>
* </ul>
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class DataWriteManager {
private final DataCollectorService dataCollectorService;
private final List<Writer> writers;
@Async
public void writeDataTask() throws Exception {
Optional<Writer> anyWritableWriter = writers.stream().filter(Writer::isWritable).findAny();
if (anyWritableWriter.isEmpty()) {
log.info("현재 작성 가능한 Writer 가 존재하지 않아 건너뜁니다.");
return;
}
List<CollectedDataDto> collectedData = dataCollectorService.getData();
if (collectedData.isEmpty()) {
log.debug("작성할 데이터가 존재하지 않아 건너뜁니다.");
return;
}
for (Writer writer : writers) {
if (!writer.isWritable()) {
continue;
}
writer.write(collectedData);
}
log.info("데이터를 작성했습니다.");
}
}
이벤트 핸들러와 마찬가지로 비동기 작업이 가능하게끔 @Async 애너테이션을 추가했습니다.
데이터 쓰기 스케쥴러
scheduler.data-write.enabled 프로퍼티가 true 일 때만 빈이 생성되도록 변경했습니다.
/**
* <h1>데이터 작성 스케쥴러</h1>
* 1초마다 {@link DataWriteManager 데이터 작성 메니저}의 데이터 작성 메서드를 호출합니다.<br>
* {@code scheduler.data-write.enabled} 속성이 true 이어야 동작합니다.
* @see DataWriteManager
*/
@Slf4j
@Service
@RequiredArgsConstructor
@ConditionalOnProperty(value = "scheduler.data-write.enabled", matchIfMissing = false, havingValue = "true")
public class DataWriteScheduler {
private final DataWriteManager dataWriteManager;
@Scheduled(fixedDelay = 1000L)
public void writeDataTask() throws Exception {
log.info("스케쥴 테스크를 실행합니다.");
dataWriteManager.writeDataTask();
}
}
SchedulerConfig
scheduler.data-write.enabled 프로퍼티가 true 일때만 빈이 생성되도록 변경했습니다.
/**
* <h1>스케쥴러 Thread 설정 클래스</h1>
* {@link io.hoon.datacollector.writers.FileWriter} 의 파일 생성 스케쥴과 {@link io.hoon.datacollector.scheduler.DataWriteScheduler} 의 데이터 작성 스케쥴이 동일 Thread 에서 동작할 경우,
* 데이터 작성 테스크가 시간이 오래 소요될 경우 파일 생성 테스크가 지체될 수 있다고 생각해 Spring Scheduler 의 Thread Pool Size 를 조정(기본값 1)<br>
* {@code scheduler.data-write.enabled} 속성이 true 이어야 동작합니다.
*/
@Configuration
@ConditionalOnProperty(value = "scheduler.data-write.enabled", matchIfMissing = false, havingValue = "true")
public class SchedulerConfig implements SchedulingConfigurer {
private static final int POOL_SIZE = 5;
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(POOL_SIZE);
threadPoolTaskScheduler.setThreadNamePrefix("schedule-thread-pool-");
threadPoolTaskScheduler.initialize();
taskRegistrar.setTaskScheduler(threadPoolTaskScheduler);
}
}
문제점
이번 작업에서는 비동기와 동시성 관련해 피드백을 많이 받았습니다. 동시성은 항상 머리를 아프게 하네요...😫
🤦🏻 이벤트 발행을 명시적으로 하기보다는 AOP를 사용하는 것도 좋을 것 같습니다.
관심사 분리와 애플리케이션 확장성을 고려한 개선이 가능해 보입니다.
🤦🏻 메모리 저장소에서 데이터 저장 시 결과를 리턴해 클라이언트에서의 처리를 늘리는 것보다 저장이 될 때까지 대기하는 것이 클라이언트 입장에서 간편하고 좋을 것 같습니다.
이 애플리케이션을 사용하는 클라이언트 입장에서는 결과에는 신경 쓰지 않고 수집할 데이터를 보내기만 하길 기대할 것 같습니다.
그러한 측면에서 실패 시 결과를 리턴하기보다는 저장이 될 때까지 대기하다가 저장시키는 것이 좋을 것 같습니다.
LinkedBlockingQueue.put()을 사용하면 큐가 꽉 차있을 경우 지정된 시간 동안 대기하면서 요소를 추가합니다.
🤦🏻 비동기를 이중으로 사용하는 것은 매우 위험해 보입니다. 병렬 처리가 늘어날수록 동시성 이슈에 취약해지기 때문에 비동기를 위해 @Async를 사용하는 것이 아니라 비동기로 해결해야 하는 것인가를 고민해야 할 것 같습니다.
데이터 쓰기 매니저(DataWriteManager)의 writeDataTask()가 비동기일 경우, Writer 구현체의 isWritable() 구현에 따라 동시성 이슈가 발생할 여지가 있고, 이 작업이 비동기로만 해결되야하는지 의문입니다. 현재 상태에서는 데이터 수집 이벤트 핸들러(DataCollectEventHandler)의 리스너 메서드만 비동기로 처리하면 충분할 것 같습니다.
No Silver Bullet
5. 피드백 반영
앞선 피드백을 받아 비동기 처리 범위를 조정하고, AOP 처리를 추가했습니다.
구현
PublishDataCollectEvent 애너테이션
데이터 수집 이벤트를 발생시키는 메서드를 설정하기 위한 애너테이션을 생성했습니다.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface PublishDataCollectEvent {
}
AopAdvisor
PublishDataCollectEvent 애너테이션이 붙은 메서드의 AOP 처리를 작성했습니다.
@Slf4j
@Aspect
@Component
@RequiredArgsConstructor
public class AopAdvisor {
private final ApplicationEventPublisher applicationEventPublisher;
/**
* PublishDataCollectEvent 어노테이션이 붙은 메서드의 AOP 처리
*
* @param joinPoint
* @return
* @throws Throwable
*/
@Around("@annotation(PublishDataCollectEvent)")
public Object eventPublish(ProceedingJoinPoint joinPoint) throws Throwable {
Object proceed = joinPoint.proceed();
if (proceed != null) {
applicationEventPublisher.publishEvent(new DataCollectEvent());
log.info("이벤트를 발행했습니다.");
}
return proceed;
}
}
데이터 쓰기 매니저
writeDataTask()에 붙였던 @Async 애너테이션을 제거했습니다.
메모리 저장소
Queue.offer()를 LinkedBlockingQueue.put()으로 변경했습니다.
마치며
이렇게 다섯 번의 과정을 거쳐 반 정형 데이터 수집 애플리케이션을 개발했습니다. 더 좋은 방법도 물론 있겠지만, 이것도 나름 나쁘지 않은 것 같습니다. (그랬으면...)
다음 포스팅에서는 로컬 환경에서 Docker를 이용해 ELK Stack을 구축하고 이 애플리케이션을 통해 생성된 파일을 수집해보도록 하겠습니다.
감사합니다.
'📦 ETC > TOY PROJECT' 카테고리의 다른 글
[찍어먹기] Spring Boot 부터 ELK Stack 까지 :: 트러블슈팅 (1) - Elasticsearch (0) | 2022.03.17 |
---|---|
[찍어먹기] Spring Boot 부터 ELK Stack 까지 :: 인증인가 처리 (0) | 2022.03.07 |
[찍어먹기] Spring Boot 부터 ELK Stack 까지 :: 데이터 수집해서 시각화 하기 (2) (0) | 2022.01.30 |
[찍어먹기] Spring Boot 부터 ELK Stack 까지 :: 데이터 수집해서 시각화 하기 (1) (0) | 2022.01.30 |