본문 바로가기

프로그래밍(TA, AA)/JVM 언어

[Java] 개선된 자바 동시성(2) - CompletableFuture

- Thread, Future, 자바가 풍부한 동시성 API를 제공하도록 강요하는 진화의 힘

- 비동기 API

- 동시 컴퓨팅의 박스와 채널 뷰

- CompletableFuture 콤비네이터로 박스를 동적으로 연결

 


이런 애플리케이션을 구현하려면 인터넷으로 여러 웹서비스에 접근해야 한다. 하지만 이들 서비스의 응답을 기다리는 동안 연산이 블록되거나 귀중한 CPU 클록 사이클 자원을 낭비하고 싶진 않다. 예를 들어 페이스북의 데이터를 기다리는 동안 트위터 데이터를 처리하지 말란 법은 없다.

 

이 상황은 멀티태스크 프로그래밍의 양면성을 보여준다. 포크/조인 프레임워크와 병렬 스트림은 병렬성의 귀중한 도구다. 이들은 한 태스크를 여러 하위 태스크로 나눠서 CPU의 다른 코어 또는 다른 머신에서 이들 하위 태스크를 병렬로 실행한다.

 

반면 병렬성이 아니라 동시성을 필요로 하는 상황 즉 조금씩 연관된 작업을 같은 CPU에서 동작하는 것 또는 애플리케이션 생산성을 극대화할 수 있도록 코어를 바쁘게 유지하는 것이 목표라면, 원격 서비스나 데이터베이스 결과를 기다리는 스레드를 블록함으로 연산 자원을 낭비하는 일은 피해야 한다.

 

자바는 이런 환경에서 사용할 수 있는 두가지 주요 도구를 제공한다. Futurue 인터페이스와 CompletableFuture 구현은 간단하고 효율적인 문제 해결사다. 최근 자바 9에 추가된 Pub/Sub 프로토콜에 기반한 리액티브 프로그래밍 개념을 따르는 Flow API는 조금 더 정교한 프로그래밍 접근 방법을 제공한다.

 

다음 그림은 동시성/병렬성의 차이를 보여준다. 동시성은 단일 코어 머신에서 발생할 수 있는 프로그래밍 속성으로 실행이 서로 겹칠 수 있는 반면 병렬성은 병렬 실행을 하드웨어 수준에서 지원한다.

f(x) + g(x) 등과 같은 표현식을 어떻게 계산하고 반환하거나 출력하는지 그리고 어떤 다양한 자바 동시성 기능을 이용해 결과를 얻는지 등의 예제를 이용해 대부분의 사상을 설명한다.


멀티코어 CPU에서 효과적으로 프로그래밍을 실행할 필요성이 커지면서 이후 자바 버전에서는 개선된 동시성 지원이 추가되었다. 자바 7에서는 divide and conquer 알고리즘의 포크/조인 구현을 지원하는 java.util.concurrent.RecursiveTask가 추가되었고 자바8에서는 스트림과 새로 추가된 람다 지원에 기반한 병렬 프로세싱이 추가되었다.

 

자바는 Future를 조합하는 기능을 추가하면서 동시성을 강화 ( Future 구현인 자바8 CompletableFuture 참고 ) 했고, 자바9에서는 분산 비동기 프로그래밍을 명시적으로 지원한다. 이들 API는 매쉬업 애플리케이션 즉, 다양한 웹서비스를 이용하고 이들 정보를 실시간으로 조합해 사용자에 제공하거나 추가 웹서비스를 통해 제공하는 종류의 애플리케이션을 개발하는데 필수적인 기초 모델과 툴킷을 제공한다.

 

CompletableFuture와 java.util.concurrent.Flow의 궁극적인 목표는 가능한한 독립적인 태스크를 동시 실행 가능하게 만들면서 멀티코어 또는 여러 기기를 통해 제공되는 병렬성을 쉽게 이용하는 것이다.


사용할 수 있는 여러 다중 처리 리소스(CPU 등)를 고려해 프로그램이 이들 자원을 고수준(스레드를 이용한 복잡하고 유지보수하기 어려운 잘못된 구조를 회피함)으로 효과적으로 활용할 수 있도록 최신의 동시성 기법을 살펴봤다.

 

병렬 스트림과 포크/조인 기법을 이용해 컬렉션을 반복하거나 divide and conquer 알고리즘을 활용하는 프로그램에서 높은 수준의 병렬을 적용할 수 있음을 확인했다. 이에 더해 이들 기법을 이용하면 코드를 병렬로 실행할 수 있는 가능성이 열린다.

 

자바8, 자바9에서는 CompletableFuture와 리액티브 프로그래밍 패러다임 두가지 API를 제공한다. 


