Kotlin

[Kotlin] Asynchronous Programming Techniques - 1

kahnco 2024. 8. 5. 16:44
반응형

Asynchronous Programming Techniques

비동기 프로그래밍은 응용 프로그램이 차단되지 않도록 하는 데 중요한 역할을 합니다. 이는 데스크탑, 모바일 또는 서버 측 응용 프로그램을 개발할 때 사용자 대기 시간을 줄이거나 병목 현상을 방지하여 애플리케이션의 확장을 방해하지 않도록 하는 데 필수적입니다. 코틀린에서는 여러 가지 비동기 프로그래밍 기법을 지원합니다.

 

Threading

스레드는 가장 널리 알려진 비동기 프로그래밍 접근 방식입니다. 스레드를 사용하면 메인 스레드의 차단을 방지할 수 있습니다. 예를 들어, 오래 실행되는 작업을 별도의 스레드에서 실행하여 UI 스레드의 차단을 피할 수 있습니다.

fun postItem(item: Item) {
    val token = preparePost()
    val post = submitPost(token, item)
    processPost(post)
}

fun preparePost(): Token {
    // 오래 실행되는 작업으로 메인 스레드를 차단합니다.
    return token
}

 

위 코드는 preparePost 함수가 오래 실행되어 메인 스레드를 차단하는 상황을 가정합니다. 이를 해결하기 위해 별도의 스레드에서 작업을 실행할 수 있습니다.

 

그러나 스레딩은 여러 가지 단점을 가지고 있습니다.

  • 스레드는 비싸고 컨텍스트 전환 비용이 큽니다.
  • 스레드는 무한하지 않습니다. 운영 체제가 허용하는 스레드 수에는 한계가 있습니다.
  • 일부 플랫폼은 스레드를 지원하지 않습니다.
  • 스레드 디버깅과 경쟁 조건(race condition) 문제는 복잡합니다.

 

Callbacks

 

콜백은 하나의 함수를 다른 함수의 매개변수로 전달하고, 해당 함수가 완료된 후 콜백 함수를 호출하는 방식입니다.

fun postItem(item: Item) {
    preparePostAsync { token ->
        submitPostAsync(token, item) { post ->
            processPost(post)
        }
    }
}

fun preparePostAsync(callback: (Token) -> Unit) {
    // 요청을 보내고 즉시 반환하며, 나중에 콜백을 호출합니다.
}

 

 

콜백 방식은 더 우아한 솔루션처럼 보이지만 다음과 같은 문제점이 있습니다.

  • 중첩된 콜백의 어려움: 콜백 함수가 또 다른 콜백을 필요로 하는 경우 중첩 콜백이 발생하여 코드가 이해하기 어려워집니다.
  • 오류 처리의 복잡성: 중첩된 콜백 모델에서는 오류 처리와 전파가 더 복잡합니다.

 

Futures and Promises

퓨처 또는 프로미스는 호출 시점에 반환될 객체를 약속하는 방식입니다.

fun postItem(item: Item) {
    preparePostAsync()
        .thenCompose { token ->
            submitPostAsync(token, item)
        }
        .thenAccept { post ->
            processPost(post)
        }
}

fun preparePostAsync(): Promise<Token> {
    // 요청을 보내고 나중에 완료될 프로미스를 반환합니다.
    return promise
}

 

 

이 접근 방식은 콜백과 유사하지만 몇 가지 차이점이 있습니다.

  • 다른 프로그래밍 모델: 콜백과 유사하게, 이 모델은 전통적인 명령형 프로그래밍 모델에서 벗어나서, 체인형 호출을 사용하는 합성 모델로 이동합니다.
  • 새로운 API 학습 필요: 새로운 API(thenCompose, thenAccept 등)를 학습해야 합니다.
  • 특정 반환 타입: 실제 데이터 대신 Promise 타입을 반환하며, 이를 처리해야 합니다.

 

Reactive Extensions

Rx 는 데이터 스트림을 다루는 접근 방식입니다. Rx는 옵저버 패턴을 기반으로 하여 데이터 스트림을 관찰하고, 이를 통해 비동기 데이터 흐름을 처리합니다.

// RxJava 예제
Observable.fromCallable { preparePost() }
    .flatMap { token -> Observable.fromCallable { submitPost(token, item) } }
    .subscribe { post -> processPost(post) }

 

 

