Kotlin

[Kotlin] Asynchronous Programming Techniques - 2

kahnco 2024. 8. 7. 00:43
반응형

지난 시간에 이어서 코틀린의 비동기 프로그래밍 기술에 대해서 알아보겠습니다.


Asynchronous Flow

코틀린 플로우는 비동기적으로 여러 값을 반환하기 위한 방법을 제공합니다. 이는 컬렉션과 시퀀스를 사용하는 방법과 유사하지만, 플로우는 비동기적으로 값을 계산하고 반환할 수 있다는 점에서 차이가 있습니다.

 


Representing Mutiple Values

코틀린에서 여러 값을 표현하는 기본적인 방법은 컬렉션을 사용하는 것입니다. 예를 들어, 세 개의 숫자를 포함하는 리스트를 반환하는 간단한 함수는 다음과 같습니다.

fun simple(): List<Int> = listOf(1, 2, 3)

fun main() {
    simple().forEach { value -> println(value) } 
}

// 출력 결과
1
2
3

 

Sequences

CPU를 많이 사용하는 블로킹 코드로 숫자를 계산하는 경우, 시퀀스를 사용하여 숫자를 표현할 수 있습니다.

fun simple(): Sequence<Int> = sequence { 
    for (i in 1..3) {
        Thread.sleep(100) 
        yield(i)
    }
}

fun main() {
    simple().forEach { value -> println(value) } 
}

 

위 코드는 동일한 숫자를 출력하지만 각 숫자를 출력하기 전에 100ms 동안 대기합니다.

 

Suspending Functions

그러나 이 계산은 코드를 실행하는 메인 스레드를 블로킹합니다. 이러한 값을 비동기적으로 계산할 때는 suspend 수식어로 simple 함수를 표시하여 블로킹하지 않고 작업을 수행하고 결과를 리스트로 반환할 수 있습니다.

suspend fun simple(): List<Int> {
    delay(1000)
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) } 
}

 

Flows

List<Int> 반환 타입을 사용하면 모든 값을 한 번에 반환해야 합니다. 비동기적으로 계산되는 값을 표현하려면 Flow<Int> 타입을 사용할 수 있습니다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    simple().collect { value -> println(value) } 
}

// 출력 결과
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

 

위 코드는 100ms 마다 숫자를 출력하며, 메인 스레드를 블로킹하지 않습니다.

 


Flows are Cold

플로우는 시퀀스와 유사하게 Cold Stream 입니다. 즉, 플로우 빌더 내부의 코드는 플로우가 수집될 때까지 실행되지 않습니다. 이는 다음 예제에서 명확하게 드러납니다.

fun simple(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    flow.collect { value -> println(value) } 
    println("Calling collect again...")
    flow.collect { value -> println(value) } 
}

// 출력 결과
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

 


Flow Cancellation Basics

플로우는 코루틴의 일반적인 cooperative cancellation 규칙을 따릅니다. 예를 들어, 아래 예제 코드는 withTimeoutOrNull 블록에서 플로우가 취소되는 경우를 보여줍니다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) {
        simple().collect { value -> println(value) } 
    }
    println("Done")
}

// 출력 결과
Emitting 1
1
Emitting 2
2
Done

Flow Builders

플로우를 선언하는 여러 빌더가 있습니다.

  • flowOf 빌더는 고정된 값 집합을 emit 하는 플로우를 정의합니다.
  • 다양한 컬렉션과 시퀀스는 .asFlow() 확장 함수를 사용하여 플로우로 변환할 수 있습니다.

예를 들어, 숫자 1부터 3까지를 emit 하는 플로우는 다음과 같이 재작성할 수 있습니다.

(1..3).asFlow().collect { value -> println(value) }

 


Intermediate Flow Operators

플로우는 컬렉션과 시퀀스를 변환하는 것처럼 연산자를 사용하여 변환할 수 있습니다. 이러한 연산자는 Upstream Flow 에 적용되어 Downstream Flow 를 반환합니다. 이러한 연산자는 Cold 특징을 지니며, 호출 자체는 일시 중단 함수가 아닙니다.

 

가장 기본적인 연산자는 mapfilter 입니다. 이 연산자 내부의 코드 블록은 일시 중단 함수를 호출할 수 있습니다.

suspend fun performRequest(request: Int): String {
    delay(1000)
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow()
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}

// 출력 결과
response 1
response 2
response 3

 

Transform Operator

