본문 바로가기

프로그래밍(TA, AA)/JVM 언어

[Spring] Web on Reactive Stack 문서(2) - WebClient, WebSocket

2. 웹클라이언트 (WebClient)

스프링 웹플럭스는 HTTP 요청을 위한 리액티브한 논 블로킹 WebClinet를 포함한다. 이 클라이언트는 선언적 구성을 위한 리액티브 타입을 사용하는 함수형의 능수능란한 API를 가지고 있다. 리액티브 라이브러리를 보라. 웹플럭스 클라이언트와 서버는 동일한 논 블로킹 코덱에 의존하여 요청과 응답 내용을 인코딩 및 디코딩한다.

 

WebClient는 내부적으로 HTTP 클라이언트 라이브러리에게 위임한다. 기본적으로 Reactor Netty를 사용하고, 제티 리액티브 HttpClient를 위한 내장형 지원이 있다. 그리고 ClientHttpConnector를 통해 다른 라이브러리도 사용 가능하다.


2.1. 설정 (Configuration)

WebClient를 생성하는 가장 단순한 방법은 스태틱 팩토리 메서드를 사용하는 것이다.

  - WebClient.create()

  - WebClinet.create(String baseUrl)

 

위의 두 메서드는 리액터 네티를 디폴트 세팅으로 사용한다. 클래스패스에 io.projectreactor.netty:reactor-netty가 존재해야 한다.

 

WebClient.builder() 를 몇가지 옵션과 함께 사용할 수도 있다.

  - uriBuilderFactory: 기본 URL으로 사용하기 위한 커스터마이징된 UriBuilderFactory.

  - defaultHeader: 모든 요청에 대한 헤더.

  - defaultCookie: 모든 요청에 대한 쿠키.

  - defaultRequest: 모든 요청을 커스터마이징하기 위한 Consumer.

  - filter: 모든 요청에 대한 클라이언트 필터

  - exchangeStrategies: HTTP 메시지 reader/writer 커스터마이징.

  - clientConnector: HTTP 클라이어늩 라이브러리 세팅.

 

다음 예제는 HTTP 코덱을 설정한다.

val webClient = WebClient.builder()
        .exchangeStrategies { strategis ->
            strategis.codecs {
                // ...
            }
        }
        .build()

 

한번 만들어진 WebClient 인스턴스는 불변형이다. 하지만 원본 인스턴스에 영향을 주지 않으면서 인스턴스를 복제하고 변경된 복제본을 만드는 일이 가능하다. 다음은 그 예제이다.

val client1 = WebClient.builder()
        .filter(filterA).filter(filterB).build()
val client2 = client1.mutate()
        .filter(filterC).filter(filterD).build()

// client1 has filterA, filterB
// client2 has filterA, filterB, filterC, filterD

2.1.1. MaxInMemorySize

스프링 웹플럭스는 애플리케이션의 메모리 이슈를 피하기 위해 코덱의 데이터 인 메모리 버퍼링 사이즈에 제한을 둘 수 있다. 이 제한의 디폴트 값은 256KB 이며, 이 값으로 충분하지 못할때는 다음 메시지를 만나게 된다.

# org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer

 

모든 디폴트 코덱에 적용되는 제한값을 설정할 수 있다.

val webClient = WebClient.builder()
        exchangeStraegies { builder ->
            builder.codes {
                it.defaultCodes().maxInMemorySize(2 * 1024 * 1024)
            }
        }
        .build()

2.1.2. 리액터 네티 (Reactor Netty)

리액터 네티 설정 커스터마이징을 위한 미리 설정된 HttpClient를 제공한다.

val httpClient = HttpClient.create().secure { ... }

val webClient = WebClient.builder()
        .clientConnector(ReactorClientHttpConnector(httpClient))
        .build()

Resources

기본적으로, HttpClient는 이벤트 루프 쓰레드와 커넥션 풀을 포함하여 reactor.netty.http.HttpResources에 있는 전역 리액터 네티에 속해있다. 이벤트 루프 동시성에는 고정된 공유 자원들이 선호되므로, 이 모드가 권고된다. 이 모드에서는 전역적 자원은 프로세스가 종료될때까지 유효하다.

 

