본문 바로가기

프로그래밍(TA, AA)/JVM 언어

[kotlin] 코틀린의 코루틴 가이드(1)

코틀린 코루틴 가이드를 잘 번역해주신 블로그입니다. 아래 블로그 내용으로 공부를 해서 원본 내용은 아래 링크 참고 바랍니다.
https://medium.com/@myungpyo/reading-coroutine-official-guide-thoroughly-part-0-20176d431e9d

 

Reading Coroutine official guide thoroughly — Part 0

요즘 안드로이드 앱 개발자들은 주개발 언어가 Java 에서 Kotlin 으로 많이 전환 되고 있다고 생각합니다. iOS 가 objective-c 에서 swift 로 전환할 때 처럼 안드로이드 SDK 공식 가이드에서도 Kotlin 과 Java

medium.com

kotlin 언어를 개발한 Jetbrain은 Kotlin 대중화에 한발작 더 나아가기 위해 많은 개발자들이 겪는 스레딩 문제를 직관적인 방식으로 해결할 수 있도록 도와주는 코루틴(Coroutine)을 개발해 Kotlin에 포함시켰다.


1. Basics

package com.smpcoroutinesample.basic

import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

fun main(args: Array<String>) {
    GlobalScope.launch {
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    Thread.sleep(2000L)
}

GlobalScope.launch {} 코드 블록은 코루틴을 생성하기 위한 코루틴 빌더이며 이렇게 생성되어 실행되는 코루틴은 호출(실행) 쓰레드를 블록하지 않기 때문에 그대로 두면 메인 함수가 종료되고 메인 함수를 실행한 메인 스레드 역시 종료되어 프로그램이 끝나게 된다. 이를 방지하기 위해 임의의 시간을 지정하여 지연시키고 있다. 이렇게 스레드를 멈추는 역할을 수행하는 함수를 중단 함수(Blocking function)이라고 한다.

 

우리는 이러한 중단 함수가 현재 스레드를 멈추게 할 수 있다는 것을 코드상에 보다 명시적으로 나타내기 위해 다음과 같이 runBlocking {} 블록을 사용할 수 있다.

fun main(args: Array<String>) {
    GlobalScope.launch {
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    runBlocking {
        delay(2000L)
    }
}

runBlocking{} 블록은 주어진 블록이 완료될때까지 현재 스레드를 멈추는 새로운 코루틴을 생성하여 실행하는 코루틴 빌더이다. 코루틴 안에서는 runBlocking{}의 사용은 권장되지 않으며, 일반적인 함수 코드 블록에서 중단 함수를 호출할 수 있도록 하기 위해서 존재하는 장치이다. 

 

메인 함수 자체를 runBlocking {} 코루틴 빌더를 이용하여 작성하면 지연을 위한 delay() 중단 함수의 사용이 보다 자연스러워 진다.

fun main(args: Array<String>) = runBlocking {
    GlobalScope.launch {
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    delay(2000L)
}

delay() 는 중단 함수이며 모든 중단 함수들은 코루틴 안에서만 호출될 수 있다는 제약이 있다. GlobalScope.launch{ } 코드블록에서 dealy(1000L) 를 사용할 수 있었던 이유도 GlobalScope.launch{ }가 주어진 코드블록을 수행하는 코루틴을 생성하는 코루틴 빌더이며, 해당 코드블록은 코루틴 안에서 수행되고 있기 때문이다. 

 

지금까지의 예제에서는 GlobalScope.launch{ }로 실행된 코루틴의 수행이 완료될때까지 현재 스레드(main 함수)를 대기시키기 위해서 임의의 지연(2초)를 주었는데 이것은 실제 프로그램에서는 적절한 방법이 아니다. 그 이유는 내부적으로 실행중인 코루틴(자식 코루틴)들이 작업을 완료하고 종료될때까지 얼마나 대기해야 할지 부모 코루틴은 예측할 수 없기 때문이다.

 

이러한 문제를 해결하기 위해서 우리는 다음과 같이 GlobalScope.launch{ }의 결과로 반환되는 Job 인스턴스를 이용할 수 있다.

fun main(args: Array<String>) = runBlocking {
    val job = GlobalScope.launch {
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    job.join()
}

이제 runBlocking{} 빌더로 생성된 코루틴 블록(이후부터는 메인 코루틴이라 칭함)은 GlobalScope.launch{} 빌더를 이용해 생성한 코루틴이 종료될때까지 대기한 후 종료된다. 이는 자식 코루틴의 실행 흐름에 연결됨으로써 가능해졌다.

job.join()

여기서 한가지 생각해볼만한 문제가 있다. 메인 코루틴 안에서 두개 이상의 자식 코루틴들이 수행되고, 우리는 모든 자식 코루틴들의 종료를 기다리도록 구현해야 한다면 어떨까? 우리는 자식 코루틴들에 대응되는 모든 Job 객체들의 참조를 어딘가에 유지하고 있다가 부모 코루틴이 종료되어야 하는 시점에 실행된 모든 자식 코루틴들의 Job 들에 join 하여 자식 코루틴들의 종료를 기다려야 할것이다. 굉장히 번거로운 작업인데, 바로 이부분이 코루틴 스코프(Scope)가 빛을 발하는 부분이다.

 

모든 코루틴들은 각자의 스코프를 갖는다. 그래서 다음과 같이 runBlocking{} 코루틴 빌더 등을 이용해 생성된 코루틴 블록 안에서 launch{ } 코루틴 빌더 등을 이용하여 새로운 코루틴을 생성하면 현재 위치한 부모 코루틴에 join()을 명시적으로 호출할 필요없이 자식 코루틴들을 실행하고 종료될때까지 대기할 수 있다.

fun main(args: Array<String>) = runBlocking {
    launch {
        delay(1000L)
        println("World!")
    }
    println("Hello,")
}

 

Scope builder

만일 어떤 코루틴들을 위한 사용자 정의 스코프가 필요한 경우가 있다면 coroutineScope{} 빌더를 이용할 수 있다. 이 빌더를 통해 생성된 코루틴은 모든 자식 코루틴들이 끝날때까지 종료되지 않는 스코프를 정의하는 코루틴이다.

 

이 시점에 우리는 예제로 계속 사용하고 있는 runBlocking{ } 빌더와 coroutineScope{ } 빌더가 무슨 차이가 있는지 궁금할 수 있다. 그 차이는 runBlocking{ } 과 달리 coroutineScope{ }는 자식들의 종료를 기다리는 동안 현재 스레드를 블록하지 않는다.

fun main(args: Array<String>) = runBlocking {
    launch {
        delay(200L)
        println("Task from runBlocking")
    }
    
    coroutineScope {
        launch {
            delay(500L)
            println("Task from nested launch")
        }
        delay(100L)
        println("Task from coroutine scope")
    }
    println("Coroutine scope is over")
}

위 예제의 실행 결과는 다음과 같다.

Task from coroutine scope
Task from runBlocking
Task from nested launch
Coroutine scope is over

runBlocking 과의 차이를 보기 위해 coroutineScope를 runBlocking으로 바꿔서 실행해보면 Task from runBlocking이 가장 마지막에 출력된다. launch{ } 블록이 실행 기회를 얻지 못했기 때문이다.

 

Extract function refactoring

지금까지의 코틀린 예제들은 하나의 메인 함수 블록 안에 모든 로직을 기술하였다. 이것은 우리의 일상적인 코딩 패턴이 아니며 단지 샘플을 위한 뚱뚱한 함수일 뿐이다. 이제 이것들을 그 용도에 맞는 개별 함수로 분리하여 보다 실용적으로 사용할 수 있는 패턴으로 변경해보도록 한다.

fun main(args: Array<String>) = runBlocking {
    launch {
        doWorld()
    }
    println("Hello,")
}

suspend fun doWorld() {
    delay(1000L)
    println("World!")
}

위 샘플 코드에서 볼 수 있는것처럼 코루틴 내부에서 실행되는 중단 함수들은 suspend 키워드를 함수명 앞에 붙임으로써 만들 수 있다. 이러한 함수들이 일반 함수와 비교해 갖는 차이점은 delay() 와 같은 다른 중단함수들을 호출할 수 있다는 점이다. 그 이유는 suspend 키워드를 붙여 만든 이 함수 역시 중단 함수이기 때문에 특정 코루틴 컨텍스트 안에서 수행되고 있고, 코루틴 컨텍스트 안에서는 모든 중단 함수를 호출할 수 있기 때문이다.

 

하지만 만약 중단 함수가 현재 스코프에서 수행될 코루틴 빌더를 포함한다면 어떨까? 이 경우에 suspend 키워드 만으로는 충분하지 않다. 위 예제에서 doWorld() 함수를 CoroutineScope의 확장함수로 만드는 방법을 생각해볼 수 있겠지만 이것은 API를 불명확하게 만든다.

 

좀 더 나은 방법은 명시적으로 CoroutineScope을 필드로 갖는 클래스를 만들고 그 클래스가 해당 suspend 함수를 갖게하는 것, 혹은 외부(outer) 클래스의 구현을 암시적으로 사용하는 방법이 있다. 또 다른 수단으로는 CoroutineScope(coroutineContext) 자체를 생성하여 사용할 수 있지만 이러한 접근 방식은 구조적으로 안전하지 않다. 왜냐하면 이러한 방식을 사용하는 순간부터 더이상 이 메서드가 실행되는 스코프를 컨트롤할 방법이 없어지기 때문이다. private API만이 이 빌더를 사용할수 있다.

 

Coroutines are light-weight

일반적인 스레드 구현으로는 메모리 부족(Out-Of-Memory) 오류가 발생할 수 있는 동작도 다음과 같이 코루틴으로 작성하면 정상적으로 동작한다.

fun main(args: Array<String>) = runBlocking {
    repeat(100_000) {
        launch {
            delay(1000L)
            print(".")
        }
    }
}

위 예제는 십만개의 코루틴을 수행하고 1초 후에 각각의 코루틴들은 점(.)을 출력한다. 같은 동작을 스레드로 구현하여 수행하게되면 Out-Of-Memory 류의 메모리 부족 예외를 발생시켰을 것이다.

 

Global coroutines are like daemon threads

다음 코드는 오랜 시간동안 Global Scope에서 수행되는 코루틴을 만들어 실행한다. 이 코루틴은 "I'm sleeping"이라는 문자열을 500ms 간격으로 천번 출력한다. 이렇게 무거운(오래걸리는) 코루틴이 수행되는 동안 메인 함수는 그것보다 짧은 시간을 대기한 후 종료한다.

fun main(args: Array<String>) = runBlocking {
    GlobalScope.launch {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L)
}

위 예제에서 Global Scope 에서 실행된 코루틴은 마치 데몬 스레드와 같이 자신이 속한 프로세스의 종료를 지연시키지 않고 프로세스 종료시 함께 종료되기 때문에 다음과 같이 허용된 시간 동안만 동작한 결과를 만들어 낸다.


2. Cancellation and Timeouts

코루틴에서 실행되는 모든 중단 함수(suspending function)들은 취소 요청에 응답 가능하도록 구현되어야 한다. 다시말해 중단 함수는 실행 중 취소 가능한 구간마다 취소 요청이 있었는지 확인하고 요청이 있었다면 실행을 즉시 취소하도록 구현되어야 한다. kotlinx.coroutines 라이브러리의 모든 중단함수는 이러한 취소 요청에 대응하도록 구현되어 있다.

 

앞서 이야기 한 것처럼 취소를 지원하는 중단 함수들은 실행하는 동안 취소가 가능한 지점마다 현재 코루틴이 취소 되었는지 확인하며, 만약 취소되었다면 CancellationException을 발생시키며 종료한다.

 

ReactiveX Observable을 구현해본 경험이 있다면, Observable의 코드 실행 중 취소 가능한 구간마다 isDisposed() 같은 취소 상태 확인 함수를 이용하여 현재 Observable의 구독 취소 여부를 확인하고 Observable 의 데이터 방출을 정지하고 종료해야 하는지 판단한다.

 

만약 다음과 같이 코루틴을 작성하면 취소 요청이 오더라도 작업을 멈추지 않고 계속 진행한다. (Sleep 대신 Busy-waiting 구현을 해도 마찬가지이다)

fun main(args: Array<String>) = runBlocking {
    val job = launch(Dispatchers.Default) {
        for (i in 1..10) {
            println("I'm sleeping $i ...")
            Thread.sleep(500L)
        }
    }
    
    delay(1300L)
    println("main : I'm tired of waiting!")
    job.cancelAndJoin()
    println("main : Now I can quit.")
}

이 코루틴을 취소 요청에 친화적인 코드로 만들기 위해서는 취소가 가능한 시점마다 다른 Continuation에 실행 시간을 양보하는 yield() 함수를 호출하거나, CoroutineScope에 정의된 isActive 속성을 참조하여 코루틴이 비활성(Inactive) 상태인 경우 작업을 중단하도록 작성하는 방법이 있다.

 

yield() 중단 함수를 사용한 구현은 다음과 같이 작성할 수 있다.

fun main(args: Array<String>) = runBlocking {
    val job = launch(Dispatchers.Default) {
        for (i in 1..10) {
            yield()
            println("I'm sleeping $i ...")
            Thread.sleep(500L)
        }
    }
    
    delay(1300L)
    println("main : I'm tired of waiting!")
    job.cancelAndJoin()
    println("main : Now I can quit.")
}

isActive 속성을 이용한 구현은 다음과 같다.

fun main(args: Array<String>) = runBlocking {
    val job = launch(Dispatchers.Default) {
        for (i in 1..10) {
            if (!isActice) {
                break
            }
            println("I'm sleeping $i ...")
            Thread.sleep(500L)
        }
    }
    
    delay(1300L)
    println("main: I'm tired of waiting!")
    job.cancelAndJoin()
    println("main: Now I can quit.")
}

취소 가능한 중단함수들은 취소되면 CancellationException을 발생시키며, 우리는 일반적인 예외처리 방식과 동일하게 이를 처리할 수 있다. 만약 예외발생 시 해제해야하는 리소스가 있다면 두가지 방식을 사용할 수 있는데 try ~ finally 구문을 사용하는 방식과 Kotlin use() 함수를 사용하는 방식이 있다.

 

try ~ finally 방식을 사용하면 다음과 같이 작성할 수 있다.

fun main(args: Array<String>) = runBlocking {
    val job = launch {
        try {
            repeat(1000) { i -> 
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            println("main : I'm running finally!")
        }
    }
    
    delay(1300L)
    println("main : I'm tired of waiting!")
    job.cancelAndJoin()
    println("main : Now I can quit.")
}

Kotlin use() 함수를 사용하면 다음과 같이 작성할 수 있다.

fun main(args: Array<String>) = runBlocking {
    val job = launch {
        SleepingBed().use {
            it.sleep(1000)
        }
    }
    
    delay(1300L)
    println("main : I'm tired of waiting!")
    job.cancelAndJoin()
    println("main : Now I can quit.")
}

class SleepingBed : Closeable {
    
    suspend fun sleep(times: Int) {
        repeat(times) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    
    override fun close() {
        println("main : I'm running close() in SleepingBed!")
    }
}

 

취소 불가능한 코드 블록의 실행 (Run non-cancellable block)

이미 CancellableException이 발생한 코루틴의 finally 블록 안에서 중단 함수를 호출하면 현재 코루틴은 이미 취소된 상태이기때문에 CancellationException이 발생한다.

 

보통 리소스를 정리하는 함수들은 논-블로킹(Non-Blocking)으로 동작하기 때문에 이러한 제약이 큰 문제가 되지는 않는다. 하지만 이미 취소된 로루틴 안에서 동기적으로 어떤 중단 함수를 호출해야 하는 상황이라면 우리는 withContext{ } 코루틴 빌더에 NonCancellable 컨텍스트를 전달하여 이를 처리할 수 있다.

fun main(args: Array<String>) = runBlocking {
    val job = launch {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            withContext(NonCancellable) {
                delay(1000)
                println("main : I'm running finally!")
            }
        }
    }
    
    delay(1300L)
    println("main : I'm tired of waiting!")
    job.cancelAndJoin()
    println("main : Now I can quit.")
}

 

타임 아웃(Timeout)

일반적으로 어떤 코루틴의 실행을 중간에 취소해야하는 경우는 그 수행시간이 너무 길어져 허용할 수 있는 시간을 넘어섰을 경우이다. 이 경우 우리는 타임아웃(Timeout)을 지정하고 이 시간을 넘어설 경우 해당 작업을 취소하도록 구현할 수 있다.

 

일반적으로 취소 요청이 없었음에도 어떤 코루틴의 실행을 중간에 취소해야 하는 경우는 그 코루틴의 수행시간이 허용 가능한 시간보다 길어졌을 경우이다. 이러한 경우를 다루기 위해서 우리는 코루틴에 제한 시간(Timeout)을 설정하고 이 시간이 넘어설 경우 코루틴이 취소되도록 구현할 수 있다.

 

이러한 기능을 현재까지 학습해온 코루틴 기본함수들로 직접 구현해보면 다음과 같이 구현해 볼 수 있다.

 

1. 제한 시간을 설정할 대상이 되는 코루틴을 생성한다.

2. 일정 시간(Timeout) 지연 후 전달 받은 Job이 끝나지 않았으면 취소하는 동작을 하는 코루틴을 생성하고, 1번에서 만들고 실행한 코루틴의 Job 객체를 전달한다. 

3. 테스트를 위해 1번 코루틴은 2번 코루틴에서 설정한 시간보다 긴 수행시간을 갖도록 구현한다.

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            println("main : I'm running finally!")
        }
    }
    
    launch {
        delay(1300L)
        println("main : I'm tired of waiting. Cancel the job!")
        if (job.isActive) {
            job.cancelAndJoin()
        }
    }
}

위 구현은 우리가 예상한대로 동작한다. 다만 위와 같이 대상 코루틴의 Job 객체 참조를 유지하며 별도의 코루틴에서 취소를 처리하도록 하는 일을 매번하는 것은 번거로운 일이다. 그래서 코루틴 프레임워크는 이 작업을 대신해줄 withTimeout() 함수가 준비되어 있다.

fun main(args: Array<String>) = runBlocking<Unit> {
    withTimeout(1300L) {
        launch {
            try {
                repeat(1000) { i -> 
                    println("I'm sleeping $i ...")
                    delay(500L)
                }
            } finally {
                println("main : I'm running finally!")
            }
        }
    }
}

위 함수를 실행하면 다음과 같이 TimeoutCancellationException이 발생하는데, 이는 예제가 메인 함수에서 바로 실행되었기 때문이다. 코루틴이 취소될 경우 발생하는 CancellationException은 코루틴에서 일반적인 종료 상황 중 하나로 간주된다.

I’m sleeping 0 …
I’m sleeping 1 …
I’m sleeping 2 …
main : I’m running finally!
Exception in thread “main” kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:122) at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:88)
Process finished with exit code 1