Rx 는 데이터 스트림을 관찰하고 조작하는 데 유용하며, 오류 처리가 좀 더 우아하게 이루어집니다. 그러나 기존의 동기 코드 작성 방식과는 다른 접근 방식을 요구합니다.

 

Coroutines

코틀린은 코루틴을 사용하여 비동기 코드를 작성합니다. 코루틴은 일시 중단 가능한 계산(suspendable computation) 의 개념을 도입하여 함수 실행을 일시 중단하고 나중에 다시 재개할 수 있습니다.

fun postItem(item: Item) {
    launch {
        val token = preparePost()
        val post = submitPost(token, item)
        processPost(post)
    }
}

suspend fun preparePost(): Token {
    // 요청을 보내고 코루틴을 일시 중단합니다.
    return suspendCoroutine { /* ... */ }
}

 

코루틴의 주요 장점은 다음과 같습니다.

  • 비차단 코드 작성이 동기 코드 작성과 유사합니다.
  • 프로그래밍 모델과 API가 동일하게 유지됩니다. 루프, 예외 처리 등을 계속 사용할 수 있습니다.
  • 플랫폼 독립적입니다. JVM, JavaScript 등 다양한 플랫폼에서 동일한 코드를 사용할 수 있습니다.

코루틴은 새로운 개념이 아니며, 다른 언어에서도 사용되고 있습니다. 코틀린에서는 라이브러리를 통해 대부분의 기능이 제공되며, suspend 키워드 외에는 추가적인 키워드가 필요하지 않습니다.

 

이중에서, 이번 글에서 가장 핵심적으로 다뤄볼 코루틴에 대해서 좀더 파헤쳐보겠습니다.


Coroutines

코틀린은 다른 언어와는 달리 비동기 및 논블로킹 프로그래밍을 위한 저수준 API 를 표준 라이브러리에서 제공합니다. 이를 통해 다른 라이브러리들이 코루틴을 활용할 수 있도록 합니다. 코틀린에서는 async 와 await 같은 키워드가 존재하지 않으며, 표준 라이브러리의 일부도 아닙니다. 대신, 코틀린의 일시 중단 함수(suspending function) 개념은 Futures 와 Promises 보다 안전하고 오류 발생 가능성이 적은 비동기 작업에 대한 추상화를 제공합니다.

 

코루틴은 코틀린에서 비동기 또는 논블로킹 프로그래밍을 지원하는 중요한 기능입니다. 서버 사이드, 데스크탑, 모바일 애플리케이션을 개발할 때, 사용자가 느끼기에 부드러운 경험을 제공하는 동시에 필요한 경우 확장 가능한 애플리케이션을 만드는 것이 중요합니다.

 

코루틴은 일시 중단 가능한 계산 개체입니다. 코루틴은 실행할 코드 블록을 가져가 나머지 코드와 동시에 작동하는 점에서 스레드와 유사합니다. 그러나 코루틴은 특정 스레드에 묶이지 않으며, 한 스레드에서 실행을 일시 중단하고 다른 스레드에서 다시 실행을 재개할 수 있습니다.

 

코루틴은 경량 스레드로 간주할 수 있지만, 스레드와는 여러 중요한 차이점이 있어 실제 사용 시 차이가 큽니다.

 


Coroutine Example

fun main() = runBlocking { // this: CoroutineScope
    launch { // 새로운 코루틴을 시작하고 계속 진행
        delay(1000L) // 1초 동안 논블로킹 일시 중단 (기본 시간 단위는 ms)
        println("World!") // 지연 후 출력
    }
    println("Hello") // 이전 코루틴이 지연되는 동안 메인 코루틴 계속 진행
}

 

 

위 코드는 다음과 같은 결과를 출력합니다.

Hello
World!

 

위 코드를 좀더 자세히 살펴보자면,

  • launch는 새로운 코루틴을 시작하는 코루틴 빌더입니다. 이 함수는 나머지 코드와 동시에 실행되므로 "Hello"가 먼저 출력됩니다.
  • delay는 특수한 일시 중단 함수입니다. 이 함수는 코루틴을 특정 시간 동안 일시 중단합니다. 코루틴을 일시 중단하면 기본 스레드를 차단하지 않고 다른 코루틴이 해당 스레드를 사용할 수 있게 합니다.
  • runBlocking도 코루틴 빌더로, 일반 fun main() 함수와 runBlocking { ... } 코드 블록 내부의 코루틴 코드를 연결합니다. 이 함수는 launch 호출 시 발생하는 Unresolved reference: launch 오류를 방지합니다. runBlocking은 메인 스레드를 차단하여 내부의 모든 코루틴이 완료될 때까지 기다립니다.

 

