본문 바로가기

프로그래밍(TA, AA)/JVM 언어

[JAVA] 개선된 동시성(1) - CompletableFuture & Reactive Concept

최근 소프트웨어 개발 방법 추세

  1. 애플리케이션을 실행하는 하드웨어 관련. 애플리케이션을 어떻게 구성하는가 (상호 작용 관점에서)

  2. 인터넷 서비스에서 사용하는 애플리케이션의 증가

 

멀티코어 프로세서가 발전하면서 애플리케이션의 속도는 멀티 코어 프로세서를 얼마나 잘 활용할 수 있도록 소프트웨어를 개발하는가에 따라 달라질 수가 있게 되었다. 한 개의 큰 태스크를 병렬로 실행할 수 있는 개별 하위 태스크로 분리할 수도 있다.

 

자바7부터는 포크/조인 프레임워크가 존재하며, 자바8에 추가된 병렬 스트림으로 스레드에 비해 단순하고 효과적인 방법으로 병렬 실행을 달성할 수도 있다.

 

마이크로서비스 아키텍처 선택이 지난 몇년간 증가했다. 하나의 거대한 애플리케이션 대신 작은 서비스로 애플리케이션을 나누는 것이다. 서비스가 작아진 대신 네트워크 통신이 증가한다. 그리고 공개 API를 통해 더 많은 인터넷 서비스를 접할 수 있게 되었다. 최근 애플리케이션 추세는 다양한 소스의 콘텐츠를 가져와서 합치는 매시업(mashup) 형태가 될 가능성이 있다.

 

이런 애플리케이션을 구현하려면 인터넷으로 여러 웹서비스에 접근해야 한다. 하지만 서비스의 응답을 기다리는 동안 연산이 블록되거나 CPU 클록 사이클 자원을 낭비하고 싶지는 않을 것이다. 이 상황은 멀티태스크 프로그래밍의 양면성을 보여준다. 포크/조인 프레임워크와 병렬 스트림은 병렬성의 유용한 도구이다. 이들은 한 태스크를 여러 하위 태스크로 나눠서 CPU의 다른 코어 또는 다른 머신에서 이들 하위 태스크를 병렬로 실행한다.

 

반면, 병렬성이 아니라 동시성을 필요로 하는 상황 즉 조금씩 연관된 작업을 같은 CPU에서 동작하는 것 또는 애플리케이션의 생산성을 극대화할 수 있도록 코어를 바쁘게 유지하는 것이 목표라면, 원격 서비스나 데이터베이스 결과를 기다리는 스레드를 블록처리함으로 연산 자원을 낭비하는 일은 피해야 한다.

 

자바는 이런 환경에서 두가지 주요 도구를 제공한다. 하나는 Future 인터페이스로 자바8의 CompletableFuture 구현은 간단하고 효율적인 문제 해결책이 된다. 최근 자바 9에 추가된 발행 구독 프로토콜에 기반한 리액티브 프로그래밍 개념을 따르는 Flow API는 조금더 정교한 프로그래밍 접근 방법을 제공한다.

 

다음은 동시성과 병렬성의 차이를 보여준다. 동시성은 단일 코어 머신에서 발생할 수 있는 프로그래밍 속성으로 실행이 서로 겹칠 수 있는 반면 병렬성은 병렬 실행을 하드웨어 수준에서 지원한다.

 

'f(x)+g(x)' 등과 같은 표현식(f(x), g(x)의 연산은 시간이 오래걸린다)을 어떻게 계산하고 반환하거나 출력하는지 그리고 어떤 자바 동시성 기능을 이용해 결과를 얻는지를 확인해보자.


동시성을 구현하는 자바의 진화

자바의 동시 프로그래밍 지원은 하드웨어, 소프트웨어 시스템, 프로그래밍 컨셉의 맞춰 진화해왔다. 처음에 자바는 Runnable과 Thread를 동기화된 클래스와 메서드를 이용해 잠갔다. 2004년 자바5는 좀더 표현력있는 동시성을 지원하는 특히 스레드 실행과 태스크 제출을 분리하는 ExecutorService 인터페이스, 높은 수준의 타입 즉, Runnable, Thread의 변형을 반환하는 'Callable<T>', 'Future<T>', 'Generic' 등을 지원했다. ExecutorServices는 Runnable과 Callable 둘다 실행할 수 있다. 이런 기능들 덕분에 멀티코어 CPU에서 손쉽게 병렬 프로그래밍을 구현할 수 있게 되었다.

 

executorService 인터페이스는 Executor 인터페이스를 상속받으며 Callable을 실행하는 submit이라는 메소드를 포함한다. Executor 인터페이스는 Runnable을 실행할 수 있는 execute 메서드만 포함한다. 

 

멀티코어 CPU에서 효과적으로 프로그래밍을 실행할 필요성이 커지면서 이후 자바 버전에서는 개선된 동시성 지원이 추가되었다. 자바 7에서는 분할 그리고 정복 알고리즘의 포크/조인 구현을 지원하는 java.util.concurrent.RecursiveTask가 추가되었고, 자바 8에서는 스트림과 새로 추가된 람다 지원에 기반한 병렬 프로세싱이 추가되었다.

 

자바는 Future를 조합하는 기능(Future 구현인 CompletableFuture)을 추가하면서 동시성 강화했고, 자바 9에서는 분산 비동기 프로그래밍을 명시적으로 지원한다. 이들 API는 매쉬업 애플리케이션 즉, 다양한 웹서비스를 이용하고 이들 정보를 실시간으로 조합해 사용자에게 제공하거나 추가 웹서비스를 통해 제공하는 종류의 애플리케이션을 개발하는데 필수적인 기초 모델과 툴킷을 제공한다. 

 

이 과정을 리액티브 프로그래밍이라고 부르며 자바 9에서는 발행-구독 프로토콜(java.util.concurrent.Flow 인터페이스)로 이를 지원한다. CompletableFuture와 java.util.concurrent.Flow의 궁극적인 목표는 가능한한 동시에 실행할 수 있는 독립적인 태스크를 가능하게 만들면서 멀티코어 또는 여러 기기를 통해 제공되는 병렬성을 쉽게 이용하는 것이다.

1) 스레드와 높은 수준의 추상화

프로세스는 운영체제에 한 개 이상의 스레드 즉, 본인이 가진 프로세스와 같은 주소 공간을 공유하는 프로세스를 요청함으로 태스크를 동시에 또는 협력적으로 실행할 수 있다. 멀티코어 설정에서는 스레드의 도움 없이 프로그램이 노트북의 컴퓨팅 파워를 모두 활용할 수 없다. 각 코어는 한 개 이상의 프로세스나 스레드에 할당될 수 있지만 프로그램이 스레드를 사용하지 않는다면 효율성을 고려해 여러 프로세서 코어 중 한 개만을 사용할 것이다.

 

실제로 네 개의 코어를 가진 CPU에서 이론적으로는 프로그램을 네 개의 코어에서 병렬로 실행함으로 실행 속도를 네 배까지 향상시킬 수 있다(물론 오버헤드로 인해 실제 네 배가 되긴 어렵다).