3. 채널(Channels)

지연 값(Deferred values)을 사용하면 서로 다른 코루틴들이 손쉽게 하나의 값을 공유할 수 있다. 채널을 이용하면 서로 다른 코루틴들간에 데이터 스트림을 공유할 수 있다. 채널은 BlockingQueue와 유사하게 동작하지만 값을 읽고 쓰기 위한 함수의 이름이 다르다. 큐와 비교하면 값을 큐에 넣기 위해 put() 대신 send()를 사용하고, 큐에서 값을 꺼내기 위해서 take() 대신 recieve() 를 사용한다.

val channel = Channel<Int>()
launch {
    // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
    for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")
1
4
9
16
25
Done!

 

채널의 순회와 닫기 (Closing and iteration over channels)

큐(queue)와 달리 채널은 닫을 수 있고, 닫힌 채널은 더이상 값이 전달되지 않을 것임을 나타낸다. 어떤 채널의 데이터를 수신하는 쪽에서는 이 점을 이용하여 채널을 순회하는 함수들을 사용할 수 있다. (ex > for loop)

 

채널을 닫는 요청을 하게되면 내부적으로 채널이 닫힐 것임을 나타내는 값을 채널 큐의 마지막에 추가 하는 방식으로 동작하기 때문에, 이 닫힘 값이 채널에 전달되기 전에 도착했던 값들을 수신하는 쪽에서 모두 수신할 수 있음이 보장된다.

val channel = Channel<Int>()
launch {
    for (x in 1..5) channel.send(x * x)
    channel.close() // we're done sending
}
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")

 

채널 프로듀서 만들기 (Building Channel Producers)

코루틴이 어떤 데이터 스트림(연속된 값 들의 흐름)을 생성해내는 일은 꽤나 흔한일이며, 이것은 우리가 동시성 코드에서 흔히 접할 수 있었던 프로듀서-컨슈머 패턴의 일부이다. 이러한 프로듀서의 생성 작업을 추상화하기 위해서 채널을 파라미터로 전달받는 생성 함수로 만들 수도 있다. 하지만 이러한 함수는 "함수는 반드시 결과가 반환되어야 한다"는 상식에 맞지 않게 된다. 

 

코루틴 프레임워크 에서는 이러한 프로듀서의 생성 작업을 용이하게 하기 위해서 produce{ } 라는 코루틴 빌더와 이렇게 생성된 프로듀서가 생성하는 값들의 수신을 돕기위한 consumeEach() 라는 확장 함수가 제공된다. (consumeEach는 for를 대체함)

fun main(args: Array<String>) = runBlocking<Unit> {
    val squares = produceSquares(5)
    squares.consumeEach { println(it) }
    println("Done")
}

fun CoroutineScope.produceSquares(max: Int): ReceiveChannel<Int> = produce {
    for (x in 1..max) {
        send(x * x)
    }
}

 

파이프라인 (Pipeline)

파이프라인이란 하나의 코루틴이 데이터 스트림(무한 스트림 포함)을 생산해내고 다른 하나 이상의 코루틴들이 이 스트림을 수신받아 필요한 작업을 수행한 후 가공된 결과를 다시 전송하는 패턴을 말한다.

fun main(args: Array<String>) = runBlocking<Unit> {
    val numbers = produceNumbers(5)
    val doubledNumbers = produceDouble(numbers)
    doubledNumbers.consumeEach { println(it) }
    println("Done")
}

fun CoroutineScope.produceNumbers(max: Int): ReceiveChannel<Int> = produce {
    for (x in 1..max) {
        send(x)
    }
}

fun CoroutineScope.produceDouble(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    numbers.consumeEach { send(it * 2) }
}

위 예제는

1) 일련의 정수를 생성해내는 프로듀서를 produce{  } 빌더를 통해 생성하고,