1. Future의 단순 활용

자바5부터는 미래의 어느 시점에 결과를 얻는 모델에 활용할 수 있도록 Future 인터페이스를 제공하고 있다. 비동기 계산을 모델링하는데 Future를 이용할 수 있으며, Future는 계산이 끝났을때 결과에 접근할 수 있는 참조를 제공한다. 시간이 걸릴수 있는 작업을 Future 내부로 설정하면 호출자 스레드가 결과를 기다리는 동안 다른 유용한 작업을 수행할 수 있다. Future는 저수준의 스레드에 비해 직관적으로 이해하기 쉽다는 장점이 있다. Future를 이용하려면 시간이 오래 걸리는 작업을 Callable 객체 내부로 감싼 다음에 ExecutorService에 제출해야 한다.

팩토리 메서드 supplyAsync로 CompletableFuture 만들기

CompletableFuture를 직접 만드는 것 보다 좀 더 간단하게 CompletableFuture를 만드는 방법도 있다. 예를 들어 getPriceAsync 메서드를 다음처럼 간단하게 한 행으로 구현할 수 있다.

public Future<Double> getPriceAsync(String product) {
    return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

supplyAsync 메서드는 Supplier를 인수로 받아서 CompletableFuture를 반환한다. CompletableFuture는 Supplier를 실행해서 비동기적으로 결과를 생성한다. ForkJoinPool의 Executor 중 하나가 Supplier를 실행할 것이다. 두번째 인수를 받는 오버로드 버전의 supplyAsync 메서드를 이용해서 다른 Executor를 지정할 수 있다.

 

CompletableFuture의 팩토리 메서드에 Executor를 선택적으로 전달할 수 있다. Executor를 적절하게 사용하면 애플리케이션의 성능을 개선할 수 있다.

 

지금부터는 Shop 클래스에서 구현한 API를 제어할 권한이 우리에게 없는 상황이며 모든 API는 동기 방식의 블록 메서드라고 가정할 것이다. 실제로 몇몇 서비스에서 제공하는 HTTP API는 이와 같은 방식으로 동작한다. 블록 메서드를 사용할 수 밖에 없는 상황에서 비동기적으로 여러 상점에 질의하는 방법, 즉 한 요청의 응답을 기다리며 블록하는 상황을 피해 최저가격 검색 애플리케이션의 성능을 높일 수 있는 방법을 살펴보자.

1.2. CompletableFuture로 비동기 호출 구현하기

팩토리 메서드 supplyAsync로 CompletableFuture를 만들 수 있음을 배웠다.

List<CompletableFuture<String>> priceFutures = 
        shops.stream()
        .map(shop -> CompletableFuture.supplyAsync(
            () -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))))
        .collect(toList());

위 코드로 CompletableFuture를 포함하는 리스트 List<CompletableFuture<String>>를 얻을 수 있다. 리스트의 CompletableFuture는 각각 계산 결과가 끝난 상점의 이름 문자열을 포함한다. 하지만 우리가 재구현하는 findPrices 메서드의 반환 형식은 List<String>이므로 모든 CompletableFuture의 동작이 완료되고 결과를 추출한 다음에 리스트를 반환해야 한다.

 

두번째 map 연산을 List<CompletableFuture<String>>에 적용할 수 있다. 즉, 리스트의 모든 CompletableFuture에 join을 호출해서 모든 동작이 끝나기를 기다린다. CompletableFuture 클래스의 join 메서드는 Future 인터페이스의 get 메서드와 같은 의미를 갖는다. 다만 join은 아무 예외도 발생시키지 않는다는 점이 다르다. 따라서 두번째 map의 람다 표현식을 try/catch로 감쌀 필요가 없다. 다음은 findPrices를 재구현한 코드다.

public List<String> findPrices(String product) {
    List<CompletabeFuture<String>> priceFutures = 
            shops.stream()
            // CompletableFuture로 각각의 가격을 비동기적으로 계산한다.
            .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product)))
            .collect(Collectors.toList());

    return priceFutures.stream()
             // 모든 비동기 동작이 끝나길 기다린다.
             .map(CompletableFuture::join)
             .collect(toList());
}

두 map 연산을 하나의 스트림 처리 파이프라인으로 처리하지 않고 두 개의 스트림 파이프라인으로 처리했다는 사실에 주목하자. 스트림 연산은 게으른 특성이 있으므로 하나의 파이프라인으로 연산을 처리했다면 모든 가격 정보 요청 동작이 동기적, 순차적으로 이루어지는 결과가 된다.