long sum = 0;
for (int i = 0; i < 1000000; i++) {
    sum += stats[i];
}

위 코드는 한개의 코어로 며칠동안 작업을 수행한다. 반면 아래 코드는 각각의 범위를 각각의 스레드에서 실행시키는 방식이다.

// 첫번째 스레드
long sum0 = 0;
for (int i = 0; i < 250000; i++) {
    sum0 += stats[i];
}

// ... 생략 ...

// 마지막 스레드
long sum3 = 0;
for (int i = 750000; i < 1000000; i++) {
    sum3 += stats[i];
}

메인 프로그램은 네 개의 스레드를 완성하고 자바의 .start()로 실행한 다음, .join()으로 완료될때까지 기다렸다가 전체합(sum = sum0 + ... + sum3;)을 계산하게 된다. 자바 스트림은 외부 반복(명시적 루프) 대신 내부 반복을 통한 손쉬운 병렬성을 제공한다.

sum = Arrays.stream(stats).parallel().sum();

 

결론적으로 병렬 스트림 반복은 명시적으로 스레드를 사용하는 것에 비해 높은 수준의 개념이다. 스트림을 이용해 스레드 사용 패턴을 추상화할 수 있다. 스트림으로 추상화하는 것은 디자인 패턴을 적용하는 것과 비슷하며, 쓸모없는 코드가 라이브러리 내부로 구현되면서 복잡성도 줄어든다는 장점이 더해진다.

 

자바 7의 java.util.concurrent.ReducrsiveTask은 포크/조인 스레드 추상화로 분할 정복 알고리즘을 병렬화하면서 멀티코어 머신에서 배열의 합을 효율적으로 계산하는 높은 수준의 방식을 제공했다.

 

추가적인 스레드 추상화를 살펴보기에 앞서 추상화 기반 개념에 해당하는 자바5의 ExecutorService 개념과 스레드 풀을 살펴보도록 한다.

2) Executor와 스레드 풀

자바5는 Executor 프레임워크와 스레드 풀을 통해 스레드 사용성을 높은 수준으로 끌어올리는 즉, 자바 프로그래머가 태스크 제출과 실행을 분리할 수 있는 기능을 제공했다.

 

스레드의 문제

자바 스레드는 직접 운영체제 스레드에 접근한다. 운영체제 스레드를 만들고 종료하려면 비싼 비용(페이지 테이블과 관련한 상호작용)을 치러야 하며, 운영체제 스레드의 숫제는 제한되어 있다. 운영체제가 지원하는 스레드 수를 초과해 사용하면 자바 애플리케이션이 예상치 못한 방식으로 크래시될 수 있으므로 기존 스레드가 실행되는 상태에서 계속 새로운 스레드를 만드는 상황이 일어나지 않도록 주의해야 한다.

 

보통 운영체제와 자바의 스레드 갯수가 하드웨어 스레드 갯수보다 많으므로 일부 운영 체제 스레드가 블록되거나 자고 있는 상황에서 모든 하드웨어 스레드가 코드를 실행하도록 할당된 상황에 놓을 수 있다. 예를 들어 2016 인텔 코어 i7-6900K 서버 프로세서는 8개의 코어를 가지며 각 코어는 두개의 대칭 멀티프로세싱(SMP: symmetric multiprocessing) 하드웨어 스레드를 포함하므로 하드웨어 스레드를 총 16개를 포함하는데 서버에는 프로세서를 여러개 포함할 수 있으므로 하드웨어 스레드 64개를 보통 보유할 수 있다. 반면 노트북은 하드웨어 스레드를 한두개 가지므로 다양한 기기에서 실행할 수 있는 프로그램에서는 미리 하드웨어 스레드 갯수를 추측하지 않는 것이 좋다. 한편 주어진 프로그램에서 상요할 최적의 자바 스레드 갯수는 사용할 수 있는 하드웨어 코어의 갯수에 따라 달라진다.

 

스레드 풀이 더 좋은 이유

자바 ExecutorService는 태스크를 제출하고 나중에 결과를 수집할 수 있는 인터페이스를 제공한다. 프로그램은 newFixedThreadPool 같은 팩토리 메서드 중 하나를 이용해 스레드 풀을 만들어 사용할 수 있다.

ExecutorService newFixedThreadPool(int nThreads)

이 메서드는 워커 스레드라 불리는 nThreads를 포함하는 ExecutorService를 만들고 이들을 스레드 풀에 저장한다. 스레드 풀에서 사용하지 않는 스레드로 제출된 태스크를 먼저 온 순서대로 실행한다. 이들 태스크 실행이 종료되면 이들 스레드를 풀로 반환한다. 이 방식의 장점은 하드웨어에 맞는 수의 태스크를 유지함과 동시에 수 천개의 태스크를 스레드 풀에 아무 오버헤드 없이 제출할 수 있다는 점이다. 큐의 크기 조정, 거부 정책, 태스크 종류에 따른 우선순위 등 다양한 설정을 할 수 있다.

 

프로그래머가 코드를 통해 태스크(Runnable이나 Callable)을 제공하면 스레드가 이를 실행한다.

 

스레드 풀이 나쁜 이유

거의 모든 관점에서 스레드를 직접 사용하는 것보다 스레드 풀을 이용하는 것이 바람직하지만 두가지 "사항"을 주의해야 한다.

 

k 스레드를 가진 스레드 풀은 오직 k만큼의 스레드를 동시에 실행할 수 있다. 초과로 제출된 태스크는 큐에 저장되며 이전에 태스크 중 하나가 종료되기 전까지는 스레드에 할당하지 않는다. 불필요하게 많은 스레드를 만드는 일을 피할 수 있으므로 보통 이 상황은 아무 문제가 되지 않지만 잠을 자거나 I/O를 기다리거나 네트워크 연결을 기다리는 태스크가 있다면 주의해야 한다. I/O를 기다리는 블록 상황에서 이들 태스크가 워커 스레드에 할당된 상태를 유지하지만 아무 작업도 하지 않게 된다. 그러면 나머지 15개의 태스크를 두 스레드가 실행해야 하므로 작업 효율성이 예상보다 절반 가까이 떨어지게 된다. 처음 제출한 태스크가 기존 실행 중인 태스크가 나중의 태스크 제출을 기다리는 상황(Future의 일반적인 패턴)이라면 데드락에 걸릴 수도 있다. 핵심은 블록(자거나 이벤트를 기다리는)할 수 있는 태스크는 스레드 풀에 제출하지 말아야 한다는 것이지만 항상 이를 지킬 수 있는 것은 아니다.

 

중요한 코드를 실행하는 스레드가 죽는 일이 발생하지 않도록 보통 자바 프로그램은 main이 반환하기 전에 모든 스레드의 작업이 끝나길 기다린다. 따라서 프로그램을 종료하기 전에 모든 스레드 풀을 종료하는 습관을 갖는 것이 중요하다. (풀의 워커 스레드가 만들어진 다음 다른 태스크 제출을 기다리면서 종료되지 않은 상태일 수 있으므로) 보통 장기간 실행하는 인터넷 서비스를 관리하도록 오래 실행되는 ExecutorService를 갖는 것은 흔한일이다. 자바는 이런상황을 다룰 수 있도록 Thread.setDaemon 메서드를 제공한다.