서버가 프로세스에 맞추어져 있다면, 일반적으로 명시적 셧다운이 필요하지 않다. 하지만 만약 서버가 프로세스 안에서 시작되고 정지될 수 있다면(예를 들어, WAR로 배포된 스프링 MVC 애플리케이션), 여기에는 ReactorResourceFactory를 globalResources=true(디폴트) 로 설정한 스프링 관리 빈을 선언하여 리액터 네티 전역 자원이 스프링 ApplicationContext의 클로징 시점에 셧다운되도록 할 수 있다.

@Bean
fun reactorResourceFactory() = ReactorResourceFactory()

 

또한 글로벌 리액터 네티 자원에 속하지 않도록 할 수도 있는데, 이 모드에서는 리액터 네티 클라이언트와 서버 인스턴스가 공유된 자원을 사용하도록 하는 것은 설정하는 사람의 몫이 된다. 다음은 그 예제이다.

@Bean
fun resourceFactory() = ReactorResourceFactory().apply {
    isUseGlobalResources = false // 전역 자원과 독립된 자원을 생성한다.
}

@Bean
fun webClient(): WebClient {
    
    val mapper: (HttpClient) -> HttpClient = {
        // Futher customizations...
    }
    
    val connector = ReactorClientHttpConnector(resourceFactory(), mapper)  // 자원 팩토리로 ReactorClientHttpConnector 생성자를 사용한다.
    
    return WebClient.builder().clientConnector(connector).build()  // 커넥터를 WebClient.Builder 에 연결한다.
}

타임아웃(Timeouts)

커넥션 타임아웃은 다음과 같이 설정한다.

import io.netty.channel.ChannelOption

val httpClient = HttpClient.create()
        .tcpConfiguration { it.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) }

 

read/write 타임아웃은 다음과 같이 설정한다.

import io.netty.handler.timeout.ReadTimeoutHandler
import io.netty.handler.timeout.WriteTimeoutHandler

val httpClient = HttpClient.create().tcpConfiguration {
    it.doOnConnected { conn -> conn
            .addHandlerLast(ReadTimeoutHandler(10))
            .addHandlerLast(WriteTimeoutHandler(10))
    }
}

2.1.3. 제티 (Jetty)

다음 예제는 제티 HttpClient 설정 커스터마이징을 보여준다.

val httpClient = HttpClient()
httpClient.cookieStore = ...
val connector = JettyClientHttpConnector(httpClient)

val webClient = WebClient.builder().clientConnector(connector).build()

 

기본적으로 HttpClient는 자신의 리소스(Executor, ByteBufferPool, Scheduler)를 사용한다. 이 리소스들은 프로세스가 종료되거나 stop() 이 호출될때까지 유효하다.

 

리소스를 다수의 제티 클라이언트(그리고 서버) 인스턴스 사이에서 공유할 수 있고, JettyResourceFactory를 스프링 관리 빈으로 선언함으로써 스프링 ApplicationConext가 클로징될때 셧다운되도록 할 수 있다. 다음은 그 예제이다.

@Bean
fun resourceFactory() = JettyResourceFactory()

@Bean
fun webClient(): WebClient {
    
    val httpClient = HttpClient()
    // Further customizations...
    
    val connector = JettyClientHttpConnector(httpClient, resourceFactory()) // JettyClientHttpConnector 생성자에 리소스 팩토리를 사용한다.
    
    return WebClient.builder().clientConnector(connector).build()  // 커넥터를 WebClient.Builder에 연결한다.
}

2.2. retrieve()

retrieve() 메서드는 응답 본문을 얻고 디코딩하기 위한 가장 쉬운 방법이다. 다음은 그 예제이다.

val client = WebClient.create("https://example.org")

val result = client.get()
        .uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .awaitBody<Person>()

 

응답으로부터 디코딩된 객체의 스트림을 얻을 수도 있다.

val result = client.get()
        .uri("/quotes").accept(MediaType.TEXT_EVENT_STREAM)
        .retrieve()
        .bodyToFlow<Quote>()

 