2) 그 결과인 ReceiveChannel을 전달된 수의 두배수를 생성해내는 프로듀서를 만들때 전달하여 파이프라인을 구성하고 그 결과를 출력하는 예제이다.

[정수 생성 프로듀서] -> [두배수 변환 프로듀서] -> 출력

 

파이프라인을 이용한 소수 생성 (Prime numbers with pipeline)

코루틴 파이프라인을 이용하여 소수를 생성해내는 조금 더 무거운 작업을 수행해본다.

fun main(args: Array<String>) = runBlocking<Unit> {
    var cur = numbersFrom(2)
    for (i in 1..10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(cur, prime)
    }
    coroutineContext.cancelChildren()
    println("Done")
}

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) {
        if (x % prime != 0) send(x)
    }
}

우리는 먼저 numbersFrom() 프로듀서를 이용하여 2부터 시작되는 정수의 무한스트림 채널을 만든다. 그리고 filter() 프로듀서를 더해 파이프라인을 구성하는데 filter() 프로듀서는 수신 채널과 소수를 전달받아 수신되는 값들 중에서 소수와 나누어 떨어지는 값들을 제외하고 송신한다.

numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...

여기서 우리는 runBlocking() 을 이용하여 메인스레드에서 모든 코루틴을 수행하였기 때문에 이들의 취소를 위해 코루틴들의 참조 리스트를 유지할 필요가 없어 10개의 소수를 출력한 후 cancelChildren() 확장함수를 이용하여 자식 코루틴들을 한번에 취소할 수 있다.

 