3) 스레드의 다른 추상화: 중첩되지 않은 메소드 호출

병렬 스트림 처리와 포크/조인 프레임워크에서의 동시성과 현재의 동시성이 어떻게 다른지 명확히 알아보도록 전자 동시성에서는 한 개의 특별한 속성 즉, 태스크나 스레드가 메서드 호출 안에서 시작되면 그 메서드 호출은 반환하지 않고 작업이 끝나기를 기다렸다. 다시 말해 스레드 생성과 join()이 한 쌍처럼 중첩된 메서드 호출 내에 추가 되어 있다. 이를 엄격한 포크/조인 이라고 부른다.

 

시작된 태스크를 내부 호출이 아니라 외부 호출에서 종료하도록 기다리는 좀더 여유로운 방식의 포크/조인을 사용해도 비교적 안전하다. 여유로운 포크조인에 나타난대로 제공된 인터페이스를 사용자는 일반호출로 간주할 수도 있다. 

사용자 메서드 호출에 의해 스레드가 생성되고 메서드를 벗어나거나 계속 실행되는 동시성 형태도 존재한다.

이런 종류, 특히 메서드 호출자에 기능을 제공하도록 메서드가 반환된 후에도 만들어진 태스크 실행이 계속되는 메서드를 비동기 메서드라고 한다. 자바 8, 자바 9로 이런 메서드를 보다 손쉽게 다룰 수 있다. 우선은 이들 메서드를 사용할때 어떤 위험성이 있는지 확인해보자.

  - 스레드 실행은 메서드를 호출한 다음의 코드와 동시에 실행되므로 데이터 경쟁 문제를 일으키지 않도록 주의해야 한다.

  - 기존 실행 중이던 스레드가 종료되지 않은 상황에서 자바의 main() 메서드가 반환하면 어떻게 될까? 다음과 같은 두가지 방법이 있는데 어느 방법도 안전하지 못하다.

     1) 애플리케이션을 종료하지 못하고 모든 스레드가 실행을 끝낼때까지 기다린다.

     2) 애플리케이션 종료를 방해하는 스레드를 강제종료 시키고 애플리케이션을 종료한다.

 

첫번째 방법에서는 잊고서 종료를 못한 스레드에 의해 애플리케이션이 크래시될 수 있다. 또다른 문제로 디스크에 쓰기 I/O 작업을 시도하는 일련의 작업을 중단했을때 이로 인해 외부 데이터의 일관성이 파괴될 수 있다. 이들 문제를 피하려면 애플리케이션에서 만든 모든 스레드를 추적하고 애플리케이션을 종료하기 전에 스레드 풀을 포함한 모든 스레드를 종료하는 것이 좋다.

 

자바 스레드는 setDaemon() 메서드를 이용해 데몬 또는 비데몬으로 구분시킬 수 있다. 데몬 스레드는 애플리케이션이 종료될때 강제 종료되므로 디스크의 데이터 일관성을 파괴하지 않는 동작을 수행할때 유용하게 활용할 수 있는 반면, main() 메서드는 모든 비데몬 스레드가 종료될때까지 프로그램을 종료하지 않고 기다린다.

 

일반적으로 모든 하드웨어 스레드를 활용해 병렬성의 장점을 극대화하도록 프로그램 구조를 만드는 것 즉, 프로그램을 작은 태스크 단위로 구조화하는 것이 목표다. (하지만 태스크 변환 비용을 고려해 너무 작은 크기는 아니어야 한다)


동기 API와 비동기 API

자바8 스트림을 이용해 명시적으로 병렬 하드웨어를 이용할 수 있다. 두가지 단계로 병렬성을 이용할 수 있다. 첫번째로 외부 반복(명시적 for 루프)을 내부 반복(스트림 메서드 사용)으로 바꿔야 한다. 그리고 스트림에 parallel() 메서드를 이용하므로 자바 런타임 라이브러리가 복잡한 스레드 작업을 하지 않고 병렬로 요소가 처리되도록 할 수 있다. 루프가 실행될때 추측에 의존해야 하는 프로그래머와 달리 런타임 시스템은 사용할 수 있는 스레드를 더 정확하게 알고 있다는 것도 내부 반복의 장점이다.

 

int f(int x);
int g(int x);

int y = f(x);
int z = g(x);

System.out.println(y + z);

루프 기반의 계산을 제외한 다른 상황에서도 병렬성이 유용할 수 있다. f와 g를 실행하는데 오랜 시간이 걸린다고 가정하자. f, g의 작업을 컴파일러가 완전하게 이해하기 어려우므로 보통 자바 컴파일러는 코드 최적화와 관련한 아무 작업도 수행하지 않을 수 있다. f와 g가 서로 상호작용하지 않는다는 사실을 알고 있거나 상호작용을 전혀 신경쓰지 않는다면 f와 g를 별도의 CPU 코어로 실행함으로 f와 g 중 오래 걸리는 작업의 시간으로 합계 구하는 시간을 단축할 수 있다. 별도의 스레드로 f와 g를 실행해 이를 구현할 수 있다. 의도는 좋지만 단순했던 코드가 복잡하게 변한다.

class ThreadExample {
    
    public static void main(String[] args) throws InterruptedException {
        int x = 1337;
        Result result = new Result();
        
        Thread t1 = new Thread(() -> { result.left = f(x); });
        Thread t2 = new Thread(() -> { result.right = g(x); });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(result.left + result.right);
    }
    
    private static class Result {
        private int left;
        private int right;
    }
}

Runnable 대신 Future API 인터페이스를 이용해 코드를 더 단순화할 수 있다. 이미 ExecutorService로 쓰레드 풀을 설정했다고 가정하면 다음처럼 코드를 구현할 수 있다.

public class ExecutorServiceExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int x = 1337;
        
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Future<Integer> y = executorService.submit(() -> f(x));
        Future<Integer> z = executorService.submit(() -> g(x));
        System.out.println(y.get() + z.get());
        
        executorService.shutdown();
    }
}

위 코드에도 여전히 복잡성이 존재한다. 명시적 반복으로 병렬화를 수행하던 코드를 스트림을 이용해서 내부 반복으로 바꿨던 것처럼 이 문제를 해결할 수 있다. 비동기 API라는 기능으로 API를 바꿔서 해결할 수 있다.

 

동기 API는 보통 결과가 나올때까지 물리적인 반환을 지연시킴으로 블로킹 API로도 알려져 있다. 반면 비동기 API는 블록하지 않는 I/O를 구현한다. 비동기 API는 보통 결과를 기다리지 않고 I/O 작업을 시작시킨다.

1) Future 형식 API

Future<Integer> f(int x);
Future<Integer> g(int x);

Future<Integer> y = f(x);
Futuer<Integer> z = g(x);
System.out.println(y.get() + z.get());

메서드 f는 호출 즉시 자신의 원래 바디를 평가하는 태스크를 포함하는 Future를 반환한다. 마찬가지로 메소드 g도 Future를 반환하며 세번째 코드는 get() 메서드를 이용해 두 Future가 완료되어 결과가 합쳐지기를 기다리게 된다.

 