기본적으로 상태 코드 4xx 혹은 5xx 응답의 결과는 WebClientResponseException 혹은 WebClientResponseException.BadRequest, WebClientResponseException.NotFound, 기타 등등과 같은 HTTP 상태 특징적 서브클래스가 된다. onStatus 메서드를 사용하여 결과 익셉션을 커스터마이징할 수 있다.

val result = client.get()
        .url("/persons/{id}", id).accpet(MediaType.APPLICATION_JSON)
        .retrieve()
        .onStatus(HttpStatus::is4xxClientError) { ... }
        .onStatus(HttpStatus::is5xxServerError) { ... }
        .awaitBody<Person>()

 

onStatus가 사용될때, 그 응답이 내용을 가지고 있다고 예상된다면 onStatus 콜백은 이를 소비해야한다. 그렇지 않으면 응답 내용은 리소스를 릴리즈하기 위해 자동으로 비워진다.


2.3. exchange()

exchange() 메서드는 retrieve 메서드보다 더 많은 기능을 제공한다. 다음 예제는 retrieve() 와 동등하면서도 ClientResponse에의 접근을 제공한다.

val result = client.get()
        .uri("/persons/{id}", id).accpet(MediaType.APPLICATION_JSON)
        .awaitExchange()
        .awaitBody<Person>()

 

이 레벨에서는 완전한 ResponseEntity를 생성할 수도 있다.

val result = client.get()
        .uri("/persons/{id}", id).accpet(MediaType.APPLICATION_JSON)
        .awaitExchange()
        .toEntity<Person>()

 

retrieve() 와는 달리, exchange()를 사용하면 4xx, 5xx 응답에 대한 자동적인 에러 시그널이 존재하지 않는다. 직접 상태 코드를 체크하여 그 뒤의 동작을 결정해야 한다.

 

exchange() 사용시 응답 본문은 언제나 소비되거나 릴리즈되어야 한다. 익셉션이 발생하더라도 마찬가지이다. 보통, 본문을 원하는 타입의 객체로 컨버팅하기 위해 ClientResponse의 bodyTo* 혹은 toEntity*를 실행하는데, releaseBody()를 사용해서 본문 컨텐츠를 소비하지 않고 버리는 일도 가능하다. 아니면 toBodilessEntity()를 사용해서 상태와 헤더만을 얻을 수도 있다(본문을 버린다).

 

그리고, bodyToMono(Void.class)가 있다. 이 방법은 오직 응답 컨텐츠가 없다고 예상되는 경우에 한하여 사용되어야 한다. 만일 응답이 컨텐츠를 가지고 있다면 그 커넥션은 닫히고, 커넥션 풀로 반환되지 않는다. 왜냐하면 이 커넥션은 재사용 가능한 상태로 남지 않기 때문이다.


2.4. 요청 본문 (Request Body)

요청 본문은 Mono 혹은 코틀린 코루틴 Deferred와 같이, ReactiveAdapterRegistry가 핸들링하는 어떠한 비동기 타입으로부터든 인코딩 가능하다. 다음은 그 예제이다.

val personDeferred: Deferred<Person> = ...

client.post()
        .uri("/persons/{id}", id)
        .contentType(MediaType.APPLICATION_JSON)
        .body<Person>(personDeferred)
        .retrieve()
        .awaitBody<Unit>()

 

또한 인코딩된 객체의 스트림을 가질 수도 있다.

val people: Flow<Person> = ...

client.post()
        .uri("/persons/{id}", id)
        .contentType(MediaType.APPLICATION_JSON)
        .body(people)
        .retrieve()
        .awaitBody<Unit>()

 

아니면, 실제 값을 가진 경우 bodyValue 메서드를 사용할 수도 있다.

val person: Person = ...

client.post()
        .uri("/persons/{id}", id)
        .contentType(MediaType.APPLICATION_JSON)
        .bodyValue(person)
        .retrieve()
        .awaitBody<Unit>()

2.4.1. 폼 데이터(Form Data)

폼 데이터를 전송하기 위해 요청 본문으로 MultiValueMap<String, String>을 사용할 수 있다. 본문 컨텐츠는 FormHttpMessageWriter에 의해 자동으로 application/x-www-form-urlencoded로 설정된다. 다음 예제는 MultiValueMap<String, String> 을 사용한다.