이것과 동일한 기능을 수행하는 파이프라인을 표준 라이브러리의 buildIterator{ }를 이용하여 만들 수 있다. produce{ }를 buildIterator{ }로, send()를 yield()로, receive()를 next()로 ReceiveChannel을 Iterator로 변경하고 코루틴 스코프를 제거하면 된다.

 

이렇게 채널이 아닌 buildIterator를 이용한 구현으로 변경하고나면 더이상 runBlocking { }은 필요하지 않게된다. 하지만 채널을 사용하는 파이프라인 구현 방식은 멀티 코어 환경에서 Dispatchers.Default 컨텍스트를 사용하여 동시 수행의 이점을 누릴 수 있다.

 

앞서 설명한 예제를 채널을 이용한 구현에서 buildIterator를 이용한 구현으로 변경해보면 다음과 같다.

fun main(args: Array<String>) {
    var cur = getNumberIteratorFrom(2)
    for (i in 1..10) {
        val prime = cur.next()
        println(prime)
        cur = getFilteredNumberIterator(cur, prime)
    }
    println("Done")
}

fun getNumberIteratorFrom(start: Int) = iterator {
    var x = start
    while (true) yield(x++) // infinite stream of integers from start
}

fun getFilteredNumberIterator(numbers: Iterator<Int>, prime: Int) = iterator {
    for (x in numbers) {
        if (x % prime != 0) yield(x)
    }
}