API는 그대로 유지하고 g를 그대로 호출하면서 f에만 Future를 적용할 수 있다. 하지만 조금 더 큰 프로그램에서는 두가지 이유로 이런 방식을 사용하지 않는다.

  - 다른 상황에서는 g에도 Future 형식이 필요할 수 있으므로 API 형식을 통일하는 것이 바람직하다.

  - 병렬 하드웨어로 프로그램 실행 속도를 극대화하려면 여러 작은 하지만 합리적인 크기의 태스크로 나누는 것이 좋다.

2) 리액티브 형식 API

두번째 대안에서 핵심은 f, g의 시그니처를 바꿔서 콜백 형식의 프로그래밍을 이용하는 것이다.

void f(int x, IntConsumer dealWithResult);

처음에는 두번째 대안이 이상해 보일 수 있다. f가 값을 반환하지 않는데 어떻게 프로그램이 동작할까? f에 추가 인수로 콜백(람다)을 전달해서 f의 바디에서는 return 문으로 결과를 반환하는 것이 아니라 결과가 준비되면 이를 람다로 호출하는 태스크를 만들면 된다. 다시 말해 f는 바디를 실행하면서 태스크를 만든 다음 즉시 반환하므로 코드 형식이 다음처럼 바뀐다.

public class CallbackStyleExample {
    public static void main(String[] args) {
        
        int x = 1337;
        Result result = new Result();
        
        f(x, (int y) -> {
            result.left = y;
            System.out.println((result.left + result.right));
        });
        
        g(x, (int z) -> {
            result.right = z;
            System.out.println((result.left + result.right));
        });
    }
}

f와 g의 호출 합계를 정확하게 출력하지 않고 상황에 따라 먼저 계산된 결과를 출력한다. 락을 사용하지 않으므로 값을 두 번 출력할 수 있을 뿐더러 때로는 +에 제공된 두 피연산자가 println이 호출되기 전에 업데이트될 수도 있다. 다음처럼 두가지 방법으로 이 문제를 보완할 수 있다.

  - if-then-else를 이용해 적절한 락을 이용해 두 콜백이 모두 호출되었는지 확인한 다음 println을 호출해 원하는 기능을 수행할 수 있다.

  - 리액티브 형식의 API는 보통 한 결과가 아니라 일련의 이벤트에 반응하도록 설계되었으므로 Future를 이용하는 것이 더 적절하다.

 

리액티브 형식의 프로그래밍으로 메서드 f와 g는 dealWithResult 콜백을 여러번 호출 할 수 있다. 원래의 r, g 함수는 오직 한번만 return을 사용하도록 되어있다. 마찬가지로 Future도 한번만 완료되면 그 결과는 get()으로 얻을 수 있다. 리액티브 형식의 비동기 API는 자연스럽게 일련의 값(스트림으로 연결)을, Future 형식의 API는 일회성의 값을 처리하는데 적합하다.

 

어떤 API를 사용할 것인지 미리 잘 생각해야 한다. 하지만 API는 명시적으로 스레드를 처리하는 코드에 비해 사용 코드를 더 단순하게 만들어주며 높은 수준의 구조를 유지할 수 있게 도와준다. 또한 (a)계산이 오래 걸리는 메소드, (b)네트워크나 사람의 입력을 기다리는 메소드에 이들 API를 잘 활용하면 애플리케이션의 효율성이 크게 향상된다. (b)의 상황에서는 리소스를 낭비하지 않고 효율적으로 하단의 시스템을 활용할 수 있다는 장점을 추가로 제공한다.

3) 잠자기(그리고 기타 블로킹 동작)는 해로운 것으로 간주

사람과 상호작용하거나 어떤 일이 일정 속도로 제한되어 일어나는 상황의 애플리케이션을 만들때 자연스럽게 sleep() 메서드를 사용할 수 있다. 하지만 스레드는 잠들어도 여전히 시스템 자원을 점유한다. 스레드를 단지 몇개 사용하는 상황에서는 큰 문제가 아니지만 스레드가 많아지고 그 중 대부분이 잠을 잔다면 문제가 심각해진다.

 

스레드 풀에서 잠을 자는 태스크는 다른 태스크가 시작되지 못하게 막으므로 자원을 소비한다는 사실을 기억하도록 한다. (운영체제가 이들 태스크를 관리하므로 일단 스레드로 할당된 태스크는 중지시키지 못한다)

 

물론 스레드 풀에서 잠자는 스레드만 실행을 막는 것은 아니다. 모든 블록 동작도 마찬가지다. 블록 동작은 다른 태스크가 어떤 동작을 완료하기를 기다리는 동작(예를 들어, Future에 get() 호출)과 외부 상호작용(예를 들어, 네트워크, 데이터베이스 서버에서 읽기 작업을 기다리거나, 키보드 입력같은 사람의 상호작용을 대기)을 기다리는 동작 두가지로 구분할 수 있다.

 

이 상황에서 무엇을 할 수 있을까? 이상적으로는 절대 태스크에서 기다리는 일을 만들지 말거나 아니면 코드에서 예외를 일으키는 방법으로 이를 처리할 수 있다. 태스크를 앞과 뒤 두부분으로 나누고 블록되지 않을때만 뒷부분을 자바가 스케줄리하도록 요청할 수 있다.

 

다음은 한 개의 작업을 갖는 코드이다.

work1();
Thread.sleep(10000);
work2();

이를 아래 코드와 비교해보자.

public class ScheduledExecutorServiceExample {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        
        work1();
        // work1()이 끝난 다음 10초 뒤에 work2()를 개별 태스크로 스케줄함.
        scheduledExecutorService.schedule(ScheduledExecutorServiceExample::work2, 10, TimeUnit.SECONDS);;
        
        scheduledExecutorService.shutdown();
    }
    
    public static void work1() {
        System.out.println("Hello from Work1!");
    }
    
    public static void work2() {
        System.out.println("Hello from Work2!");
    }
}

두 태스크 모두 스레드 풀에서 실행된다고 가정해보자. 첫번재 코드는 어떻게 실행될까? 먼저 코드는 스레드 풀 큐에 추가되며 나중에 차례가 되면 실행된다. 하지만 코드가 실행되면 워커 스레드를 점유한 상태에서 아무것도 하지 않고 10초를 잔다. 그리고 깨어나서 work2()를 실행한 다음 작업을 종료하고 워커 스레드를 해제한다. 반면에 두번째 코드는 work1()을 실행하고 종료한다. 하지만 work2()가 10초 뒤에 실행될 수 있도록 큐에 추가한다.

 

두번째 코드가 더 좋은 이유는 무엇일까? 두 코드 모두 같은 동작을 수행한다. 두 코드의 다른 점은 첫번째 코드는 sleep  상태에서 스레드 자원을 점유하는 반면, B는 다른 작업이 실행될 수 있도록 허용한다는 점이다(스레드를 사용할 필요가 없이 메모리만 조금더 사용)

 