구조적 동시성 (Structured Concurrency)

 

코루틴은 구조적 동시성 원칙을 따릅니다. 이는 새로운 코루틴이 특정 CoroutineScope 내에서만 시작될 수 있으며, 이는 코루틴의 수명을 제한합니다. runBlocking이 해당 스코프를 설정하여, World! 가 출력된 후 1초 지연 후 종료되는 이유입니다.

 

실제 애플리케이션에서는 많은 코루틴을 실행하게 됩니다. 구조적 동시성은 코루틴이 유실되거나 누출되지 않도록 보장합니다. 외부 스코프는 모든 자식 코루틴이 완료되기 전까지 완료되지 않습니다. 또한, 구조적 동시성은 코드에서 발생하는 모든 오류가 제대로 보고되고 유실되지 않도록 합니다.

 

함수 추출 리팩터링 (Extract Function Refactoring)

launch { ... } 블록 내부의 코드를 별도의 함수로 추출해 보겠습니다. "Extract function" 리팩터링을 수행하면 suspend 수정자를 가진 새로운 함수가 생성됩니다. 이것이 첫 번째 일시 중단 함수입니다. 일시 중단 함수는 코루틴 내에서 일반 함수처럼 사용할 수 있지만, 다른 일시 중단 함수(예: delay)를 사용하여 코루틴의 실행을 일시 중단할 수 있는 추가 기능이 있습니다.

fun main() = runBlocking { // this: CoroutineScope
    launch { doWorld() }
    println("Hello")
}

// 첫 번째 일시 중단 함수
suspend fun doWorld() {
    delay(1000L)
    println("World!")
}

 

 

 

스코프 빌더 (Scope Builder)

코루틴 빌더가 제공하는 코루틴 스코프 외에도 coroutineScope 빌더를 사용하여 자체 스코프를 선언할 수 있습니다. 이는 코루틴 스코프를 생성하며, 모든 자식 코루틴이 완료될 때까지 완료되지 않습니다.

fun main() = runBlocking {
    doWorld()
}

suspend fun doWorld() = coroutineScope { // this: CoroutineScope
    launch {
        delay(1000L)
        println("World!")
    }
    println("Hello")
}

 

 

위 코드도 다음과 같이 출력됩니다.

Hello
World!

 

스코프 빌더와 동시성 (Scope Builder and Concurrency)

suspend 함수 내에서 여러 동시 작업을 수행하려면 coroutineScope 빌더를 사용할 수 있습니다. 아래 예제에서는 doWorld 일시 중단 함수 내에서 두 개의 동시 코루틴을 실행합니다.

fun main() = runBlocking {
    doWorld()
    println("Done")
}

suspend fun doWorld() = coroutineScope { // this: CoroutineScope
    launch {
        delay(2000L)
        println("World 2")
    }
    launch {
        delay(1000L)
        println("World 1")
    }
    println("Hello")
}

 

위 코드는 다음과 같이 출력됩니다.

Hello
World 1
World 2
Done

 

명시적 잡 (Explicit Job)

launch 코루틴 빌더는 실행된 코루틴에 대한 핸들러인 Job 객체를 반환하며, 이를 사용하여 명시적으로 완료를 기다릴 수 있습니다. 예를 들어, 자식 코루틴이 완료될 때까지 기다린 후 "Done" 문자열을 출력할 수 있습니다:

val job = launch { // 새로운 코루틴을 시작하고 그 Job에 대한 참조를 유지
    delay(1000L)
    println("World!")
}
println("Hello")
job.join() // 자식 코루틴이 완료될 때까지 대기
println("Done")

 

 

경량 코루틴 (Coroutines are Light-weight)

코루틴은 JVM 스레드보다 적은 리소스를 사용합니다. 예를 들어, 50,000 개의 코루틴을 각각 5초 동안 기다리고 마침표를 출력하는 다음과 같은 코드가 있습니다.

import kotlinx.coroutines.*

fun main() = runBlocking {
    repeat(50_000) { // 많은 코루틴을 시작
        launch {
            delay(5000L)
            print(".")
        }
    }
}