두가지 방식 모두 소수를 찾기 위한 방법으로는 실용적이지 못하다. 보통 파이프라인을 사용할 경우 다른 중단 호출들과 함께 작업을 수행하게 되는데(원격 서비스로의 비동기 호출 같은) 이러한 파이프라인의 기능은 buildSquence나 buildIterator 같은 것들로는 만들어낼 수 없다. 그 이유는 이것들은 비동기를 전반적으로 지원하는 produce {}와는 달리 임의의 시점에 실행 중단을 허용하지 않기 때문이다.

 

Fan-out

하나의 채널로부터 두개 이상의 수신 코루틴들이 데이터를 분배하여 수신받을 수 있다.

fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950L)
    producer.cancel()
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) {
        send(x++)
        delay(100L)
    }
}

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}

produceNumbers{}는 주기적으로 초당 10개의 정수를 생산해 내는 프로듀서이다. 그리고 launchProcessor{} 프로세서 코루틴은 그들의 id와 수신받은 정수를 출력하는 동작만 수행하는 코루틴이며 main 함수에서 5개 생성된다. 이를 실행하면 다음과 유사한 결과를 얻는다.

Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10

예제에서 마지막에 producer 코루틴이 자신의 채널을 닫고 있는데, 이는 이 채널을 순회(iteration)하고 있는 프로세서 코루틴들이 종료되도록 만든다. 또한 프로세서 코루틴은 consumeEach 확장 함수가 아니라 for-loop으로 채널을 순회하고 있는데, 이것은 지금과 같이 코루틴이 두개 이상 함께 사용될때 유용한 방법이다. for-loop을 사용하면 프로세서 코루틴들 중에 하나가 실패하더라도 나머지 프로세서 코루틴들은 연산을 이어나가게 만들수 있다. 반대로 consumeEach 확장 함수를 이용했다면 각 프로세서 코루틴들은 다른 프로세서 코루틴의 정상 종료 혹은 비정상 종료에 대해서 취소 이벤트를 전파 받고 모두 종료될 것이다.

fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) {
        val job = launchProcessor(it, producer)
        if (it == 3) {
            delay(200)
            job.cancel()
        }
    }
    delay(950L)
    producer.cancel()
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) {
        send(x++)
        delay(100L)
    }
}

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {

    channel.consumeEach {
        println("Processor #$id received $it")
    }
}

위 예제는 프로세서 코루틴의 채널 순회 방식을 consumeEach로 바꾸고, 메인 함수에서 프로세서 코루틴 중 하나가 200ms 후에 취소되도록 한것이다. 이를 실행하면 프로세서 하나가 취소됨에 따라 나머지 프로세서들도 취소되는 것을 확인할 수 있다.

 

Fan-In

두 개 이상의 코루틴들이 동일한 하나의 채널로 데이터를 전송할 수 있다. 예를 들어 우리가 문자열 채널을 하나 가지고 있다고 가정해보자. 그리고 일정 시간마다 특정 문자열을 반복적으로 이 채널로 전송하는 중단 함수가 있다고 생각해보자.

 

문자열을 전송하는 코루틴 몇 개를 동시에 실행하면 어떻게 동작하는지 다음 예제를 통해 살펴본다. (예제에서는 코루틴들을 메인 코루틴의 자식 코루틴으로 만들고 메인스레드 컨텍스트에서 수행되도록 한다)

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<String>()
    launch { sendString(channel, "Foo", 200L) }
    launch { sendString(channel, "Bar", 500L) }
    repeat(6) {
        println(channel.receive())
    }
    coroutineContext.cancelChildren()
}