태스크를 만들때는 이런 특징을 잘 활용해야 한다. 태스크가 실행되면 귀중한 자원을 점유하므로 태스크가 끝나서 자원을 해제하기 전까지 태스크를 계속 실행해야 한다. 태스크를 블록하는 것보다는 다음 작업을 태스크로 제출하고 현재 태스크를 종료하는 것이 바람직하다.

 

가능하다면 I/O 작업에도 이 원칙을 적용하는 것이 좋다. 고전적으로 읽기 작업을 기다리는 것이 아니라 블록하지 않는 '읽기 시작' 메서드를 호출하고 읽기 작업이 끝나면 이를 처리할 다음 태스크를 런타임 라이브러리에 스케줄하도록 요청하고 종료한다.

 

이런 디자인 패턴을 따르려면 읽기 어려운 코드가 많아지는 것처럼 보일 수 있다. 하지만 자바 CompletableFuture 메서드는 Future.get()을 이용해 명시적으로 블록하지 않고 콤비네이터를 사용함으로 이런 형식의 코드를 러타임 라이브러리 내에 추사한다.

 

마지막으로 스레드의 제한이 없고 저렴하다면 위의 두 코드는 사실상 같다. 하지만 스레드에는 제한이 있고 저렴하지 않으므로 잠을 자거나 블록해야 하는 여러 태스크가 있을때 가능하면 두번째 코드방식을 따르는 것이 좋다.

4) 현실성 확인

새로운 시스템을 설계할때 시스템을 많은 작은 동시 실행되는 태스크로 설계해서 블록할 수 있는 모든 동작을 비동기 호출로 구현한다면 병렬 하드웨어를 최대한 활용할 수 있다. 하지만 현실적으로 '모든 것은 비동기'라는 설계 원칙을 어겨야 한다. 자바 1.4에서부터 비블록 IO 기능(java.nio)을 제공했는데 이들은 조금 복잡하고 잘 알려지지 않았다. 실제로 자바의 개선된 동시성 API를 이용해 유익을 얻을 수 있는 상황을 찾아보고 모든 API를 비동기로 만드는 것을 따지지 말고 개선된 동시성 API를 사용해보아도 좋다.

 

네트워크 서버의 블록/비블록 API를 일관적으로 제공하는 Netty(https://netty.i/) 같은 새로운 라이브러리를 사용하는 것도 도움이 된다.

5) 비동기 API에서의 예외 처리

Future나 리액티브 형식의 비동기 API에서 호출된 메서드의 실제 바디는 별도의 스레드에서 호출되며 이때 발생하는 어떤 에러는 이미 호출자의 실행 범위와는 관계가 없는 상황이 된다. 예상치 못한 일이 일어나면 예외를 발생시켜 다른 동작이 실행되어야 한다. 어떻게 이를 실현할 수 있을까?

 

Future를 구현한 CompletableFuture에서는 런타임 get() 메서드에 예외를 처리할 수 있는 기능을 제공하며 예외에서 회복할 수 있도록 exeptionally() 같은 메서드도 제공한다. 

 

리액티브 형식의 비동기 API에서는 return 대신 기존 콜백이 호출되므로 예외가 발생했을대 실행될 추가 콜백을 만들어 인터페이스를 만들어야 한다. 다음 예제처럼 리액티브 API에 여러 콜백을 포함해야 한다.

void f(int x, Consumer<Integer> dealWithResult, Consumer<Throwable> dealWithException);

f의 바디는 다음을 수행할 수 있다.

dealWithException(e);

콜백이 여러 개면 이를 따로 제공하는 것보다는 한 객체로 이 메서드를 감싸는 것이 좋다. 예를 들어, 자바9 Flow API에서는 여러 콜백을 한 객체(네 개의 콜백을 각각 대표하는 네 메서드를 포함하는 Subscriber<T> 클래스)로 감싼다.

void onComplete()
void onError(Throwable throwable)
void onNext(T item)

값이 있을때(onNext), 도중에 에러가 발생했을 때(onError), 값을 다 소진했거나 에러가 발생해서 더이상 처리할 데이터가 없을때(onComplete) 각각의 콜백이 호출된다. 이전의 f에 이를 적용하면 다음과 같이 시그니처가 바뀐다.

void f(int x, Subscriber<Integer> s);

f의 바디는 다음처럼 Throwable을 가리키는 t로 예외가 일어났음을 가리킨다.

s.onError(t);

박스와 채널 모델

동시성 모델을 가장 잘 설계하고 개념화를 위해 아래 그림을 살펴보도록 한다. 이 기법을 박스와 채널 모델(box-and-channel model) 이라고 부른다. f(x)+g(x)의 계산을 일반화해서 정수와 관련된 간단한 상황이 있다고 가정한다. f나 g를 호출하거나 p 함수에 인수 x를 이용해 호출하고, 그 결과를 q1과 q1에 전달하며 다시 이 두 호출의 결과로 함수 r을 호출한 다음 결과를 출력한다. 

int t = p(x);
System.out.println( r(q1(t), q2(t)) );

겉보기에는 깔끔해보이지만 자바가 q1, q2를 호출하는데, 이는 하드웨어 병렬성 활용과 거리가 멀다.

 

Future를 이용해 f, g를 병렬로 평가하는 방법도 있다.

int t = p(x);
Future<Integer> a1 = executorService.submit(() -> q1(t));
Future<Integer> a2 = executorService.submit(() -> q2(t));
System.out.println( r(a1.get(), a2.get()) );

이 예제에서는 박스와 채널 다이그램의 모양상 p와 r을 Future로 감싸지 않았다. p는 다른 어떤 작업보다 먼저 처리해야 하며, r은 모든 작업이 끝난 다음 가장 마지막으로 처리해야 한다. 위 코드에서 병렬성을 극대화하려면 모든 다섯 함수(p, q1, q2, r, s)를 Future로 감싸야 할 것이다.

 

시스템에서 많은 작업이 동시에 실행되고 있지 않다면 이 방법도 잘 동작할 수 있다. 하지만 시스템이 커지고 각각의 많은 박스와 채널 다이어그램이 등장하고 각각의 박스는 내부적으로 자신만의 박스와 채널을 사용한다면 문제가 달라진다. 이런 경우, 많은 태스크가 get() 메서드를 호출해 Future가 끝나기를 기다리는 상태에 놓을 수 있다. 결과적으로 하드웨어의 병렬성을 제대로 활용하지 못하거나 심지어 데드락에 걸릴 수 있다.

 

또한 대규모 시스템 구조에서 얼마나 많은 수의 get()을 감당할 수 있는지 이해하기 어렵다. 자바8에서는 CompletableFuture와 Combicators를 이용해 문제를 해결한다. 두 Function이 있을때 compose(), andThen() 등을 이용해 다른 Function을 얻을 수 있다. add1은 정수 1을 더하고 dble은 정수를 두배로 만든다고 가정하면 인수를 두배로 만들고 결과에 2를 더하는 Function을 다음처럼 구현할 수 있다.

Function<Integer, Integer> myfun = add1.andThen(dble);

하지만 박스와 채널 다이어그램은 콤비네이터로도 구현할 수 있는데, 자바 Function p, q1, q2, BiFunction r를 사용하면 된다. thenBoth나 thenCombine은 자바 Function과 BiFunction 클래스의 일부가 아니다.