이 코드는 메모리를 거의 사용하지 않습니다. 동일한 프로그램을 스레드를 사용하여 작성하면 많은 메모리를 소모하게 됩니다. 운영 체제, JDK 버전, 설정에 따라 메모리 부족 오류가 발생하거나 동시에 실행되는 스레드 수를 제한하여 스레드가 천천히 시작될 수 있습니다.

 


Cancellation and Timeouts

이 섹션에서는 코루틴의 취소와 타임아웃에 대해 다룹니다. 장시간 실행되는 애플리케이션에서는 백그라운드 코루틴에 대한 세밀한 제어가 필요할 수 있습니다. 예를 들어, 사용자가 코루틴을 시작한 페이지를 닫은 경우 해당 작업이 더 이상 필요하지 않으므로 취소할 수 있습니다.

 

코루틴 실행 취소 (Cancelling Coroutine Execution)

코루틴 실행을 취소하려면 launch 함수가 반환하는 Job 객체를 사용합니다. Job 객체는 실행 중인 코루틴을 취소하는 데 사용됩니다.

import kotlinx.coroutines.*

fun main() = runBlocking {
    val job = launch {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // 잠시 대기
    println("main: I'm tired of waiting!")
    job.cancel() // 작업 취소
    job.join() // 작업 완료까지 대기
    println("main: Now I can quit.")
}

// 결과
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.

 

 

협력적 취소 (Cooperative Cancellation)

코루틴 취소는 협력적입니다. 코루틴 코드는 취소 가능하도록 협력해야 합니다.

kotlinx.coroutines의 모든 일시 중단 함수는 취소 가능합니다. 이 함수들은 코루틴의 취소를 확인하고 취소 시 CancellationException을 발생시킵니다. 그러나 코루틴이 계산 중이고 취소를 확인하지 않는다면 취소할 수 없습니다.

val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
    var nextPrintTime = startTime
    var i = 0
    while (i < 5) { // 계산 루프, CPU를 낭비합니다.
        if (System.currentTimeMillis() >= nextPrintTime) {
            println("job: I'm sleeping ${i++} ...")
            nextPrintTime += 500L
        }
    }
}
delay(1300L) // 잠시 대기
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 작업 취소 및 완료까지 대기
println("main: Now I can quit.")

 

위 코드는 작업이 완료될 때까지 I'm sleeping 을 계속 출력합니다.

 

계산 코드의 취소 가능성 추가

계산 코드를 취소 가능하게 만드는 방법에는 두 가지가 있습니다. 첫 번째는 취소를 확인하는 일시 중단 함수를 주기적으로 호출하는 것입니다. yield 함수가 이러한 목적으로 좋은 선택입니다. 두번째 방법은 취소 상태를 명시적으로 확인하는 것입니다.

val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
    var nextPrintTime = startTime
    var i = 0
    while (isActive) { // 취소 가능한 계산 루프
        if (System.currentTimeMillis() >= nextPrintTime) {
            println("job: I'm sleeping ${i++} ...")
            nextPrintTime += 500L
        }
    }
}
delay(1300L) // 잠시 대기
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 작업 취소 및 완료까지 대기
println("main: Now I can quit.")

 

isActive는 코루틴 내부에서 사용할 수 있는 CoroutineScope 객체의 확장 속성입니다. 이 코드를 실행하면 이제 루프가 취소됩니다.

 

리소스 닫기와 finally 블록

취소 시 일시 중단 함수는 CancellationException을 발생시킵니다. 이는 일반적인 방식으로 처리할 수 있습니다. 예를 들어, try {...} finally {...} 표현식과 코틀린의 use 함수는 코루틴이 취소될 때 정상적으로 종료 작업을 수행합니다.

val job = launch {
    try {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    } finally {
        println("job: I'm running finally")
    }
}
delay(1300L) // 잠시 대기
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 작업 취소 및 완료까지 대기
println("main: Now I can quit.")

// 출력 결과
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm running finally
main: Now I can quit.

 

Run Non-Cancellable Block

위의 예제에서 finally 블록 내부에서 일시 중단 함수를 사용하려고 하면 CancellationException 이 발생합니다. 이 경우 withContext(NonCancellable) 를 사용하여 해당 코드를 비취소 컨텍스트에서 실행할 수 있습니다.