가장 일반적인 플로우 변환 연산자는 transform 입니다. 이를 사용하여 map 과 filter 같은 간단한 변환을 흉내내거나 더 복잡한 변환을 구현할 수 있습니다.

(1..3).asFlow()
    .transform { request ->
        emit("Making request $request")
        emit(performRequest(request))
    }
    .collect { response -> println(response) }
    
// 출력 결과
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

 

Size-Limiting Operators

크기 제한 중간 연산자는 해당 한도에 도달하면 플로우 실행을 취소합니다.

fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers()
        .take(2)
        .collect { value -> println(value) }
}

// 출력 결과
1
2
Finally in numbers

Terminal Flow Operators

터미널 연산자는 플로우의 수집을 시작하는 일시 중단 함수입니다. collect 연산자는 가장 기본적인 연산자이며, 다양한 컬렉션으로 변환하는 연산자 (toList, toSet), 첫번째 값을 가져오는 연산자, 값을 줄이는 연산자 (reduce, fold) 등이 있습니다.

val sum = (1..5).asFlow()
    .map { it * it }
    .reduce { a, b -> a + b }
println(sum)

// 출력 결과
55

 


Flows are Sequential

각 플로우 수집은 순차적으로 수행됩니다. 즉, 각 방출된 값은 모든 중간 연산자를 통과하여 터미널 연산자에 도달합니다.

(1..5).asFlow()
    .filter {
        println("Filter $it")
        it % 2 == 0
    }
    .map {
        println("Map $it")
        "string $it"
    }.collect {
        println("Collect $it")
    }

// 출력 결과
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

 


Flow Context

플로우 수집은 호출 코루틴의 컨텍스트에서 항상 발생합니다. 예를 들어, 다음 코드는 간단한 플로우를 수집하는 컨텍스트를 지정합니다.

withContext(context) {
    simple().collect { value ->
        println(value)
    }
}

 

플로우 빌더의 코드 (flow { ... }) 는 수집자의 컨텍스트에서 실행됩니다.

fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") }
}

// 출력 결과
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3

 

withContext 사용 시 주의 사항

 

CPU 집약적인 코드는 Dispatchers.Default 컨텍스트에서 실행될 필요가 있습니다. 그러나 flow { ... } 빌더의 코드는 컨텍스트 보존 속성을 따라야 하므로 다른 컨텍스트에서 방출할 수 없습니다.

fun simple(): Flow<Int> = flow {
    withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100)
            emit(i)
        }
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) }
}

// 출력 결과
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated: Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323], but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default]. Please refer to 'flow' documentation or use 'flowOn' instead

 

flowOn Operator

flowOn 연산자를 사용하여 플로우의 컨텍스트를 변경하는 방법은 아래와 같습니다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100)
        log("Emitting $i")
        emit(i)
    }
}.flowOn(Dispatchers.Default)

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") }
}

// 출력 결과
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3

 


Buffering

플로우의 서로 다른 부분을 다른 코루틴에서 실행하면 전체 수집 기간을 단축할 수 있습니다. buffer 연산자를 사용하여 플로우를 버퍼링할 수 있습니다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple()
            .buffer()
            .collect { value -> 
                delay(300)
                println(value)
            } 
    }   
    println("Collected in $time ms")
}

// 출력 결과
1
2
3
Collected in 1071 ms

 

Conflation

컨플레이션은 수집자가 값을 처리하는 데 너무 느린 경우 중간 값을 건너뛸 수 있습니다.

val time = measureTimeMillis {
    simple()
        .conflate()
        .collect { value -> 
            delay(300)
            println(value) 
        } 
}   
println("Collected in $time ms")

// 출력 결과
1
3
Collected in 758 ms

 

Processing the Latest Value (최신 값 처리)

collectLatest 연산자를 사용하여 최신 값을 수집하고 누린 수집기를 취소하고 재시작할 수 있습니다.

val time = measureTimeMillis {
    simple()
        .collectLatest { value ->
            println("Collecting $value") 
            delay(300)
            println("Done $value") 
        } 
}   
println("Collected in $time ms")

// 출력 결과
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms

 


Composing Mutiple Flows

여러 플로우를 구성하는 것에는 다양한 방법이 있습니다.

 

Zip

두 플로우의 값을 결합하는 zip 연산자

val nums = (1..3).asFlow()
val strs = flowOf("one", "two", "three")
nums.zip(strs) { a, b -> "$a -> $b" }
    .collect { println(it) }
    
// 출력 결과
1 -> one
2 -> two
3 -> three

 

Combine

combine 연산자는 각 플로우의 최신 값을 기반으로 계산을 수행합니다.