suspend fun sendString(channel: SendChannel<String>, text: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(text)
    }
}
Foo
Foo
Bar
Foo
Foo
Bar

 

버퍼가 있는 채널 (Buffered channels)

지금까지 살펴본 채널들은 버퍼가 없다. 버퍼가 없는 채널들은 송신자와 수신자가 만나야만 데이터를 전달한다(랑데뷰). 만약 송신이 먼저 일어나면 수신이 일어날때까지 송신자는 중단된다. 반대로 수신이 먼저 일어나면 송신이 일어날때까지 수신자는 중단된다.

 

Channel<T>() 팩토리 함수와 produce{ } 빌더는 모두 capacity라는 버퍼 사이즈를 설정할 수 있는 옵셔널 파라미터를 가지고 있다. 버퍼는 송신자가 중단되기 전에 버퍼의 수용량만큼 더 송신할 수 있도록 해준다. 최대 수용량이 지정된 BlockingQueue와 유사하게 수용량의 최대치에 도달하면 송신자는 중단된다.

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>(4)
    
    val sender = launch {
        repeat(10) {
            print("Try to send $it : ")
            channel.send(it)
            print("Done\n")
        }
    }
    
    delay(1000)
    sender.cancel()
}

위 예제는 수용량이 4인 채널을 생성한 후 생성된 채널로 10개의 정수를 송신하는 코드이다.