val job = launch {
    try {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    } finally {
        withContext(NonCancellable) {
            println("job: I'm running finally")
            delay(1000L) // withContext(NonCancellable) 구문을 통해 delay 사용 가능
            println("job: And I've just delayed for 1 sec because I'm non-cancellable")
        }
    }
}
delay(1300L) // 잠시 대기
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 작업 취소 및 완료까지 대기
println("main: Now I can quit.")

// 결과
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm running finally
job: And I've just delayed for 1 sec because I'm non-cancellable
main: Now I can quit.

 

Timeout

코루틴 실행을 취소하는 가장 일반적이고 실용적인 이유는 실행 시간이 일정 타임아웃을 초과했기 때문입니다. 타임아웃을 설정하려면 withTimeout 함수를 사용합니다.

withTimeout(1300L) {
    repeat(1000) { i ->
        println("I'm sleeping $i ...")
        delay(500L)
    }
}

 

위 코드는 다음과 같은 결과를 출력합니다.

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms

 

타임아웃 발생 시 withTimeout 이 발생시키는 TimeoutCancellationExceptionCancellationException 의 하위 클래스입니다. 타임아웃 시 추가 작업이 필요한 경우 try catch 블록으로 코드를 감싸거나 withTimeoutOrNull 함수를 사용하여 타임아웃 시 예외를 발생시키지 않고 null 을 반환할 수 있습니다.

val result = withTimeoutOrNull(1300L) {
    repeat(1000) { i ->
        println("I'm sleeping $i ...")
        delay(500L)
    }
    "Done" // 이 결과를 생성하기 전에 취소됩니다.
}
println("Result is $result")

// 결과
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null

 

비동기 타임아웃과 리소스

타임아웃 이벤트는 해당 블록 내부 코드 실행과 비동기적이므로 언제든지 발생할 수 있습니다. 따라서 블록 내부에서 리소스를 열거나 획득하고, 블록 외부에서 이를 닫거나 해제할 필요가 있을 때 주의해야 합니다.

var acquired = 0

class Resource {
    init { acquired++ } // 리소스 획득
    fun close() { acquired-- } // 리소스 해제
}

fun main() = runBlocking {
    repeat(10_000) { // 10K 코루틴 시작
        launch {
            val resource = withTimeout(60) { // 60ms 타임아웃
                delay(50) // 50ms 지연
                Resource() // 리소스를 획득하고 반환
            }
            resource.close() // 리소스 해제
        }
    }
    println(acquired) // 여전히 획득된 리소스 수 출력
}

 

위 에제는 항상 0을 출력하지 않을 수 있습니다. 이러한 문제를 해결하려면 리소스를 반환하는 대신 변수에 저장한 후, 블록 외부에서 이를 해제해야 합니다.

runBlocking {
    repeat(10_000) { // 10K 코루틴 시작
        launch {
            var resource: Resource? = null // 아직 획득되지 않음
            try {
                withTimeout(60) { // 60ms 타임아웃
                    delay(50) // 50ms 지연
                    resource = Resource() // 리소스를 변수에 저장
                }
            } finally {
                resource?.close() // 리소스가 획득된 경우 해제
            }
        }
    }
}
println(acquired) // 여전히 획득된 리소스 수 출력

 

위 예제는 항상 0을 출력합니다. 즉, 리소스가 누출되지 않습니다.


Composing suspending functions

코틀린에서는 일시 중단 함수를 구성하는 다양한 방법을 제공합니다. 이 섹션에서는 여러 가지 접근 방식을 다룹니다.

 

Sequential by Default (기본적인 순차적 실행)

아래와 같이 두 개의 일시 중단 함수를 정의한다고 가정합니다. 이 함수들은 원격 서비스 호출이나 계산을 수행한다고 가정하지만, 예제에서는 각각 1초씩 지연합니다.

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // 여기서 유용한 작업을 수행한다고 가정
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // 여기서도 유용한 작업을 수행한다고 가정
    return 29
}

 

위 두 함수들을 순차적으로 호출하여 결과를 합산해야 한다면, 다음과 같이 할 수 있습니다.

val time = measureTimeMillis {
    val one = doSomethingUsefulOne()
    val two = doSomethingUsefulTwo()
    println("The answer is ${one + two}")
}
println("Completed in $time ms")

// 결과
The answer is 42
Completed in 2017 ms

 

