Published on

멀티스레드와 비동기 적용하기

Authors
  • avatar
    Name
    zkrp
    Twitter

저번에 멀티쓰레드와 비동기의 기초 개념에 대해서 글을 작성했습니다.

https://wngud.tistory.com/29

이번 글에서는 프로젝트에 멀티쓰레드와 비동기를 실제로 적용한 경험을 공유하려 합니다.
본격적인 적용 사례를 소개하기 전에, 먼저 자바에서 멀티쓰레드와 비동기를 다룰 때 사용하는 대표적인 라이브러리들을 간단히 정리해 보겠습니다.

1. 멀티쓰레드 JAVA 라이브러리

Runnable

자바에서 쓰레드에서 실행할 작업을 정의하는 인터페이스 입니다.

  • 결과를 반환할 수 없다는 한계가 있습니다.
    • 반환값을 얻으려면 공용 메모리(static)이나 파이프 등을 사용합니다
  • 예외 처리를 할 수 없음

Callable

Runnable 의 단점을 보완하기 위한 인터페이스 입니다.

  • call() 은 반환 타입이 제네릭 V
  • Callable 인터페이스의 구현체인 작업(Task)은 가용 가능한 쓰레드가 없어서 실행이 미뤄질 수 있고, 작업 시간이 오래 걸릴 수도 있습니다.
    • 미래에 완료된 Callable의 반환값을 구하기 위해 사용되는 것이 Future
      • Future로 인해 결과를 나중에 받거나 ,실행 상태를 추적할수 있습니다.
  • throw Exception 예외가 선언되어 있습니다.

이러한 특징이 Runnable 보다 훨씬 편리하다는 장점이 있습니다.

Future

요청 스레드는 대기하지 않고, 다른 작업을 수행 가능하게 합니다. future.get()을 호출해서 최종 결과를 반환합니다.

  • 전달한 작업의 미래 결과를 담고 있습니다.
  • 동시에 쓰레드 요청이 가능합니다.
  • Future.get()
    • 완료 상태: Future가 완료 상태면 결과에 포함 되어 있습니다, 이 경우 요청 스레드는 대기하지 않고, 값을 즉시 반환합니다.
    • Future가 완료 상태가 아닌경우 -> taskA가 아직 수행되지 않았거나 수행중
      • 결과를 받기 위해 대기, 요청 스레드가 마치 락을 얻을때처럼, 결과를 얻기 위해 대기
      • 블로킹 메서드라고 함 - 스레드가 작업을 바로 수행하지 않고 , 다른 작업이 완료 될때까지 대기

Future는 결과를 비동기적으로 받을 수 있지만, 여러 Future를 조합하거나 예외 처리를 다루기 어렵습니다. 이 한계를 해결하기 위해 CompletableFuture가 도입되었습니다.

Executor 

등록된 작업을 실행하기 위한 인터페이스입니다.

  • 등록된 작업(Runnable)을 실행하기 위한 인터페이스 , 작업 등록과 작업 실행 중에서 작업 실행만을 책임집니다.
  • 스레드 생성/관리는 직접 하지 않습니다.

ExecutorService 

작업 등록 뿐만 아니라 실행을 위한 책임도 가지는 인터페이스 입니다.

  • 작업(Runnable, Callable)을 등록해서 스레드 풀에서 실행
    • ExecutorService의 submit()은 Runnable/Callable 등록 시 Future를 반환합니다.
  • 스레드 수·큐 정책 같은 실행 환경 관리

멀티쓰레드의 구현을 위해서는 대부분 다음과 같이 ExecutorService 구현체를 생성해서 여러 작업들을 동시에 실행시키게 됩니다.

ThreadPoolExecutor

ThreadPoolExecutor 를 사용하면 스레드 풀에 사용되는 숫자와 블로킹 큐등 사용 가능합니다.

  • 쓰레드를 미리 생성해서 대기하는 구조입니다.

  •  newFixedThreadPool()

    • 고정된 쓰레드 개수를 갖는 쓰레드 풀을 생성함
    • ExecutorService 인터페이스를 구현한 ThreadPoolExecutor 객체가 생성됨
  • 설정 값: corePoolSize , maximumPoolSize, KeepAliveTime, BlockingQueue

  • 작업 큐(BlockingQueue)에 작업을 담아두고, 풀에 있는 스레드들이 하나씩 가져가서 실행하는 구조