p.thenBoth(q1,q2).thenCombine(r)

콤비네이터와 CompletableFuture의 사상은 매우 비슷하며, get()을 이용해 태스크가 기다리게 만드는일을 피하게 한다.

 

박스와 채널 모델을 이용하면 생각과 코드를 구조화할 수 있다. 박스와 채널 모델로 대규모 시스템 구현의 추상화 수준을 높일 수 있다. 박스(또는 프로그램의 콤비네이터)로 원하는 연산을 표현(계산은 나중에 이루어짐)하면 계산을 손으로 코딩한 결과보다 더 효율적일 것이다. 콤비네이터는 수학적 함수뿐 아니라 Future와 리액티브 스트림 데이터에도 적용할 수 있다. 박스와 채널 다이어그램의 각 채널을 마블 다이어그램으로 표현할 수도 있다. 박스와 채널 모델은 병렬성을 직접 프로그래밍하는 관점을 콤비네이터를 이용해 내부적으로 작업을 처리하는 관점으로 바꿔준다. 

 

자바 8 스트림은 자료 구조를 반복해야 하는 코드를 내부적으로 작업을 처리하는 스트림 콤비네이터로 바꿔준다.


CompletableFuture와 콤비네이터를 이용한 동시성

동시 코딩 작업을 Future 인터페이스로 생각하고 유도한다는 점이 Future 인터페이스의 문제다. 하지만 주어진 연산으로 Future를 만들고, 이를 실행하고, 종료되길 기다리는 등 Future는 FutureTask 구현을 뛰어넘는 몇가지 동작을 제공했다. 이후 버전의 자바에서는 RecursiveTask 같은 더 구조화된 지원을 제공했다.

 

자바8에서는 Future 인터페이스의 구현인 CompletableFuture를 이용해 Future를 조합할 수 있는 기능을 추가하기도 있다. ComposableFuture가 아니라 CompletableFuture라고 부르는 이유는 뭘까?

 

일반적으로 Future는 실행해서 get()으로 결과를 얻을 수 있는 Callable로 만들어진다. 하지만 CompletableFuture는 실행할 코드 없이 Future를 만들 수 있도록 허용하며 complete() 메서드를 이용해 나중에 어떤 값을 이용해 다른 스레드가 이를 완료할 수 있고 get()으로 값을 얻을 수 있도록 허용한다

 

f(x)와 g(x)를 동시에 실행해 합계를 구하는 코드를 다음처럼 구현할 수 있다.

public class CFComplete {
    
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        int x = 1337;
        
        CompletableFuture<Integer> a = new CompletableFuture<>();
        executorService.submit(() -> a.complete(f(x)));
        int b = g(x);
        System.out.println(a.get() + b);
        
        executorService.shutdown();
    }
}
public class CFComplete {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        int x = 1337;
        
        CompletableFuture<Integer> b = new CompletableFuture<>();
        executorService.submit(() -> b.complete(g(x)));
        int a = f(x);
        System.out.println(a + b.get());
        
        executorService.shutdown();
    }
}

위 두 코드는 f(x)의 실행이 끝나지 않거나 아니면 g(x)의 실행이 끝나지 않는 상황에서 get()을 기다려야 하므로 프로세싱 자원을 낭비할 수 있다. 자바 9의 CompletableFuture를 이용하면 이 상황을 해결할 수 있다.

 

다음과 같은 상황에서 스레드를 완벽하게 활용할 수 있는 태스크를 어떻게 구현할 수 있으지 생각한다. f(x), g(x)를 실행하는 두 개의 활성 스레드가 있는데 한 스레드는 다른 스레드가 return 문을 실행해 종료될때까지 기다렸다가 시작한다. f(x)를 실행하는 한 태스크, g(x)를 실행하는 두 번째 태스크, 합계를 계산하는 세번째 태스크(이전의 두 태스크를 재활용 할 수 있다) 세 개를 이용하는 것이다. 하지만 처음 두 태스크가 실행되기 전까지 세번째 태스크는 실행할 수 없다. Future를 조합해 이 문제를 해결할 수 있다.

 

동작을 조합하는 것은 다른 언어에서는 이미 많이 사용하는 강력한 프로그래밍 구조 사상이지만 자바에서는 자바 8의 람다가 추가되면서 겨우 걸음마를 땐 수준이다. 다음 예제처럼 스트림에 연산을 조합하는 것도 하나의 조합 예이다.

myStream.map(...).filter(...).sum()

compose(), andThen() 같은 메서드를 두 Function에 이용해 다른 Function을 얻을 수도 있다. CompletableFuture<T>에 thenCombine 메서드를 사용하므로 두 연산 결과를 더 효과적으로 이용할 수 있다. thenCombine() 메서드는 다음과 같은 시그니처를 갖고 있다.

CompletableFuture<V> thenCombine(CompletableFuture<U> other, BiFunction<T, U, V> fn)

이 메서드는 두 개의 CompletableFuture 값(T, U 결과형식)을 받아 한 개의 새 값을 만든다. 처음 두 작업이 끝나면 두 결과 모두에 fn을 적용하고 블록하지 않은 상태로 결과 Future를 반환한다.

public class CFCombine {
    public static void main(String[] args) throws ExcutionException, InterruptedException {
        
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        int x = 1337;
        
        CompletableFuture<Integer> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        CompletableFuture<Integer> c = a.thenCombine(b, (y, z)->y + z);
        executorService.submit(() -> a.complete(f(x)));
        executorService.submit(() -> b.complete(g(x)));
        
        System.out.println(c.get());
        executorService.shutdown();
    }
}

thenCombine 행이 핵심이다. Future a와 Future b의 결과를 알지 못한 상태에서 thenCombine은 두 연산이 끝났을때 스레드 풀에서 실행된 연산을 만든다. 결과를 추가하는 세번째 연산 c는 다른 두 작업이 끝날때가지는 스레드에서 실행되지 않는다(먼저 시작해서 블록되지 않는 점이 특징). 따라서 기존의 두가지 버전의 코드에서 발생했던 블록 문제가 어디서도 일어나지 않는다. Future의 연산이 두번째 종료되는 상황에서 실제 필요한 스레드는 한개지만 스레드 풀의 두 스레드가 여전히 활성 상태이다. 이전의 두 버전에서 y+z 연산은 f(x) 또는 g(x)를 실행(블록될 가능성이 있는)한 같은 스레드에서 실행했다.

상황에 따라서는 get()을 기다리는 스레드가 큰 문제가 되지 않으므로 기존 자바8의 Future를 이용한 방식도 해결 방법이 될 수 있다. 하지만 어떤 상황에서는 많은 수의 Future를 사용해야 한다. 이런 상황에서는 CompletableFuture와 콤비네이터를 이용해 get()에서 블록하지 않을 수 있고 그렇게 함으로 병렬 실행의 효율성은 높이고 데드락을 피하는 방안으로 코드를 구현할 수 있다.


발행-구독 그리고 리액티브 프로그래밍