val nums = (1..3).asFlow().onEach { delay(300) }
val strs = flowOf("one", "two", "three").onEach { delay(400) }
val startTime = System.currentTimeMillis()
nums.combine(strs) { a, b -> "$a -> $b" }
    .collect { value -> println("$value at ${System.currentTimeMillis() - startTime} ms from start") }

// 출력 결과
1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start

 


Flattening Flows

코틀린의 플로우는 비동기적으로 값을 처리하는 스트림을 나타내며, 각 값이 또 다른 값의 시퀀스를 요청하는 상황에 쉽게 처할 수 있습니다. 예를 들어, 다음과 같은 함수가 있으며 이 함수는 500ms 간격으로 두 개의 문자열을 반환하는 플로우를 생성합니다.

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // 500ms 대기
    emit("$i: Second")
}

 

이제 세 개의 정수를 포함하는 플로우가 있고, 각 정수에 대해 requestFlow 를 호출하면 다음과 같은 플로우가 생성됩니다.

(1..3).asFlow().map { requestFlow(it) }

 

이 경우 Flow<Flow<String>> 타입의 중첩된 플로우가 생기며, 이를 단일 플로우로 Flattening 하여 추가 처리가 필요합니다. 컬렉션과 시퀀스에는 이러한 작업을 위한 flatten 및 flatMap 연산자가 있지만, 플로우의 비동기적 특성 때문에 이를 처리하는 데는 여러 가지 평탄화 모드가 필요합니다. 따라서, 플로우에서는 평탄화를 위한 다양한 연산자들이 제공됩니다.

 

flatMapConcat

flatMapConcat 연산자는 내부 플로우를 순차적으로 수집합니다. 첫 번째 플로우가 완료된 후에야 다음 플로우를 수집하기 시작합니다. 다음 예제를 통해 flatMapConcat 연산자의 동작을 살펴보겠습니다.

val startTime = System.currentTimeMillis() // 시작 시간 기록
(1..3).asFlow().onEach { delay(100) } // 100ms마다 숫자 방출
    .flatMapConcat { requestFlow(it) }
    .collect { value -> // 수집 및 출력
        println("$value at ${System.currentTimeMillis() - startTime} ms from start")
    }

// 출력 결과
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start

 

flatMapMerge

flatMapMerge 연산자는 모든 내부 플로우를 동시에 수집합니다. 즉, 내부 플로우가 생성되면 즉시 수집을 시작하여 값을 가능한 빨리 방출합니다. 이 연산자는 선택적으로 수집할 동시 플로우의 수를 제한하는 concurrency 파라미터를 가질 수 있으며, 기본 값은 DEFAULT_CONCURRENCY 입니다.

val startTime = System.currentTimeMillis() // 시작 시간 기록
(1..3).asFlow().onEach { delay(100) } // 100ms마다 숫자 방출
    .flatMapMerge { requestFlow(it) }
    .collect { value -> // 수집 및 출력
        println("$value at ${System.currentTimeMillis() - startTime} ms from start")
    }

// 출력 결과
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start

 

flatMapLatest

flatMapLatest 연산자는 새로운 값이 방출될 때 이전 플로우 수집을 취소하고 최신 플로우를 재시작합니다. 이는 수집 중인 플로우가 최신 상태를 유지하도록 합니다.

val startTime = System.currentTimeMillis() // 시작 시간 기록
(1..3).asFlow().onEach { delay(100) } // 100ms마다 숫자 방출
    .flatMapLatest { requestFlow(it) }
    .collect { value -> // 수집 및 출력
        println("$value at ${System.currentTimeMillis() - startTime} ms from start")
    }

// 출력 결과
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
  • 첫 번째 값 1이 100ms 후에 방출되고 requestFlow(1)이 호출됩니다. "1: First"가 출력됩니다.
  • 두 번째 값 2이 방출되면 requestFlow(1)의 나머지 부분이 취소되고 requestFlow(2)이 호출됩니다. "2: First"가 출력됩니다.
  • 세 번째 값 3이 방출되면 requestFlow(2)의 나머지 부분이 취소되고 requestFlow(3)이 호출됩니다. "3: First"가 출력됩니다.
  • 마지막으로 requestFlow(3)이 완료되어 "3: Second"가 출력됩니다.

Flow Exceptions

플로우 수집은 예외가 발생할 수 있으며, 이를 처리하는 다양한 방법이 있습니다.

 

Collector try and catch

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> 
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}

// 출력 결과
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2

 