val formData: MultiValueMap<String, String> = ...

client.post()
        .uri("/path", id)
        .bodyValue(formData)
        .retrieve()
        .awaitBody<Unit>()

 

BodyInserters를 사용하여 폼 데이터 인라인으로 생성할 수도 있다.

import org.springframework.web.reactive.function.BodyInserters.*

client.post()
        .uri("/path", id)
        .body(fromFormDAta("k1", "v1").with("k2", "v2"))
        .retrieve()
        .awaitBody<Unit>()

2.4.2. 멀티파트 데이터(Multipart Data)

멀티파트 데이터를 전송하기 위해 파트 컨텐츠 혹은 파트에 대한 컨텐츠나 헤더를 나타내는 HttpEntity 인스턴스를 나타내는 Object 인스턴스를 값으로 가진 MultiValueMap<String, ?>을 사용할 수 있다. MultipartBodyBuilder는 멀티파트 요청을 준비하기 위한 편리한 API를 제공한다. 다음 예제는 MultiValueMap<String, ?>을 생성한다.

val builder = MultipartBodyBuilder().apply {
    part("fieldPart", "fieldValue")
    part("filePart1", new FileSystemResource("...logo.png"))
    part("jsonPart", new Person("Jason"))
    part("myPart", part)  // Part from a server request
}

val pars = builder.build()

 

대부분의 경우, 각 파트에 대한 Content-Type을 지정할 필요는 없다. 컨텐츠 타입은 시리얼라이징을 위해 선택된 HttpMessageWriter, 혹은 파일 확장자에 기반한 Resource를 바탕으로 자동적으로 결정된다. 필요에 따라서 오버로딩된 빌더 part 메소드를 통해 각 파트가 사용할 MediaType을 명시적으로 제공할 수 있다.

 

한 번 MultiValueMap 이 준비되면, 이를 WebClient에게 전달하는 가장 쉬운 방법은 body 메서드를 사용하는 것이다.

val builder: MultipartBodyBuilder = ...

client.post()
        .uri("/path", id)
        .body(builder.build())
        .retrieve()
        .awaitBody<Unit>()

 

MultiValueMap이 최소 하나의 non-String 값을 포함한다면 Content-Type을 multipart/form-data 로 세팅할 필요가 없다. 이 때의 값은 일반 폼 데이터(application/x-www-form-urlencoded)를 나타낼 수도 있다. 이는 언제나 MultipartBodyBuilder를 사용하는 경우가 된다. 이 빌더는 HttpEntity 래퍼를 보장한다.

 

MultipartBodyBuilder 의 대안으로, 내장형 BodyInserter를 통한 인라인 스타일의 멀티파트 컨텐츠를 제공할 수도 있다.

import org.springframework.web.reactive.function.BodyInserters.*

client.post()
        .uri("/path", id)
        .body(fromMultipartData("filePart", "value").with("filePart", resource))
        .retrieve()
        .awaitBody<Unit>()

2.5. 클라이언트 필터 (Client Filters)

요청을 인터셉팅하고 변경하기 위해 WebClient.Builder를 사용해서 클라이언트 필터(ExchangeFilterFunction)를 등록할 수 있다. 다음은 그 예제이다.

val client = WebClient.builder()
        .filter { request, next -> 
            
            val filtered = ClientRequest.from(request)
                    .header("foo", "bar")
                    .build()
            
            next.exchange(filtered)
        }
        .build()

 

필터는 인증과 같은 크로스커팅 관심사를 다루기 위해 사용될 수 있다. 다음 예제는 필터를 사용하여 스태틱 팩토리 메서드를 통해 기본 인증을 처리한다.

import org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication

val client = WebClient.builder()
        .filter(basicAuthentication("user", "password"))
        .build()

 

필터는 모든 요청에 글로벌하게 적용된다. 특정 요청에 대한 필터의 동작을 변경하려면, 체인 안의 모든 필터가 접근하는 ClientRequest에 요청 어트리뷰트를 추가할 수 있다. 다음 그 예제이다.