Concurrent Using Async

두 함수 간에 의존성이 없다면, async 를 사용하여 두 작업을 동시에 수행할 수 있습니다. asyncDeferred 객체를 반환하여 나중에 결과를 받을 수 있습니다.

val time = measureTimeMillis {
    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }
    println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")

// 결과
The answer is 42
Completed in 1017 ms

 

Lazily Started Async

async 를 지연 시작으로 설정할 수 있습니다. 이 모드에서는 결과가 필요할 때 await 가 호출되거나 Jobstart 함수가 호출될 때만 코루틴을 시작합니다.

val time = measureTimeMillis {
    val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
    val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
    // 일부 계산
    one.start() // 첫 번째 코루틴 시작
    two.start() // 두 번째 코루틴 시작
    println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")

// 결과
The answer is 42
Completed in 1017 ms

 

Async-style Functions

코틀린 코루틴에서 GlobalScope 를 사용하여 async 스타일 함수를 정의할 수 있습니다. 이러한 함수는 비동기 계산을 시작하고 결과를 받기 위해 Deferred 객체를 반환합니다. GlobalScope 를 사용할 때는 @OptIn(DelicateCoroutinesApi::class) 를 명시해야 합니다.

@OptIn(DelicateCoroutinesApi::class)
fun somethingUsefulOneAsync() = GlobalScope.async {
    doSomethingUsefulOne()
}

@OptIn(DelicateCoroutinesApi::class)
fun somethingUsefulTwoAsync() = GlobalScope.async {
    doSomethingUsefulTwo()
}

fun main() {
    val time = measureTimeMillis {
        val one = somethingUsefulOneAsync()
        val two = somethingUsefulTwoAsync()
        runBlocking {
            println("The answer is ${one.await() + two.await()}")
        }
    }
    println("Completed in $time ms")
}

 

Structured Concurrency with Async (구조적 동시성을 사용한 Async)

구조적 동시성을 유지하면서 async 를 사용하려면 coroutineScope 를 사용하여 코루틴 범위를 정의합니다.

suspend fun concurrentSum(): Int = coroutineScope {
    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }
    one.await() + two.await()
}

val time = measureTimeMillis {
    println("The answer is ${concurrentSum()}")
}
println("Completed in $time ms")

// 결과
The answer is 42
Completed in 1017 ms

 

Cancellation Propagation

구조적 동시성에서는 하나의 자식 코루틴에서 오류가 발생하면 모든 자식 코루틴이 취소됩니다.

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    try {
        failedConcurrentSum()
    } catch(e: ArithmeticException) {
        println("Computation failed with ArithmeticException")
    }
}

suspend fun failedConcurrentSum(): Int = coroutineScope {
    val one = async<Int> { 
        try {
            delay(Long.MAX_VALUE) // 매우 긴 계산을 에뮬레이트
            42
        } finally {
            println("First child was cancelled")
        }
    }
    val two = async<Int> { 
        println("Second child throws an exception")
        throw ArithmeticException()
    }
    one.await() + two.await()
}

// 결과
Second child throws an exception
First child was cancelled
Computation failed with ArithmeticException

Coroutine Context and Dispatchers

코루틴은 항상 CoroutineContext 타입의 값으로 표현되는 컨텍스트 내에서 실행됩니다. 코루틴 컨텍스트는 다양한 요소들의 집합입니다. 주요 요소는 코루틴의 JobDispatcher 입니다.

 

Dispatchers and Threads

CoroutineContext 에는 CoroutineDispatcher 가 포함되어 있으며, 이는 해당 코루틴이 어떤 스레드나 스레드 풀에서 실행될지를 결정합니다. 디스페처는 코루틴을 특정 스레드에 고정하거나, 스레드 풀에 디스패치하거나, 고정되지 않은 상태로 실행할 수 있습니다.

 