Catch Operator

catch 연산자를 사용하여 예외를 캡슐화하고 처리할 수 있습니다.

simple()
    .catch { e -> emit("Caught $e") }
    .collect { value -> println(value) }

// 출력 결과
Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2

 


Exception Transparency

플로우의 예외 처리에서 중요한 원칙은 Exception Transparency 입니다. 이는 플로우 내에서 발생하는 예외가 캡슐화되지 않고 수집기에서 직접 처리할 수 있도록 하는 것을 의미합니다.

 

예외 투명성을 유지하기 위한 catch 연산자

플로우 내에서 예외를 캡슐화하지 않고 투명하게 처리하기 위해 catch 연산자를 사용할 수 있습니다. catch 연산자는 예외를 분석하고, 재발생시키거나 값을 방출하거나, 로그를 기록하는 등의 작업을 수행할 수 있습니다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> emit("Caught $e") } // 예외 발생 시 텍스트 방출
        .collect { value -> println(value) }
}

// 출력 결과
Emitting 1
1
Emitting 2
2
Emitting 3
3

 

Transparent Catch

catch 연산자는 업스트림에서 발생한 예외만을 처리합니다. 따라서 catch 연산자 아래에 있는 collect 블록에서 발생한 예외는 처리되지 않습니다.

 

예제: catch 연산자가 다운스트림 예외를 처리하지 않는 경우

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") } // 다운스트림 예외를 처리하지 않음
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}

// 출력 결과
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
	at ...

 

 

Catching Declaratively

catch 연산자의 선언적 특성을 활용하여 모든 예외를 처리하려면 collect 연산자의 내용을 onEach 연산자로 옮기고, catch 연산자를 그 아래에 두어야 합니다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .onEach { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
        .catch { e -> println("Caught $e") }
        .collect()
}

// 출력 결과
Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2

 

위 예제에서는 onEach 연산자에서 모든 예외를 선언적으로 처리할 수 있습니다. 이로 인해 "Caught ..." 메시지가 출력됩니다.

 


Flow Completion

플로우 수집이 정상적 혹은 예외적으로 완료되면 특정 작업을 실행할 필요가 있을 수 있습니다. 이러한 작업은 두 가지 방식으로 수행될 수 있습니다: Imperative / Declarative

 

명령적 finally 블록

try/catch 블록 외에도, 수집기가 finally 블록을 사용하여 플로우 수집이 완료될 때 작업을 실행할 수 있습니다.

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}

// 출력 결과
1
2
3
Done

 

선언적 처리

선언적 접근 방식에서는 플로우의 onCompletion 중간 연산자를 사용합니다. 이 연산자는 플로우 수집이 완전히 완료되었을 때 호출됩니다.

simple()
    .onCompletion { println("Done") }
    .collect { value -> println(value) }

// 출력 결과
1
2
3
Done

 

onCompletion 연산자의 주요 이점은 람다의 인자로 nullableThrowable 을 받아 플로우 수집이 정상적으로 완료되었는지 또는 예외적으로 완료되었는지 판단할 수 있다는 점입니다.

 

다음 예제는 플로우가 숫자 1을 방출한 후 예외를 발생시키는 경우입니다.

fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}

// 출력 결과
1
Flow completed exceptionally
Caught exception

 

onCompletion 연산자는 예외를 처리하지 않으며, 예외는 여전히 다운스트림으로 전달됩니다. 추가적인 onCompletion 연산자에게도 전달되며, catch 연산자로 처리할 수 있습니다.

 

Successful Completion

또 다른 차이점은 catch 연산자와 달리 onCompletion은 모든 예외를 볼 수 있으며, 업스트림 플로우가 성공적으로 완료된 경우에만 null 예외를 받습니다.

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}

// 출력 결과
1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2

 

위 예제에서는 플로우가 다운스트림 예외로 인해 중단되었으므로 완료 원인이 null이 아닙니다.

 

명령형 VS 선언형

이제 우리는 플로우를 수집하고, 명령형 및 선언형 방식으로 완료 및 예외를 처리하는 방법을 알게 되었습니다. 자연스럽게 "어떤 접근 방식을 선호해야 하는가?"라는 질문이 생깁니다. 라이브러리 측에서는 특정 접근 방식을 권장하지 않습니다. 두 가지 옵션 모두 유효하며, 개인의 선호도와 코드 스타일에 따라 선택하면 됩니다.

 


Launching Flow