val client = WebClient.builder()
                .filter { request, _ ->
                    val usr = request.attributes()["myAttribute"];
                    // ...
                }.build()

client.get().uri("https://example.org/")
        .attribute("myAttribute", "...")
        .retrieve()
        .awaitBody<Unit>()

 

기존 WebClient를 복제하거나, 새 필터를 추가하거나, 이미 등록된 필터를 제거하는 것도 가능하다. 다음은 인덱스 0에 기본 인증 필터를 추가한다.

val client = webClient.mutate()
        .filters { it.add(0, basicAuthentication("user", "password")) }
        .build()

2.6. 동기식 사용 (Synchronous Use)

WebClient 결과를 위한 끝에 블로킹하는 방법으로 동기 방식을 사용할 수 있다.

val person = runBlocking {
    client.get().uri("/person/{id}", i).retrieve()
            .awaitBody<Person>()
}

val persons = runBlocking {
    client.get().uri("/persons").retrieve()
            .bodyToFlow<Person>()
            .toList()
}

 

하지만 다수의 호출이 필요할 경우, 각 응답에 대한 개별적인 블로킹은 피하는 것이 더 효율적이다. 대신, 결합된 결과를 기다린다.

val data = runBlocking {
        val personDeferred = async {
            client.get().uri("/person/{id}", personId)
                    .retrieve().awaitBody<Person>()
        }
        
        val hobbiesDeferred = async {
            client.get().uri("/person/{id}/hobbies", personId)
                    .retrieve().bodyToFlow<Hobby>().toList()
        }
        
        mapOf("person" to personDeferred.await(), "hobbies" to hobbiesDeferred.await())
    }

 

많은 수의 원격 호출, 잠재적으로 중첩된, 상호 의존적이면서 끝까지 블로킹하지 않는 리액티브 파이프라인을 만드는 수많은 패턴과 오퍼레이터가 존재한다.

 

Flux 또는 Mono를 사용하여, 스프링 MVC 혹은 스프링 웹플럭스 컨트롤러에서 절대로 블로킹하지 않을 수 있다. 간단히 컨트롤러 메서드로부터 리액티브 타입을 반환하는 것이다. 같은 원칙은 코틀린 코루틴과 스프링 웹플럭스에 동일하게 적용된다. 서스펜딩 함수를 사용하거나 컨트롤러 메서드에서 Flow를 반환한다.


2.7. 테스팅(Testing)

WebClient를 사용하는 코드를 테스트하기 위해, OkHttp MockWebServer와 같은 목 웹서버를 사용할 수 있다. 이 서버의 사용 예제를 보려면 스프링 프레임워크 테스트 슈트의 WebClientIntegrationTests, 혹은 OkHttp 리파지토리의 static-server 샘플픙ㄹ 체크아웃 해보는것도 좋다.


3. 웹소켓(WebSockets)

이 부분은 리액티브 스택 웹소켓 메시진 지원에 대해 다룬다.


3.1. 웹소켓이란

웹소켓 프로토콜, RFC 6455는 단일 TCP 커넥션 위로 클라이언트와 서버 사이의 전 양방의 양방향 통신 채널을 설정하는 표준적인 방법을 제공한다. 이는 HTTP 와는 다른 TCP 프로토콜이지만, HTTP를 근간으로 동작하도록 설계되었다. 80과 443 포트를 사용하며, 기존 방화벽 규칙을 재사용한다.

 

웹소켓 상호작용은 업그레이드를, 혹은 웹소켓 프로토콜로의 스위칭을 위한 HTTP Upgrade 헤더를 사용하는 HTTP 요청과 함께 시작된다. 다음 예제는 이러한 상호작용을 보여준다.

GET /spring-websocker-protfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080

 

보통의 200 상태 코드 대신, 웹소켓 서버는 다음과 같은 비슷한 아웃풋을 반환한다.

HTTP/1.1 101 Switching Protocols (1)
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp

 

1) 프로토콜 스위치

핸드셰이크 성공 후, HTTP 업그레이드 요청의 기반 TCP 소켓은 메시지 송수신을 위해 클라이언트와 서버 양쪽으로 열려있게 된다.