Future와 CompletableFuture은 독립적 실행과 병렬성이라는 정식적 모델에 기반한다. 연산이 끝나면 get()으로 Future의 결과를 얻을 수 있다. 따라서 Future는 한 번만 실행해 결과를 제공한다.

 

반면 리액티브 프로그래밍은 시간이 흐르면서 여러 Future 같은 객체를 통해 여러 결과를 제공한다. 온도계 객체는 매 초마다 온 값을 반복적으로 제공한다. 또 다른 예로 웹서버 컴포넌트 응답을 기다리는 리스너 객체를 생각할 수 있다. 이 객체는 네트워크에 HTTP 요청이 발생하길 기다렸다가 이후에 결과데이터를 생산한다. 그리고 다른 코드에서 온도 값 또는 네트워크 결과를 처리한다. 그리고 온도계와 리스너 객체는 다음 결과를 처리할 수 있도록 온도 결과나 다른 네트워크 요청을 기다린다.

 

이 두 예제에서 Future 같은 동작이 모두 사용되었지만 한 예제에서는 한 번의 결과가 아니라 여러 번의 결과가 필요하다. 또한, 네트워크 결과의 경우 모든 결과가 똑같이 중요한 반면 온도계 예제에서는 대부분의 사람에게 가장 최근의 온도만 중요하다. 이런 종류의 프로그래밍을 리액티브라고 부르는 이유가 뭘까? 이는 낮은 온도를 감지했을때 이에 반응(react)하는 부분이 존재하기 때문이다.

 

프로그램이 스트림 모델에 잘 맞는 상황이라면 가장 좋은 구현이 될 수 있다. 하지만 보통 리액티브 프로그래밍 패러다임은 비싼편이다. 주어진 자바 스트림은 한 번의 단말 동작으로 소비될 수 있다. 스트림 패러다임은 두 개의 파이프라인으로 값을 분리(포크처럼)하기 어려우며 두 개로 분리된 스트림에서 다시 결과를 합치기도(조인처럼) 어렵다. 스트림은 선형적인 파이프라인 처리 기법에 알맞다.

 

자바 9에서는 java.util.concurrent.Flow의 인터페이스에 발행-구독 모델을 적용해 리액티브 프로그래밍을 제공한다. 자바9 Flow API는 다음 세가지로 간단하게 정리할 수 있다.

  - 구독자가 구독할 수 있는 발행자.

  - 이 연결을 구독(subscription)이라 한다.

  - 이 연결을 이용해 메시지(또는 이벤트로 알려짐)을 전송한다.

1) 두 플로를 합치는 예제

두 정보 소스로부터 발생하는 이벤트를 합쳐서 다른 구독자가 볼 수 있도록 발행하는 예를 통해 발행-구독의 특징을 간단하게 확인할 수 있다. 스프레드 시트 셀에서 "=C1+C2" 공식을 포함하는 셀 C3을 만들어 본다. C1이나 C2의 값이 갱신되면 C3에도 새로운 값이 반영된다.

private class SimpleCell {
    private int value = 0;
    private String name;
    public SimpleCell(String name) {
        this.name = name;
    }
}
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c1 = new SimpleCell("C1");

c1이나 c2의 값이 바뀌었을때 c3가 두값을 더하도록 어떻게 지정할 수 있을까? c2과 c2에 이벤트가 발생했을때 c3를 구독하도록 만들어야 한다. 그러려면 다음과 같은 인터페이스 Publisher<T>가 필요하다.

interface Publisher<T> {
    void subscribe(Subscriber<? super T> subscriber);
}

이 인터페이스는 통신할 구독자를 인수로 받는다. Subscriber<T> 인터페이스는 onNext라는 정보를 전달할 단순 메서드를 포함하며 구현자가 필요한대로 이 메서드를 구현할 수 있다.

interface Subscriber<T> {
    void onNext(T t);
}

이 두개념을 어떻게 결합할 수 있을까? 사실 Cell은 Publisher(셀의 이벤트에 구독할 수 있음)이며 동시에 Subscriber(다른 셀의 이벤트에 반응함)임을 알 수 있다.

private class SimpleCell implements Publisher<Integer>, Subscriber<Integer> {
    private int value = 0;
    private String name;
    private List<Subscriber> subscribers = new ArrayList<>();
    
    public SimpleCell(String name) {
        this.name = name;
    }
    
    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
        subscribers.add(subscriber);
    }
    
    // 새로운 값이 있음을 모든 구독자에게 알리는 메서드
    private void notifyAllSubscribers() {
        subscribers.forEach(subscriber -> subscriber.onNext(this.value));
    }
    
    @Override
    public void onNext(Integer newValue) {
        this.value = newValue; // 구독한 셀에 새 값이 생겼을때, 값을 갱신해서 반응함.
        System.out.println(this.name + ":" + this.value); // 값을 콘솔로 출력하지만 UI의 셀을 갱신할 수 있음
        notifyAllSubscribers(); // 값이 갱신되었음을 모든 구독자에게 알림.
    }
}

 

다음 간단한 예제를 시도해보자.

SimpleCell c3 = new SimpleCell("C3");
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c1 = new SimpleCell("C1");

c1.subscribe(c3);

c1.onNext(10); // C1의 값을 10으로 갱신
c2.onNext(20); // C2의 값을 20으로 갱신

C3는 직접 C1을 구독하므로 다음과 같은 결과(C1:10, C3:10, C2:20)가 출력된다. 그럼 연산에 대한 내용은 어떻게 구현하는지도 살펴보자.

public class ArithmeticCell extends SimpleCell {
    private int left;
    private int right;
    
    public ArithmeticCell(String name) {
        super(name);
    }
    
    public void setLeft(int left) {
        this.left = left;
        onNext(left + this.right); // 셀 값을 갱신하고 모든 구독자에게 알림
    }
    
    public void setRight(int right) {
        this.right = right;
        onNext(right + this.left); // 셀 값을 갱신하고 모든 구독자에게 알림
    }
}

다음처럼 조금 더 실용적인 예제로 시도할 수 있다.

ArithmeticCell c3 = new ArithmeticCell("C3");
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c1 = new SimpleCell("C1");

c1.subscribe(c3::setLeft);
c2.subscribe(c3::setRight);

c1.onNext(10);  // C1의 값을 10으로 갱신
c2.onNext(20);  // C2의 값을 20으로 갱신
c1.onNext(15);  // C1의 값을 15으로 갱신

/*
 * 결과 콘솔
 * C1:10
 * C3:10
 * C2:20
 * C3:30
 * C1:15
 * C3:35
 */

결과를 통해 C1의 값이 15로 갱신되었을때 C3이 즉시 반응해 자신의 값을 갱신한다는 사실을 확인할 수 있다. 발행자-구독자 상호작용의 장점은 발행자-구독자의 그래프를 설정할 수 있다는 점이다. "C5=C3+C4"처럼 C3와 C4에 의존하는 새로운 셀 C5를 만들수 있다.

ArithmeticCell c5 = new ArithmeticCell("C5");
ArithmeticCell c3 = new ArithmeticCell("C3");

SimpleCell c4 = new SimpleCell("C4");
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c1 = new SimpleCell("C1");