CompletableFuture로 각 상점의 정보를 요청할 때 기존 요청 작업이 완료되어야 join이 결과를 반환하면서 다음 상점으로 정보를 요청할 수 있기 때문이다. 위 그림은 이 과정을 자세히 보여준다.

 

그림의 윗부분은 순차적으로 평가를 진행하는 단일 파이프라인 스트림 처리 과정을 보여준다. 즉, 이전 요청의 처리가 완전히 끝난 다음에 새로 만든 CompletableFuture가 처리된다. 반면 아래쪽은 우선 CompletableFuture를 모은 다음에 다른 작업과는 독립적으로 각자의 작업을 수행하는 모습을 보여준다.

스트림 병렬화와 CompletableFuture 병렬화

지금까지 컬렉션 계산을 병렬화하는 두가지 방법을 살펴봤다. 하나는 병렬 스트림으로 변환해서 컬렉션을 처리하는 방법이고 다른 하나는 컬렉션을 반복하면서 CompletableFuture 내부의 연산으로 만드는 것이다. CompletableFuture를 이용하면 전체적인 계산이 블록되지 않도록 스레드풀의 크기를 조절할 수 있다.

- I/O가 포함되지 않은 계산 중심의 동작을 실행할 때는 스트림 인터페이스가 가장 구현하기 간단하며 효율적일 수 있다(모든 스레드가 계산 작업을 수행하는 상황에서는 프로세스 코어 수 이상의 스레드를 가질 필요가 없다).

- 반면 작업이 I/O를 기다리는 작업을 병렬로 실행할때는 CompletableFuture가 더 많은 유연성을 제공하며 대기/계산의 비율에 적합한 스레드 수를 설정할 수 있다. 특히 스트림의 게으른 특성 때문에 스트림에서 I/O를 실제로 언제 처리할지 예측하기 어려운 문제도 있다.

1.3. 더 확장성이 좋은 해결 방법

CompletableFuture는 병렬 스트림 버전에 비해 작업에 이용할 수 있는 다양한 Executor를 지정할 수 있다는 장점이 있다. 따라서 Executor로 스레드 풀의 크기를 조절하는 등 애플리케이션에 맞는 최적화된 설정을 만들 수 있다.

private final Executor executor = 
        Executors.newFixedThreadPool(Math.min(shops.size(), 100),  // 상점 수만큼의 스레드를 갖는 풀을 생성한다
                                     new ThreadFactory() {
                                         public Thread newThread(Runnable r) {
                                             Thread t = new Thread(r);
                                             t.setDaemon(true);    // 프로그램 종료를 방해하지 않는 데몬 스레드를 사용한다
                                             return t;
                                         }
                                     });

// 애플리케이션의 특성에 맞는 Executor 적용
CompletabeFuture.supplyAsync(() -> shop.getName() + " price is " +
                                   shop.getPrice(product), executor);

애플리케이션의 특성에 맞는 Executor를 만들어 CompleteFuture를 활용하는 것이 바람직하다. 비동기 동작을 많이 사용하는 상황에서는 이 기법이 가장 효과적일 수 있음을 기억하자.

 

지금까지는 Future 내부에서 수행하는 작업이 모두 일회성 작업이었다. 다음 절에서는 스트림 API에서 배웠던 것처럼 선언형으로 여러 비동기 연산을 CompletableFuture로 파이프라인화 하는 방법을 설명한다.

1.4. 비동기 작업 파이프라인 만들기

// Discount 서비스를 이용하는 가장 간단한 findPrices 구현
public List<String> findPrices(String product) {
    return shops.stream()
            .map(shop -> shop.getPrice(product))  // 각 상점에서 할인 전 가격 얻기
            .map(Quote::parse)                    // 상점에서 반환한 문자열을 Quote 객체로 반환한다.
            .map(Discount::applyDiscount)         // Discount 서비스를 이용해서 각 Quote에 할인을 적용한다.
            .collect(toList());
}

// CompletableFuture로 findPrices 메서드를 비동기적으로 재구현
public List<String> findPrices(String product) {
    List<CompletableFuture<String>> priceFutures = 
        shops.stream()
             .map(shop -> CompletableFuture.supplyAsync(
                              () -> shop.getPrice(product), executor))
             .map(future -> future.thenApply(Quote::parse))
             .map(future -> future.thenCompose(quote ->
                          CompletableFuture.supplyAsync
                              () -> Discount.applyDiscount(quote), executor))
             .collect(toList());
}

가격정보 얻기 (shop.getPrice())