Executors

ExecutorService, ThreadPoolExecutor 같은 구현체를 손쉽게 생성할 수 있도록 도와주는 팩토리 클래스입니다.

  •  newFixedThreadPool(): 고정된 스레드 개수의 스레드 풀을 생성합니다. (ThreadPoolExecutor 반환)
    • 이외에도 newCachedThreadPool(), newSingleThreadExecutor(), newScheduledThreadPool() 등 메서드를 가지고있습니다.

2. 비동기 JAVA 라이브러리

CompletableFuture

비동기 프로그래밍을 위한 Future 확장 클래스입니다.

  • Future를 기반으로 외부에서 완료시킬 수 있어서 CompletableFuture라는 이름을 갖게 되었습니다.

  • Future의 단점을 극복 

    • 여러 연산을 결합하기 어려운 문제
    • 비동기 처리 중에 발생하는 예외를 처리하기 어려운 문제
  • CompletableFuture의 주요 기능

    •  비동기 작업 시작 (내부적으로 ExecutorService를 씀, 기본은 ForkJoinPool.commonPool) 
      • supplyAsync(Supplier): 반환 값 있음
      • runAsync(Runnable): 반환값 없음
    • 결과 처리
      • thenApply(fn): 반환 값을 받아서 다른 값을 반환함,  함수형 인터페이스 Function을 파라미터로 받음
      • thenAccpet(consumer): 결과를 소비만 하고 반환 없음 
      • thenRun (Runnable 을 파라미터로 받음): 반환 값을 받지 않고  다른 작업을 실행, 함수형 인터페이스 
    • **결과 합성 조합**
      • thenCompose(fn): 비동기 결과를 다른 CompletableFuture와 연결 (flatMap
      • thenCombine(cf, fn): 두 비동기 결과를 합쳐 새 값 생성
    • 여러 비동기 작업 조합
      • allOf(f1, f2, ...): 여러 작업들을 동시에 실행하고, 모든 작업 결과에 콜백을 실행
      • anyOf(f1, f2, ...): 여러 작업들 중에서 가장 빨리 끝난 하나의 결과에 콜백을 실행
    • 예외 처리
      • exceptionally(fn): 예외 발생 시 대체 값 반환 
      • handle((res, ex) -> ...): 성공/실패 모두 처리

장점

  • 논블로킹 콜백 스타일 지원
  • 동시성 제어 로직을 깔끔하게 표현 가능
  • ExecutorService 직접 다루는 것보다 코드 레벨 추상화 ↑

3. 실제 프로젝트 적용

실제로 프로젝트에 적용한 부분에 대해 설명하겠습니다.

제가 맡은 프로젝트에서는 사용자 위치를 기반으로 반경 10km 내의 도서관을 찾고, 해당 도서관 API를 호출해 책 보유 여부를 확인하는 기능이 있었습니다.
문제는, 주변에 도서관이 50개 이상 있을 경우 최대 50번의 API 호출을 동기적으로 처리해야 했다는 점입니다.
실제로 실행해 보니 한 번 요청에 약 20초가 소요되었고, 여러 사용자가 동시에 요청하면 지연 시간이 기하급수적으로 늘어날 수 있었습니다.
이 문제를 해결하기 위해 멀티쓰레드 + 비동기 처리를 도입했습니다.

@Bean(name = "externalApiTaskExecutor")
public Executor externalApiTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(20);      // 유지할 코어 스레드 수
    executor.setMaxPoolSize(40);       // 최대 스레드 수 (큐가 꽉 차면 증가)
    executor.setQueueCapacity(100);    // 대기 큐 크기
    executor.setThreadNamePrefix("api-call-"); // 스레드 이름(로그/모니터링 용이)
    executor.setRejectedExecutionHandler(
            new ThreadPoolExecutor.CallerRunsPolicy());
    executor.initialize();
    return executor;
}