c1.subscribe(c3::setLeft);
c2.subscribe(c3::setRight);

c3.subscribe(c5::setLeft);
c4.subscribe(c5::setRight);

 

데이터가 발행자(생산자)에서 구독자(소비자)로 흐름에 착안해 개발자는 이를 upstream 또는 downstream 이라고 부른다. 위 예제에서 데이터 newValue는 업스트림 onNext() 메서드로 전달되고 notifyAllSubscribers() 호출을 통해 다운스트림 onNext() 호출로 전달된다.

 

실생활에서 플로를 사용하려면 onNext 이벤트 외에 onError나 onComplete 같은 메서드를 통해 데이터 흐름에서 예외가 발생하거나 데이터 흐름이 종료되었음을 알 수 있어야 한다. 자바 9 플로 API의 Subscriber에서는 실제 onError와 onComplete를 지원한다. 기존의 옵저버 패턴에 비해 새로운 API 프로토콜 더욱 강력해졌다.

 

간단하지만 플로 인터페이스의 개념을 복잡하게 만든 두가지 기능은 압력과 역압력이다. 처음에는 이 두 기능이 별로 중요해 보이지 않을 수 있지만 스레드 활요에서 이들 기능은 필수다. 온도계가 매 초마다 온도를 보고했는데 기능이 업그레이드되면서 매 밀리초마다 온도계를 보고한다고 생각해보자. 이렇게 빠른 속도로 발생하는 이벤트를 아무 문제없이 처리할 수 있을까? 이런 상황을 압력(pressure) 라고 부른다.

 

공에 담긴 메시지를 포함하는 수직 파이프를 상상해보자. 이런 상황에서는 출구로 추가될 공의 숫자를 제한하는 역압력 같은 기법이 필요하다. 자바9 플로 API에서는 발행자가 무한의 속도로 아이템을 방출하는 대신 요청했을때만 다음 아이템을 보내도록 하는 request() 메소드를 제공한다. (push 모델이 아니라 pull 모델)

2) 역압력

위와 같은 방식으로 Subscriber 객체(onNext, onError, onComplete 메서드)를 통해 Publisher에게 전달해 발행자가 필요한 메서드를 호출할 수 있다. 이 객체는 Publisher에서 Subscriber로 정보를 전달한다. 정보의 흐름 속도를 역압력(흐름 제어)으로 제어 즉 Subscriber에서 Publisher로 정보를 요청해야 할 필요가 있을 수 있다. Publisher는 여러 Subscriber를 갖고 있으므로 역압력 요청이 한 연결에만 영향을 미쳐야 한다는 것이 문제가 될 수 있다. 자바9 플로 API의 Subscriber 인터페이스는 다음 메서드를 포함한다.

void onSubscribe(Subscription subscription);

Publisher와 Subscriber 사이에 채널이 연결되면 첫 이벤트로 이 메서드가 호출된다. Subscription 객체는 다음처럼 Subscriber와 Publisher와 통신할 수 있는 메서드를 포함한다.

interface Subscription {
    void cancel();
    void request(long n);
}

(콜백을 통한 역방향 소통 효과?) Publisher는 Subscription 객체를 만들어 Subscriber로 전달하면 Subscriber는 이를 이용해 Publisher로 정보를 보낼 수 있다.

3) 실제 역압력의 간단한 형태

한 번에 한 개의 이벤트를 처리하도록 발행-구독 연결을 구성하려면 다음과 같은 작업이 필요하다.

  - Subscriber가 OnSubscribe로 전달된 Subscription 객체를 subscription 같은 필드에 로컬로 저장한다.

  - Subscriber가 수많은 이벤트를 받지 않도록 onSubscribe, onNext, onError의 마지막 동작에 channel.request(1)을 추가해 오직 한 이벤트만 요청한다.

  - 요청을 보낸 채널에만 onNext, onError 이벤트를 보내도록 Publisher의 notifyAllSubscribers 코드를 바꾼다. (보통 여러 Subscriber가 속도를 유지할 수 있도록 Publisher는 새 Subscription을 만들어 각 Subscriber와 연결한다.)

 

구현이 간단해 보일 수 있지만 역압력을 구현하려면 여러가지 장단점을 생각해야 한다.

  - 여러 Subscriber가 있을때 이벤트를 가장 느린 속도로 보낼 것인가? 아니면 각 Subscriber에게 보내지 않은 데이터를 저장할 별도의 큐를 가질 것이다.

  - 큐가 너무 커지면 어떻게 해야 할까?

  - Subscriber가 준비 안되었다면 큐의 데이터를 폐기할 것인가?

 

위 질문의 답변은 데이터의 성격에 따라 달라진다. 한 온도 데이터를 잃어버리는 것은 그리 대수로운 일이 아니지만 은행 계좌에서 크레딧이 사라지는 것은 큰일이다. 당김 기반 리액티브 역압력이라는 개념을 들어본 적인 있을 것이다. 이 기법에서는 Subscriber가 Publisher로부터 요청을 당긴다는 의미에서 reactive pull-based이라 불린다. 결과적으로 이런 방식으로 역압력을 구현할 수 있다.


리액티브 시스템 vs 리액티브 프로그래밍

리액티브 시스템과 리액티브 프로그래밍은 다른 의미를 가지고 있다. 리액티브 시스템은 런타임 환경이 변화에 대응하도록 전체 아키텍처가 설계된 프로그램을 가리킨다. 리액티브 시스템이 가져야할 공식적인 속성은 반응성, 회복성, 탄력성 세가지 속성으로 요약할 수 있다. -> Ractive manifesto(http://www.reactivemanifesto.org)

 

반응성은 리액티브 시스템이 큰 작업을 처리하느라 간단한 질의의 응답을 지연하지 않고 실시간으로 입력에 반응하는 것을 의미한다. 회복성은 한 컴보넌트의 실패로 전체 시스템이 실패하지 않음을 의미한다. 네트워크가 고장났어도 이와 관계가 없는 질의에는 아무 영향이 없어야 하며 반응이 없는 컴포넌트를 향한 질의가 있다면 다른 대안 컴포넌트를 찾아야 한다. 탄력성은 시스템이 자신의 작업 부하에 맞게 적응하며 작업을 효율적으로 처리함을 의미한다. 각 큐가 원활하게 처리될 수 있도록 다양한 소프트웨어 서비스와 고나련된 작업자 스레드를 적절하게 재배치할 수 있다.

 

여러가지 방법으로 이런 속성을 구현하 ㄹ수 있지만 java.util.concurrent.Flow 관련된 자바 인터페이스에서 제공하는 리액티브 프로그래밍 형식을 이용하는 것도 주요 방법 중 하나이다. 이들 인터페이스 설계는 Reactive Manifesto의 네번째이자 마지막 속성 즉 메시지 주도 속성을 반영한다. 메시지 주도 시스템은 박스와 채널 모델에 기반한 내부 API를 갖고 있는데 여기서 컴포넌트는 처리할 입력을 기다리고 결과를 다른 컴포넌트로 보내면서 시스템이 반응한다.

 

 

Source >> "모던 자바 인액션" 15장