- Published on
멀티스레드와 비동기 적용하기
- Authors
- Name
- zkrp
저번에 멀티쓰레드와 비동기의 기초 개념에 대해서 글을 작성했습니다.
이번 글에서는 프로젝트에 멀티쓰레드와 비동기를 실제로 적용한 경험을 공유하려 합니다.
본격적인 적용 사례를 소개하기 전에, 먼저 자바에서 멀티쓰레드와 비동기를 다룰 때 사용하는 대표적인 라이브러리들을 간단히 정리해 보겠습니다.
1. 멀티쓰레드 JAVA 라이브러리
Runnable
자바에서 쓰레드에서 실행할 작업을 정의하는 인터페이스 입니다.
- 결과를 반환할 수 없다는 한계가 있습니다.
- 반환값을 얻으려면 공용 메모리(static)이나 파이프 등을 사용합니다
- 예외 처리를 할 수 없음
Callable
Runnable 의 단점을 보완하기 위한 인터페이스 입니다.
- call() 은 반환 타입이 제네릭 V
- Callable 인터페이스의 구현체인 작업(Task)은 가용 가능한 쓰레드가 없어서 실행이 미뤄질 수 있고, 작업 시간이 오래 걸릴 수도 있습니다.
- 미래에 완료된 Callable의 반환값을 구하기 위해 사용되는 것이 Future
- Future로 인해 결과를 나중에 받거나 ,실행 상태를 추적할수 있습니다.
- 미래에 완료된 Callable의 반환값을 구하기 위해 사용되는 것이 Future
- throw Exception 예외가 선언되어 있습니다.
이러한 특징이 Runnable 보다 훨씬 편리하다는 장점이 있습니다.
Future
요청 스레드는 대기하지 않고, 다른 작업을 수행 가능하게 합니다. future.get()을 호출해서 최종 결과를 반환합니다.
- 전달한 작업의 미래 결과를 담고 있습니다.
- 동시에 쓰레드 요청이 가능합니다.
- Future.get()
- 완료 상태: Future가 완료 상태면 결과에 포함 되어 있습니다, 이 경우 요청 스레드는 대기하지 않고, 값을 즉시 반환합니다.
- Future가 완료 상태가 아닌경우 -> taskA가 아직 수행되지 않았거나 수행중
- 결과를 받기 위해 대기, 요청 스레드가 마치 락을 얻을때처럼, 결과를 얻기 위해 대기
- 블로킹 메서드라고 함 - 스레드가 작업을 바로 수행하지 않고 , 다른 작업이 완료 될때까지 대기
Future는 결과를 비동기적으로 받을 수 있지만, 여러 Future를 조합하거나 예외 처리를 다루기 어렵습니다. 이 한계를 해결하기 위해 CompletableFuture가 도입되었습니다.
Executor
등록된 작업을 실행하기 위한 인터페이스입니다.
- 등록된 작업(Runnable)을 실행하기 위한 인터페이스 , 작업 등록과 작업 실행 중에서 작업 실행만을 책임집니다.
- 스레드 생성/관리는 직접 하지 않습니다.
ExecutorService
작업 등록 뿐만 아니라 실행을 위한 책임도 가지는 인터페이스 입니다.
- 작업(Runnable, Callable)을 등록해서 스레드 풀에서 실행
- ExecutorService의 submit()은 Runnable/Callable 등록 시 Future를 반환합니다.
- 스레드 수·큐 정책 같은 실행 환경 관리
멀티쓰레드의 구현을 위해서는 대부분 다음과 같이 ExecutorService 구현체를 생성해서 여러 작업들을 동시에 실행시키게 됩니다.
ThreadPoolExecutor
ThreadPoolExecutor 를 사용하면 스레드 풀에 사용되는 숫자와 블로킹 큐등 사용 가능합니다.
쓰레드를 미리 생성해서 대기하는 구조입니다.
newFixedThreadPool()
- 고정된 쓰레드 개수를 갖는 쓰레드 풀을 생성함
- ExecutorService 인터페이스를 구현한 ThreadPoolExecutor 객체가 생성됨
설정 값: corePoolSize , maximumPoolSize, KeepAliveTime, BlockingQueue
작업 큐(BlockingQueue)에 작업을 담아두고, 풀에 있는 스레드들이 하나씩 가져가서 실행하는 구조
Executors
ExecutorService, ThreadPoolExecutor 같은 구현체를 손쉽게 생성할 수 있도록 도와주는 팩토리 클래스입니다.
- newFixedThreadPool(): 고정된 스레드 개수의 스레드 풀을 생성합니다. (ThreadPoolExecutor 반환)
- 이외에도 newCachedThreadPool(), newSingleThreadExecutor(), newScheduledThreadPool() 등 메서드를 가지고있습니다.
2. 비동기 JAVA 라이브러리
CompletableFuture
비동기 프로그래밍을 위한 Future 확장 클래스입니다.
Future를 기반으로 외부에서 완료시킬 수 있어서 CompletableFuture라는 이름을 갖게 되었습니다.
Future의 단점을 극복
- 여러 연산을 결합하기 어려운 문제
- 비동기 처리 중에 발생하는 예외를 처리하기 어려운 문제
CompletableFuture의 주요 기능
- 비동기 작업 시작 (내부적으로 ExecutorService를 씀, 기본은 ForkJoinPool.commonPool)
- supplyAsync(Supplier): 반환 값 있음
- runAsync(Runnable): 반환값 없음
- 결과 처리
- thenApply(fn): 반환 값을 받아서 다른 값을 반환함, 함수형 인터페이스 Function을 파라미터로 받음
- thenAccpet(consumer): 결과를 소비만 하고 반환 없음
- thenRun (Runnable 을 파라미터로 받음): 반환 값을 받지 않고 다른 작업을 실행, 함수형 인터페이스
- **결과 합성 조합**
- thenCompose(fn): 비동기 결과를 다른 CompletableFuture와 연결 (flatMap
- thenCombine(cf, fn): 두 비동기 결과를 합쳐 새 값 생성
- 여러 비동기 작업 조합
- allOf(f1, f2, ...): 여러 작업들을 동시에 실행하고, 모든 작업 결과에 콜백을 실행
- anyOf(f1, f2, ...): 여러 작업들 중에서 가장 빨리 끝난 하나의 결과에 콜백을 실행
- 예외 처리
- exceptionally(fn): 예외 발생 시 대체 값 반환
- handle((res, ex) -> ...): 성공/실패 모두 처리
- 비동기 작업 시작 (내부적으로 ExecutorService를 씀, 기본은 ForkJoinPool.commonPool)
장점
- 논블로킹 콜백 스타일 지원
- 동시성 제어 로직을 깔끔하게 표현 가능
- ExecutorService 직접 다루는 것보다 코드 레벨 추상화 ↑
3. 실제 프로젝트 적용
실제로 프로젝트에 적용한 부분에 대해 설명하겠습니다.
제가 맡은 프로젝트에서는 사용자 위치를 기반으로 반경 10km 내의 도서관을 찾고, 해당 도서관 API를 호출해 책 보유 여부를 확인하는 기능이 있었습니다.
문제는, 주변에 도서관이 50개 이상 있을 경우 최대 50번의 API 호출을 동기적으로 처리해야 했다는 점입니다.
실제로 실행해 보니 한 번 요청에 약 20초가 소요되었고, 여러 사용자가 동시에 요청하면 지연 시간이 기하급수적으로 늘어날 수 있었습니다.
이 문제를 해결하기 위해 멀티쓰레드 + 비동기 처리를 도입했습니다.
@Bean(name = "externalApiTaskExecutor")
public Executor externalApiTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20); // 유지할 코어 스레드 수
executor.setMaxPoolSize(40); // 최대 스레드 수 (큐가 꽉 차면 증가)
executor.setQueueCapacity(100); // 대기 큐 크기
executor.setThreadNamePrefix("api-call-"); // 스레드 이름(로그/모니터링 용이)
executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
멀티쓰레드 처리를 위해 저는 ThreadPoolTaskExecutor를 사용하여 스레드 풀(Thread Pool) 을 직접 구성했습니다.
스프링부트에서는 ThreadPoolTaskExecutor를 Bean으로 등록해두면, @Async("빈이름") 방식으로 손쉽게 사용할 수 있습니다.
이렇게 스레드 풀을 Bean으로 등록해두면, 효율적인 리소스 관리가 가능하고, 동시에 많은 API 호출을 안정적으로 처리할 수 있습니다.
참고) CompletableFuture.supplyAsync는 별도 Executor를 넘기지 않으면 ForkJoinPool.commonPool()(JVM 전역 공유)을 사용합니다.
공용 풀은
- 같은 JVM의 다른 비동기 작업들과 경쟁하고,
- 풀의 parallelism 한도에 묶여, 서비스 트래픽이 몰릴 때 실제 병렬 처리량이 기대보다 낮아질 수 있습니다.
@Async("externalApiTaskExecutor")
public CompletableFuture<List<AbstractMap.SimpleEntry<String, BookSearchReseponseDto>>>
getCompletableFutureList(LocationDto locationDto, List<String> libraryCodes) {
List<CompletableFuture<AbstractMap.SimpleEntry<String, BookSearchReseponseDto>>> futures =
libraryCodes.stream()
.map(libCode -> CompletableFuture
.supplyAsync(() -> callBookExist(libCode, locationDto.getIsbn()), externalApiTaskExecutor)
.completeOnTimeout(null, 3, java.util.concurrent.TimeUnit.SECONDS)
.exceptionally(ex -> {
log.warn("API 실패: {}", libCode, ex);
return null;
})
).toList();
return CompletableFuture
.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.toList());
}
다음은 비동기와 멀티쓰레드를 구현한 로직입니다.
사용자의 위치 반경 내 여러 도서관 API를 동시에 호출해 책 보유 여부를 확인하는 과정에서 @Async와 CompletableFuture를 활용했습니다.
- @Async("externalApiTaskExecutor")
- 앞에서 Bean으로 등록한 externalApiTaskExecutor 스레드 풀을 사용하여, API 호출 메서드를 비동기로 실행할 수 있도록 지정했습니다.
- CompletableFuture.supplyAsync(...)
- 각 도서관 API 호출을 별도의 비동기 작업으로 실행해서 값을 반환합니다.
- callBookExist(libCode, locationDto.getIsbn()) 뒤에 인자를 주지 않으면 지바가 기본으로 제공하는 ForkJoinPool.commonPool을 쓰기 때문에 직접 생성한 쓰레드풀을 명시했습니다.
- 이때 직접 만든 스레드 풀을 전달하여 스레드가 동시에 여러 작업을 할 수 있게 구현하였습니다.
- 각 도서관 API 호출을 별도의 비동기 작업으로 실행해서 값을 반환합니다.
- completeOnTimeout(…, 3초)
- 특정 도서관 API가 응답이 늦어지더라도. 3초 안에 응답이 없으면 자동으로 타임아웃 처리되도록 했습니다.
- 지정한 시간안에 Future가 완료되지 않으면 Future 를 성공 상태(null)로 변환시킵니다.
- 즉 타임아웃이 발생해도 에러가 터지지 않고 정상 완료된 것처럼 처리 됩니다.
- 지정한 시간안에 Future가 완료되지 않으면 Future 를 성공 상태(null)로 변환시킵니다.
- 특정 도서관 API가 응답이 늦어지더라도. 3초 안에 응답이 없으면 자동으로 타임아웃 처리되도록 했습니다.
- exceptionally(...)
- API 호출 중 예외가 발생하더라도 전체 처리가 중단되지 않도록, 로그만 남기고 null을 반환하도록 했습니다.
- CompletableFuture.allOf(...)
- 여러 비동기 작업들을 동시에 실행한 뒤, 모든 작업이 완료될 때까지 기다립니다.
- futures.stream().map(CompletableFuture::join)
- 모든 결과를 모아 최종적으로 리스트 형태로 반환합니다. 이때 null(실패한 API)은 필터링하여 제외했습니다.
이런 구조를 통해, 최대 50개의 도서관 API를 동시에 요청할 수 있었고, 기존에 동기 방식으로 20초 이상 걸리던 작업이 약 3초로 단축되어 훨씬 빠르게 처리할 수 있었습니다.
읽어 주셔서 감사합니다!