멀티쓰레드 처리를 위해 저는 ThreadPoolTaskExecutor를 사용하여 스레드 풀(Thread Pool) 을 직접 구성했습니다.
스프링부트에서는 ThreadPoolTaskExecutor를 Bean으로 등록해두면, @Async("빈이름") 방식으로 손쉽게 사용할 수 있습니다.

이렇게 스레드 풀을 Bean으로 등록해두면, 효율적인 리소스 관리가 가능하고, 동시에 많은 API 호출을 안정적으로 처리할 수 있습니다.

참고) CompletableFuture.supplyAsync는 별도 Executor를 넘기지 않으면 ForkJoinPool.commonPool()(JVM 전역 공유)을 사용합니다.
공용 풀은

  • 같은 JVM의 다른 비동기 작업들과 경쟁하고,
  • 풀의 parallelism 한도에 묶여, 서비스 트래픽이 몰릴 때 실제 병렬 처리량이 기대보다 낮아질 수 있습니다.
  @Async("externalApiTaskExecutor")
    public CompletableFuture<List<AbstractMap.SimpleEntry<String, BookSearchReseponseDto>>>
    getCompletableFutureList(LocationDto locationDto, List<String> libraryCodes) {

        List<CompletableFuture<AbstractMap.SimpleEntry<String, BookSearchReseponseDto>>> futures =
                libraryCodes.stream()
                        .map(libCode -> CompletableFuture
                                .supplyAsync(() -> callBookExist(libCode, locationDto.getIsbn()), externalApiTaskExecutor)
                                .completeOnTimeout(null, 3, java.util.concurrent.TimeUnit.SECONDS)
                                .exceptionally(ex -> {
                                    log.warn("API 실패: {}", libCode, ex);
                                    return null;
                                })
                        ).toList();
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .filter(Objects::nonNull)
                        .toList());
    }

다음은  비동기와 멀티쓰레드를 구현한 로직입니다.

사용자의 위치 반경 내 여러 도서관 API를 동시에 호출해 책 보유 여부를 확인하는 과정에서 @Async와 CompletableFuture를 활용했습니다.

  • @Async("externalApiTaskExecutor")
    • 앞에서 Bean으로 등록한 externalApiTaskExecutor 스레드 풀을 사용하여, API 호출 메서드를 비동기로 실행할 수 있도록 지정했습니다.
  • CompletableFuture.supplyAsync(...)
    • 각 도서관 API 호출을 별도의 비동기 작업으로 실행해서 값을 반환합니다.
      •  callBookExist(libCode, locationDto.getIsbn()) 뒤에 인자를 주지 않으면 지바가 기본으로 제공하는 ForkJoinPool.commonPool을 쓰기 때문에 직접 생성한 쓰레드풀을 명시했습니다.
      • 이때 직접 만든 스레드 풀을 전달하여 스레드가 동시에 여러 작업을 할 수 있게 구현하였습니다.
  • completeOnTimeout(…, 3초)
    • 특정 도서관 API가 응답이 늦어지더라도. 3초 안에 응답이 없으면 자동으로 타임아웃 처리되도록 했습니다.
      • 지정한 시간안에 Future가 완료되지 않으면 Future 를 성공 상태(null)로 변환시킵니다.
        • 즉 타임아웃이 발생해도 에러가 터지지 않고 정상 완료된 것처럼 처리 됩니다.
  • exceptionally(...)
    • API 호출 중 예외가 발생하더라도 전체 처리가 중단되지 않도록, 로그만 남기고 null을 반환하도록 했습니다.
  • CompletableFuture.allOf(...)
    • 여러 비동기 작업들을 동시에 실행한 뒤, 모든 작업이 완료될 때까지 기다립니다.
  • futures.stream().map(CompletableFuture::join)
    • 모든 결과를 모아 최종적으로 리스트 형태로 반환합니다. 이때 null(실패한 API)은 필터링하여 제외했습니다.

이런 구조를 통해, 최대 50개의 도서관 API를 동시에 요청할 수 있었고, 기존에 동기 방식으로 20초 이상 걸리던 작업이 약 3초로 단축되어  훨씬 빠르게 처리할 수 있었습니다.

읽어 주셔서 감사합니다!