첫번째 연산은 이 장의 다양한 예제에서 많이 사용한 코드다. 즉, 팩토리 메서드 supplyAsync에 람다 표현식을 전달해서 비동기적으로 상점에서 정보를 조회했다. 첫번째 변환의 결과는 Stream<CompletableFuture<String>>이다. 각 CompletableFuture는 작업이 끝났을때 해당 상점에서 반환하는 문자열 정보를 포함한다. 그리고 커스텀 Executor로 CompletableFuture를 설정한다.

Quote 파싱하기 (new Quote(price))

thenApply 메서드는 CompletableFuture가 끝날때까지 블록하지 않는다는 점을 주의해야 한다. 즉, CompletableFuture가 동작을 완전히 완료한 다음에 thenApply 메서드로 전달된 람다 표현식을 적용할 수 있다. 따라서 CompletableFuture<String>을 CompletableFuture<Quote>로 변환할 것이다. 이는 마치 CompletableFuture의 결과물로 무엇을 할지 지정하는 것과 같다.

CompletableFuture를 조합해서 할인된 가격 계산하기 (applyDiscount(quote))

람다 표현식으로 이 동작을 팩토리 메서드 supplyAsync에 전달할 수 있다. 그러면 다른 CompletableFuture가 반환된다. 결국 두 가지 CompletableFuture로 이루어진 연쇄적으로 수행되는 두 개의 비동기 동작을 만들 수 있다.

 - 상점에서 가격 정보를 얻어와서 Quote로 변환하기

 - 변환된 Quote를 Discount 서비스로 전달해서 할인된 최종가격 획득하기

 

자바8의 CompletableFuture API는 이와 같이 두 비동기 연산을 파이프라인으로 만들 수 있도록 thenCompose 메서드를 제공한다. thenCompose 메서드는 첫번째 연산의 결과를 두번째 연산으로 전달한다. 즉, 첫번째 CompletableFuture에 thenCompose 메서드를 호출하고 Function에 넘겨주는 식으로 두 CompletableFuture를 조합할 수 있다. Function은 첫번째 CompletableFuture 반환 결과를 인수로 받고 두번째 CompletableFuture를 반환하는데, 두번째 CompletableFuture는 첫번째 CompletableFuture의 결과를 계산의 입력으로 사용한다.

 

세 개의 map 연산 결과 스트림의 요소를 리스트로 수집하면 List<CompletableFuture<String>> 형식의 자료를 얻을 수 있다. 마지막으로 CompletableFuture가 완료되기를 기다렸다가 join으로 값을 추출할 수 있다.

 

CompletableFuture 클래스의 다른 메서드처럼 thenCompose 메서드도 Async로 끝나는 버전이 존재한다. Async로 끝나지 않는 메서드는 이전 작업을 수행한 스레드와 같은 스레드에서 작업을 실행함을 의미하며 Async로 끝나는 메서드는 다음 작업이 다른 스레드에서 실행되도록 스레드 풀로 작업을 제출한다. 예시에서는 스레드 전환 오버헤드가 적게 발생하면서 효율성이 좀 더 좋은 thenCompose를 사용했다.

독립 CompletableFuture와 비독립 CompletableFuture 합치기

실전에서는 독립적으로 실행된 두 개의 CompletableFuture 결과를 합쳐야 하는 상황이 종종 발생한다. 물론 첫번째 CompletableFuture의 동작 완료와 관계없이 두번째 CompletableFuture를 실행할 수 있어야 한다.

 

이런 상황에서는 thenCombine 메서드를 사용하다. thenCombine 메서드는 BiFunction을 두번째 인수로 받는다. BiFunction은 두 개의 CompletableFuture 결과를 어떻게 합칠지 정의한다. thenCompose와 마찬가지로 thenCombine 메서드에도 Async 버전이 존재한다. thenCombinAsync 메서드에서는 BiFunction이 정의하는 조합 동작이 스레드 풀로 제출되면서 별도의 태스크에서 비동기적으로 수행된다.

// 자바8 이후 버전
Future<Double> futurePriceInUSD = 
        CompletableFuture.supplyAsync(() -> shop.getPrice(product))    // 제품가격 정보를 요청하는 첫번째 태스크를 생성한다.
        .thenCombine(
            CompletableFuture.supplyAsync(
                () -> exchangeService.getRate(Money.EUR, Money.USD))
                .completeOnTimeout(DEFAULT_RATE, 1, TimeUnit.SECONDS), // 환전 서비스가 일 초 안에 결과를 제공하지 않으면 기본 환율값을 사용
            (price, rate) -> price * rate   // USD, EUR의 환율 정보를 요청하는 독립적인 두번째 태스크를 생성한다.
        )
        .orTimeout(3, TimeUnit.SECONDS);    // 3초 뒤에 작업이 완료되지 않으면 Future가 TimeoutException을 발생시키도록 설정