Try to send 0 : Done
Try to send 1 : Done
Try to send 2 : Done
Try to send 3 : Done
Try to send 4 :

수신하는 코루틴이 없기 때문에 5번째 송신에서 송신자가 중단되어 있음을 알 수 있다. (Done이 출력되지 않음)

 

채널의 공정성 (Channels are fair)

2개 이상의 코루틴들이 하나의 채널로 송/수신을 수행한다면 실행 순서는 그 호출 순서에 따라 공정하게 할당되며 FIFO(First-In First-Out) 방식으로 스케쥴링 된다. 다시말해, 처음 receive()를 호출한 코루틴이 데이터를 먼저 수신한다. 다음 예제는 ping 코루틴과 pong 코루틴이 table 채널로부터 공유된 ball 오브젝트를 수신하는 예제이다.

data class Ball(var hits: Int)

fun main(args: Array<String>) = runBlocking<Unit> {
    val table = Channel<Ball>()
    
    launch { player("ping", table) }
    launch { player("pong", table) }
    
    table.send(Ball(0))
    delay(1000)
    coroutineContext.cancelChildren()
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) {
        ball.hits++
        println("$name $ball")
        // Comment out below delay to see the fairness a bit more.
        delay(300)
        table.send(ball)
    }
}

"ping" 코루틴이 먼저 시작했기 때문에 먼저 공을 수신한다. 이때, 주석의 내용처럼 delay() 함수를 지운다고 해서 그 순서가 변경되지는 않는다. 왜냐하면 "ping" 코루틴이 송신하고 바로 수신으로 돌아간다고 하더라도 이미 "pong" 코루틴이 수신 대기 중이기 때문이다. 하지만 간혹 채널이 공정하지 못한 실행을 보이기도 하는데 이것은 사용되는 Executor의 특성 때문이다.

 

Ticker channels

Ticker channel은 마지막 수신 이후에 지정된 지연 시간이 지나면 Unit 오브젝트를 송신하는 채널이다. 이 자체로는 큰 실용성이 없을 수 있지만 시간 기반으로 무언가 처리해야 하는 파이프라인 곳에서는 의미가 있을수 있다.

 

이 채널을 만들기 위해서는 ticker() 팩토리 함수를 사용하면 되고, 더이상 필요가 없을 경우에는 RecieveChannel에 cancel() 함수를 호출하면 된다.

 

ticker() 팩토리 함수를 이용해 채널을 생성하면 기본 TickerMode가 FIXED_PERIOD이다. 이 모드는 수신자가 지연되는 것을 인지하고 지연이 발생하면 다음 송신을 그에 맞게 조절하여 데이터 발생을 지정된 지연 시간에 최대한 맞게 맞추어 준다.

fun main(args: Array<String>) = runBlocking<Unit> {
    val tickerChannel = ticker(
        delayMillis = 100,
        initialDelayMillis = 0
    ) // create ticker channel
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet
    
    nextElement = 
        withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay
    println("Next element is not ready in 50 ms: $nextElement")
    
    nextElement = withTimoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")
    
    // Emulate large consumption delays
    println("Consumer pauses for 300ms")
    delay(300)
    // Next element is available immediately
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

    tickerChannel.cancel() // indicate that no more elements are needed
}
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: kotlin.Unit
Next element is ready in 100 ms: null
Consumer pauses for 300ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit

반면, ticker 생성시 다음과 같이 FIXED_DELAY를 사용하면 다음의 결과와 같이 수신 후 지정한 지연 시간이 지나야 다음 데이터를 수신할 수 있다.

val tickerChannel = ticker(
        delayMillis = 100,
        initialDelayMillis = 0,
        mode = TickerMode.FIXED_DELAY
    ) // create ticker channel
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 300ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: null