플로우는 소스로부터 비동기 이벤트를 나타내는 데 사용할 수 있습니다. 이러한 경우, addEventListener 함수와 유사한 기능이 필요합니다. 이 함수는 이벤트가 발생할 때 반응하는 코드를 등록하고, 추가 작업을 계속 수행합니다. onEach 연산자가 이 역할을 할 수 있습니다. 그러나 onEach는 중간 연산자이기 때문에 플로우를 수집하려면 터미널 연산자가 필요합니다. 그렇지 않으면 onEach를 호출하는 것만으로는 효과가 없습니다.

 

만약 onEach 다음에 collect 터미널 연산자를 사용하면, 해당 플로우가 수집될 때까지 이후의 코드는 대기합니다:

// 이벤트 플로우를 모방
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- 플로우 수집 대기
    println("Done")
}

// 출력 결과
Event: 1
Event: 2
Event: 3
Done

 

launchIn 터미널 연산자를 사용하면 별도의 코루틴에서 플로우 수집을 시작할 수 있으므로 이후의 코드는 즉시 실행됩니다.

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- 별도의 코루틴에서 플로우 수집
    println("Done")
}

// 출력 결과
Done
Event: 1
Event: 2
Event: 3

 

launchIn에 필요한 매개변수는 플로우를 수집할 코루틴 스코프를 지정해야 합니다. 위 예제에서 이 스코프는 runBlocking 코루틴 빌더에서 제공되므로, 플로우가 실행되는 동안 runBlocking 스코프는 자식 코루틴의 완료를 기다립니다. 실제 응용 프로그램에서는 스코프가 제한된 수명을 가진 엔티티에서 제공됩니다. 이 엔티티의 수명이 종료되면 해당 스코프가 취소되어 해당 플로우 수집도 취소됩니다.

 

launchIn은 또한 Job을 반환하므로, 해당 플로우 수집 코루틴만 취소하거나 대기(join)하는 데 사용할 수 있습니다.

 

Flow Cancellation Checks

편의를 위해, 플로우 빌더는 각 방출 값에 대해 추가적으로 ensureActive 검사를 수행하여 취소 가능성을 확인합니다. 이는 다음과 같이 바쁜 루프에서 방출되는 플로우를 취소할 수 있음을 의미합니다.

fun foo(): Flow<Int> = flow {
    for (i in 1..5) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}

// 출력 결과
Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c

 

다른 대부분의 플로우 연산자는 성능상의 이유로 추가적인 취소 검사를 수행하지 않습니다. 예를 들어, IntRange.asFlow 확장을 사용하여 동일한 바쁜 루프를 작성하고 어디에서도 일시 중단하지 않으면, 취소 검사가 수행되지 않습니다.

fun main() = runBlocking<Unit> {
    (1..5).asFlow().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}

// 출력 결과
1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23

 

Making Busy Flow Cancellable

코루틴과 함께 바쁜 루프가 있는 경우, 취소를 명시적으로 확인해야 합니다. onEach { currentCoroutineContext().ensureActive() } 를 추가할 수 있지만, 이를 위해 제공되는 cancellable 연산자를 사용하는 것이 좋습니다.

fun main() = runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}

// 출력 결과
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365

 


Flow and Reactive Streams

리액티브 스트림이나 RxJava, 프로젝트 리액터와 같은 리액티브 프레임워크에 익숙한 사람들에게, 플로우의 디자인은 매우 익숙하게 보일 수 있습니다. 플로우의 주요 목표는 가능한 한 간단한 디자인을 유지하고, 코틀린과 일시 중단 친화적이며, 구조적 동시성을 존중하는 것입니다.

 

플로우리액티브 스트림과 개념적으로 유사하지만, 서로 다른 점도 존재합니다. kotlinx.coroutines는 리액티브 스트림으로 변환하거나 그 반대로 변환할 수 있는 변환기를 제공합니다. 이 변환기는 kotlinx-coroutines-reactive, kotlinx-coroutines-reactor, kotlinx-coroutines-rx2, kotlinx-coroutines-rx3와 같은 모듈에서 찾을 수 있습니다. 이들 모듈은 리액터의 컨텍스트와의 통합 및 다양한 리액티브 엔티티와 함께 작동하는 일시 중단 친화적인 방법을 포함합니다.

반응형

'Kotlin' 카테고리의 다른 글

[Kotlin] Asynchronous Programming Techniques - 1  (0) 2024.08.05
[Kotlin] Types  (0) 2024.08.01
[Kotlin] Basic Syntax  (0) 2024.07.31
[Kotlin] 언어 소개  (0) 2024.07.21