// 자바7 버전
ExecutorService executor = Executors.newCachedThreadPool();  // 태스크를 스레드풀에 제출할 수 있도록 ExecutorService를 생성한다.
final Future<Double> futureRate = executor.submit(new Callable<Double>() {
    public Double call() {
        return exchangeService.getRate(Money.EUR, Money.USD);    // EUR, USD 환율 정보를 가져올 Future를 생성한다.
    }
});
Future<Double> futurePriceInUSD = executor.submit(new Callable<Double>() {
    public Double call() {
        double priceInEUR = shop.getPrice(product);    // 두번째 Future로 상점에서 요청 제품의 가격을 검색한다.
        return priceInEUR * futureRate.get();          // 가격을 검색한 Future를 이용해서 가격과 환율을 곱한다.
    }
});

여기서 합치는 연산은 단순한 곱셈이므로 별도의 태스크에서 수행하여 자원을 낭비할 필요가 없다. 따라서 thenCombineAsync 대신 thenCombine 메서드를 사용한다.

자바8 이전의 Future에 비해 CompletableFuture는 람다 표현식을 사용한다. 람다 표현식 덕분에 다양한 동기 태스크, 비동기 태스크를 활용해서 복잡한 연산 수행방법을 선언형 API로 쉽게 정의할 수 있다. 자바7로 구현하면서 실질적으로 CompletableFuture를 이용했을때 얻을 수 있는 코드 가독성의 이점이 무엇인지 확인할 수 있다.


CompletableFuture의 종료에 대응하는 방법

지금까지 다양한 상점에서 물건의 가격 정보를 얻어오는 findPrices 메서드를 구현했는데 현재는 모든 상점에서 가격 정보를 가져온 다음에 그것을 사용할 수 있다. 이제 모든 상점에서 가격 정보를 제공할 때까지 기다리지 않고 각 상점에서 가격 정보를 제공할 때마다 즉시 보여줄 수 있는 최저가격 검색 애플리케이션을 만들어보자.

public Stream<CompletableFuture<String>> findPricesStream(String product) {
    return shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
}

// CompletableFuture 종료에 반응하기
CompletableFuture[] futures = findPricesStream("myPhone")
        .map(f -> f.thenAccept(System.out::println))
        .toArray(size -> new CompletabeFuture[size]);
CompletableFuture.allOf(futures).join();

CompletableFuture에 등록된 동작은 CompletableFuture의 계산이 끝나면 값을 소비한다. 자바8의 CompletableFuture API는 thenAccept라는 메서드로 이 기능을 제공한다. thenAccept 메서드는 연산 결과를 소비하는 Consumer를 인수로 받는다.

 

thenCompose, thenCombine 메서드와 마찬가지로 thenAccept에도 thenAcceptAsync라는 Async 버전이 존재한다. thenAcceptAsync 메서드는 CompletableFuture가 완료된 스레드가 아니라 새로운 스레드를 이용해서 Consumer를 실행한다. 불필요한 콘텍스트 변경은 피하는 동시에 CompletableFuture가 완료되는 즉시 응답하는 것이 좋으므로 thenAcceptAsync를 사용하지 않는다(오히려 thenAcceptAsync를 사용하면 새로운 스레드를 이용할 수 있을때까지 기다려야 하는 상황이 일어날 수 있다).

 

thenAccept 메서드는 CompletableFuture가 생성한 결과를 어떻게 소비할지 미리 지정했으므로 CompletableFuture<Void>를 반환한다. 따라서 네번째 map 연산은 <CompletableFuture<Void>>를 반환한다. 이제 CompletableFuture<Void>가 동작을 끝낼 때까지 딱히 할 수 있는 일은 없다. 이렇게 해서 우리가 원하는 동작을 구현했다.

 

팩토리 메서드 allOf는 CompletableFuture 배열을 입력으로 받아 CompletableFuture<Void>를 반환한다. 전달된 모든 CompletableFuture가 완료되어야 CompletableFuture<Void>가 완료된다. 따라서 allOf 메서드가 반환하는 CompletableFuture에 join을 호출하면 원래 스트림의 모든 CompletableFuture의 실행완료를 기다릴 수 있다.

 

반면 배열의 CompletableFuture 중 하나의 작업이 끝나길 기다리는 상황도 있을 수 있다. 이때는 팩토리 메서드 anyOf를 사용한다. anyOf 메서드는 CompletableFuture 배열을 입력으로 받아서 CompletableFuture<Object>를 반환한다. CompletableFuture<Object>는 처음으로 완료한 CompletableFuture의 값으로 동작을 완료한다.