모든 코루틴 빌더(launch, async 등) 는 선택적으로 CoroutineContext 매개변수를 받아 새로운 코루틴의 디스패처 및 기타 컨텍스트 요소를 명시적으로 지정할 수 있습니다.

 

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    launch { // 부모, 메인 runBlocking 코루틴의 컨텍스트
        println("main runBlocking      : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Unconfined) { // 고정되지 않음 - 메인 스레드에서 실행
        println("Unconfined            : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Default) { // DefaultDispatcher에 디스패치
        println("Default               : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(newSingleThreadContext("MyOwnThread")) { // 새로운 스레드에서 실행
        println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
    }
}

// 출력 결과 (순서는 다를 수 있음)
Unconfined            : I'm working in thread main
Default               : I'm working in thread DefaultDispatcher-worker-1
newSingleThreadContext: I'm working in thread MyOwnThread
main runBlocking      : I'm working in thread main

 

Unconfined VS Confined Dispatcher

Dispatchers.Unconfined 코루틴 디스패처는 호출자 스레드에서 코루틴을 시작하지만, 첫 번째 일시 중단 지점 이후에는 일시 중단 함수를 호출한 스레드에서 재개됩니다. Dispatchers.Unconfined 는 CPU 시간을 소비하지 않거나 특정 스레드에 고정되지 않는 데이터 (ex. UI) 를 업데이트하지 않는 코루틴에 적합합니다.

 

반면에 기본 디스패처는 외부 CoroutineScope 에서 상속됩니다. runBlocking 코루틴의 기본 디스패처는 호출자 스레드에 고정되어 있어, 해당 스레드에서 실행이 계속됩니다.

launch(Dispatchers.Unconfined) {
    println("Unconfined      : I'm working in thread ${Thread.currentThread().name}")
    delay(500)
    println("Unconfined      : After delay in thread ${Thread.currentThread().name}")
}
launch {
    println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
    delay(1000)
    println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
}

// 출력 결과
Unconfined      : I'm working in thread main
main runBlocking: I'm working in thread main
Unconfined      : After delay in thread kotlinx.coroutines.DefaultExecutor
main runBlocking: After delay in thread main

 

Dispatcher.Unconfined 는 일반적인 코드에서는 사용하지 않는 것이 좋습니다.

 

Debugging Coroutines and Threads

코루틴은 한 스레드에서 일시 중단되고 다른 스레드에서 재개될 수 있습니다. 단일 스레드 디스패처를 사용하더라도, 어떤 스레드에서 무엇을 하고 있었는지를 파악하는 것은 어려울 수 있습니다. Intellij IDEA 의 코루틴 디버거를 사용하면 코루틴 디버깅이 쉬워집니다. 코루틴 디버거를 사용하면 각 코루틴의 상태, 지역 변수, 캡처된 변수 등을 확인할 수 있습니다.

 

또한, 로깅을 사용하여 디버깅할 수도 있습니다. -Dkotlinx.coroutines.debug JVM 옵션을 사용하여 다음 코드를 실행한 결과는 다음과 같습니다.

val a = async {
    log("I'm computing a piece of the answer")
    6
}
val b = async {
    log("I'm computing another piece of the answer")
    7
}
log("The answer is ${a.await() * b.await()}")

// 출력 결과
[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42

 

Jumping Between Threads

다음 코드를 -Dkotlinx.coroutines.debug JVM 옵션을 사용하여 출력한 결과는 다음과 같습니다.

import kotlinx.coroutines.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main() {
    newSingleThreadContext("Ctx1").use { ctx1 ->
        newSingleThreadContext("Ctx2").use { ctx2 ->
            runBlocking(ctx1) {
                log("Started in ctx1")
                withContext(ctx2) {
                    log("Working in ctx2")
                }
                log("Back to ctx1")
            }
        }
    }
}

// 출력 결과
[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1

 

Job in the Context

코루틴의 Job 은 컨텍스트의 일부이며, coroutineContext[Job] 표현식을 사용하여 가져올 수 있습니다.

println("My job is ${coroutineContext[Job]}")

// 출력 결과
My job is "coroutine#1":BlockingCoroutine{Active}@6d311334

 

Children of a Coroutine

코루틴이 다른 코루틴의 CoroutineScope 에서 시작될 때, 그 컨텍스트를 상속받고, 새로운 코루틴의 Job 은 부모 코루틴의 Job 의 자식이 됩니다. 부모 코루틴이 취소되면 모든 자식 코루틴도 재귀적으로 취소됩니다. 하지만, 명시적으로 다른 스코프나 Job 객체를 지정하여 이 관계를 재정의할 수 있습니다.

val request = launch {
    launch(Job()) {
        println("job1: I run in my own Job and execute independently!")
        delay(1000)
        println("job1: I am not affected by cancellation of the request")
    }
    launch {
        delay(100)
        println("job2: I am a child of the request coroutine")
        delay(1000)
        println("job2: I will not execute this line if my parent request is cancelled")
    }
}
delay(500)
request.cancel()
println("main: Who has survived request cancellation?")
delay(1000)

// 출력 결과
job1: I run in my own Job and execute independently!
job2: I am a child of the request coroutine
main: Who has survived request cancellation?
job1: I am not affected by cancellation of the request

 

Parental Responsibilities

부모 코루틴은 항상 모든 자식이 완료될 때까지 대기합니다. 부모는 명시적으로 모든 자식을 추적할 필요가 없으며, Job.join 을 사용하여 대기할 필요도 없습니다.

val request = launch {
    repeat(3) { i ->
        launch {
            delay((i + 1) * 200L)
            println("Coroutine $i is done")
        }
    }
    println("request: I'm done and I don't explicitly join my children that are still active")
}
request.join()
println("Now processing of the request is complete")

// 출력 결과
request: I'm done and I don't explicitly join my children that are still active
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Now processing of the request is complete

 

Naming Coroutines for Debugging

자동 할당된 ID 는 자주 로그를 남길 때 유용하지만, 특정 요청을 처리하거나 특정 작업을 수행하는 코루틴에는 명시적으로 이름을 지정하는 것이 좋습니다.

log("Started main coroutine")
val v1 = async(CoroutineName("v1coroutine")) {
    delay(500)
    log("Computing v1")
    6
}
val v2 = async(CoroutineName("v2coroutine")) {
    delay(1000)
    log("Computing v2")
    7
}
log("The answer for v1 * v2 = ${v1.await() * v2.await()}")

 

이 코드는 -Dkotlinx.coroutines.debug JVM 옵션과 함께 실행하면 다음과 같은 출력을 생성합니다:

[main @main#1] Started main coroutine
[main @v1coroutine#2] Computing v1
[main @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 * v2 = 42

 

 

Combining Context Elements

여러 컨텍스트 요소를 결합할 때는 + 연산자를 사용할 수 있습니다.

launch(Dispatchers.Default + CoroutineName("test")) {
    println("I'm working in thread ${Thread.currentThread().name}")
}

// -Dkotlinx.coroutines.debug JVM 옵션 출력 결과
I'm working in thread DefaultDispatcher-worker-1 @test#2

 

Coroutine Scope

애플리케이션이 생명 주기를 가진 객체를 포함하는 경우, CoroutineScope 인스턴스를 생성하여 생명 주기와 코루틴을 연결할 수 있습니다.

class Activity {
    private val mainScope = MainScope()

    fun destroy() {
        mainScope.cancel()
    }

    fun doSomething() {
        repeat(10) { i ->
            mainScope.launch {
                delay((i + 1) * 200L)
                println("Coroutine $i is done")
            }
        }
    }
}

fun main() {
    val activity = Activity()
    activity.doSomething()
    println("Launched coroutines")
    delay(500L)
    println("Destroying activity!")
    activity.destroy()
    delay(1000L)
}

// 출력 결과
Launched coroutines
Coroutine 0 is done
Coroutine 1 is done
Destroying activity!

 

Thread-Local Data

코루틴 간에 스레드 로컬 데이터를 전달하려면 asContextElement 확장 함수를 사용할 수 있습니다.

val threadLocal = ThreadLocal<String?>()

fun main() {
    threadLocal.set("main")
    println("Pre-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) {
        println("Launch start, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
        yield()
        println("After yield, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    }
    job.join()
    println("Post-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
}

// -Dkotlinx.coroutines.debug JVM 옵션 출력 결과
Pre-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'
Launch start, current thread: Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main], thread local value: 'launch'
After yield, current thread: Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main], thread local value: 'launch'
Post-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'

 

위 예제는 Dispatchers.Default 를 사용하여 백그라운드 스레드 풀에서 새로운 코루틴을 시작하지만, 지정한 스레드 로컬 변수의 값을 유지합니다.


내용이 너무 길어져서, 여기서 끊고 다음 글에서는 Flow 부터 시작하겠습니다.

반응형

'Kotlin' 카테고리의 다른 글

[Kotlin] Asynchronous Programming Techniques - 2  (0) 2024.08.07
[Kotlin] Types  (0) 2024.08.01
[Kotlin] Basic Syntax  (0) 2024.07.31
[Kotlin] 언어 소개  (0) 2024.07.21