코루틴 공식 가이드 자세히 읽기 — Part 9

Myungpyo Shim
32 min readDec 12, 2019

--

Asynchronous Flow

Korean [English]

(목차로 돌아가기)

우리는 어떤 연산을 수행한 후 한 개의 값을 반환하는 중단 함수를 정의하고 이를 비동기로 수행 할 수 있습니다. 하지만 어떤 연산 후 두 개 이상의 값을 반환하는 중단함수는 어떻게 만들 수 있을까요? 코틀린 플로우(Kotlin Flow)를 이용하면 이를 수행할 수 있습니다.

다수의 값 나타내기 (Representing multiple values)

다수의 값은 코틀린에서 컬렉션을 통해 나타낼 수 있습니다. 예를 들어, 우리는 세개의 수를 요소로 갖는 리스트를 반환하는 foo() 라는 함수를 만들고 forEach 함수를 이용하여 이 리스트의 모든 수를 출력할 수 있습니다.

출력 결과

1
2
3

시퀀스 (Sequences)

우리는 각각의 수에 CPU 연산이 요구되는 어떤 일련의 수들을 나타내고자 할 때 시퀀스를 이용할 수 있습니다 (아래 예에서는 각각의 연산에 100ms 의 시간이 소요된다고 가정).

이 코드의 수행 결과는 이전과 동일합니다.

중단 함수 (Suspending functions)

하지만 위 예제와 같은 코드 블록은 블록을 실행하고 있는 메인 스레드를 정지 시킵니다. 이러한 연산들이 비동기 코드에서 실행될 때 우리는 함수 foo() 에 suspend 키워드를 붙여 함수를 중단함수로 정의할 수 있습니다. 그리고 이 함수를 코루틴 스코프에서 호출 하여 호출 스레드의 정지 없이 실행할 수 있고 그 결과를 리스트로 반환하도록 만들 수 있습니다.

플로우(Flows)

List<Int> 를 함수의 반환 타입으로 사용한다는 것은 결국 우리가 모든 연산을 수행 한 후 한번에 모든 값을 반환해야 함을 의미합니다. 비동기로 처리 될 값 들의 스트림을 나타내기 위해서 우리는 앞서 살펴본 동기적으로 처리되는 Senquence<Int> 타입에서 했던 것과 같이 Flow<Int> 타입을 사용할 수 있습니다.

이 코드는 각각의 수를 출력하기 전에 메인 스레드를 정지하지 않고 100ms 를 기다렸다가 출력합니다. 이것은 메인스레드에서 수행되는 별도의 코루틴에서 매 100ms 마다 “I’m not blocked” 를 출력함으로써 확인 가능합니다.

출력 결과

I'm not blocked 1 
1
I'm not blocked 2
2
I'm not blocked 3
3

이전의 예제들과 비교하면 Flow 를 사용한 위 코드는 다음과 같은 차이점들을 갖습니다.

  • Flow 타입의 생성은flow {} 빌더를 이용함
  • flow { ... } 블록 안의 코드는 언제든 중단 가능
  • foo() 함수는 더이상 suspend 로 마킹 되지 않아도 됨
  • 결과 값들은 flow 에서 emit() 함수를 이용하여 방출 됨
  • flow 에서 방출된 값들은 collect 함수를 이용하여 수집 됨
foo 함수의 flow { ... } 코드 블록에서 delay() 함수를 Thread.sleep 으로 변경하면 메인 스레드가 정지되는 것을 확인할 수 있습니다.

Flows are cold

플로우는 시퀀스와 유사하게 콜드 스트림 입니다. 다시 말해 flow {} 빌더 내부의 코드 블록은 플로우가 수집(collect) 되기 전까지는 실행되지 않습니다. 이것은 다음 예제를 보면 명확히 알 수 있습니다.

출력 결과

Calling foo... 
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

이것이 플로우를 반환하는foo() 함수가 suspend 로 표시되지 않는 핵심적인 이유입니다. foo() 함수는 호출 시 바로 반환되며 그 무엇도 기다리지 않습니다. 또한 플로우는 매번 수집(collect) 될 때마다 시작됩니다. 이러한 이유로 우리가 동일한 플로우에 대해서 매번 collect 를 다시 호출할 때 마다 출력결과에서 “Flow started” 를 보게 되는 것입니다.

플로우의 취소 (Flow Cancellation)

플로우는 코루틴의 일반적인 취소 매커니즘 준수하긴 하지만 플로우 인프라스트럭쳐 자체적으로 취소 지점을 제공하는 기능은 없습니다. 하지만 이것이 취소에 있어서 보다 명확함을 제공합니다.
일반적인 코루틴의 경우와 동일하게 플로우 컬렉션도 플로우가 취소 가능한 중단함수(ex> delay())에서 중단 되었을 때 취소 가능하며, 다른 경우는 그렇지 못합니다.

다음 예제는 플로우가withTimeoutOrNull 코드 블록에서 실행 중일 때 어떻게 취소되고 코드 실행을 멈추는지 보여줍니다.

출력 결과

Emitting 1 
1
Emitting 2
2
Done

플로우 빌더 (Flow builders)

이전 예제들에서 살펴본 flow { ... } 빌더는 가장 기본적인 것입니다. 코루틴 프레임워크에는 플로우 정의를 돕기위한 다양한 다른 빌더들이 존재합니다.

  • flowOf {} 빌더는 고정된 값들을 방출하는 플로우를 정의
  • 다양한 컬렉션들과 시퀀스들은 .asFlow() 확장 함수를 통해 플로우로 변환 가능

그러므로 플로우로 1에서 3까지 출력하는 예제는 다음과 같이 작성할 수 있습니다.

플로우 중간 연산자 (Intermediate flow operators)

플로우는 여러분이 컬렉션이나 시퀀스에서 경험한 것과 같이 연산자로 변환될 수 있습니다. 중간 연산자는 업스트림 플로우에 적용되어 다운스트림 플로우를 반환합니다. 이 연산자들은 플로우 자체가 그렇듯 콜드(cold) 타입으로 동작합니다. 이러한 연산자들의 호출은 그 자체로는 중단 함수가 아닙니다. 그러므로 새롭게 변형된 플로우를 즉시 반환합니다.

기본 연산자들은 map 이나 filter 와 같이 친숙한 이름들을 가지고 있습니다. 시퀀스와의 중요한 차이점은 이 연산자들로 수행되는 코드 블록에서 중단 함수들을 호출 할 수 있다는 점 입니다.

예를 들어, 요청된 플로우에 대해서 map 연산자를 이용하여 원하는 결과값으로 매핑할 수 있으며, 요청 작업이 긴 시간을 소모하는 중단 함수인 경우에도 성공적으로 동작합니다.

위 예제코드는 다음과 같은 결과를 출력하며, 각각의 라인은 매 초마다 출력됩니다.

response 1 
response 2
response 3

변환 연산자 (Transform operator)

플로우 변환 연산자들 중에서 가장 일반적인 것은 transform 연산자 입니다. 이 연산자는 map 이나 filter 같은 단순한 변환이나 혹은 복잡한 다른 변환들을 구현하기 위해 사용됩니다. transform 연산자를 사용하여 우리는 임의의 횟수로 임의의 값들을 방출할 수 있습니다.

예를 들어, transform 연산자를 사용하여 오래 걸리는 비동기 요청을 수행하기 전에 기본 문자열을 먼저 방출하고 요청에 대한 응답이 도착하면 그 결과를 방출할 수 있습니다.

출력 결과

Making request 1 
response 1
Making request 2
response 2
Making request 3
response 3

크기 제한 연산자 (Size-limiting operators)

take같은 크기 제한 중간 연산자는 정의된 제한치에 도달하면 실행을 취소합니다. 코루틴에서 취소는 언제나 예외를 발생시키는 방식으로 수행 되며, 이를 통해 try { ... } finally { ... } 같은 자원 관리형 함수들이 정상적으로 동작할 수 있게 합니다.

이 코드의 출력 결과는 numbers함수의 flow { ... } 코드 블록의 실행이 두번째 수를 방출하고 멈추었음을 명확히 보여줍니다.

출력 결과

1 
2
Finally in numbers

플로우 종단 연산자 (Terminal flow operators)

플로우의 종단 연산자는 플로우 수집을 시작하는 중단 함수입니다. collect 연산자가 가장 대표적인 것이며, 다음과 같이 수집을 용이하게 해주기 위한 다른 종단 연산자들도 존재합니다.

  • toListtoSet 같은 다양한 컬렉션으로의 변환
  • 첫번째 값만 방출하며 플로우는 단일 값 만 방출함을 보장
  • 플로우를 reducefold 를 이용하여 값으로 변환

예를 들어,

위의 예제의 경우 55를 출력합니다.

플로우는 순차적이다 (Flows are sequential)

어떤 플로우의 독립된 각각의 수집은 다중 플로우가 사용되는 특별한 연산자가 사용되지 않는 이상 순차적으로 수행됩니다. 수집은 종단 연산자를 호출한 코루틴에서 직접 수행되며 기본적으로 새로운 코루틴을 생성하지 않습니다. 각각의 방출된 값은 업스트림의 모든 중간 연산자들에 의해 처리되어 다운스트림으로 전달되며 마지막으로 종단 연산자로 전달됩니다.

짝수를 찾아(filter) 문자열로 변환하는 다음 예를 살펴봅시다.

출력 결과

Filter 1 
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

플로우 컨텍스트 (Flow context)

플로우의 수집은 항상 호출한 코루틴의 컨텍스트 안에서 수행됩니다. 예를 들어 foo 라는 플로우가 있을 때 다음과 같은 코드는 foo플로우의 구현 내용과는 별개로 이 코드의 작성자가 명시한 컨텍스트 상에서 수행됩니다.

이러한 플로우의 특성은 컨텍스트 보존(context preservation) 이라 불립니다.

그러므로 기본적으로 flow { ... } 빌더에 제공된 코드 블록은 플로우 수집 연산을 실행한 코루틴의 컨텍스트에서 수행됩니다. 예를들어 호출 스레드를 출력하며 3개의 수를 방출하는 foo() 라는 함수를 생각해 봅시다.

출력 결과

[main @coroutine#1] Started foo flow 
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3

foo().collect() 가 메인 스레드에서 호출 되었기 때문에 foo 플로우의 코드 블록 또한 메인 스레드에서 호출 되었습니다. 이것이 빠른 실행을 보장하고, 호출자를 블록하지 않고 실행 컨텍스트에 관계 없이 비동기 작업을 수행하는 최선의 방법입니다.

withContext 를 통한 잘못 된 방출 (Wrong emission withContext)

하지만 오랫동안 수행되는 CPU 소모적인 작업들은 Dispatchers.Default 와 같이 별도의 스레드에서 수행될 필요가 있고, UI를 업데이트 하는 코드는 Dispatchers.Main 과 같은 UI를 위한 전용 스레드에서 수행될 필요가 있습니다. 보통 withContext 는 코틀린 코루틴을 사용하는 코드에서 컨텍스트를 전환하기 위해서 사용됩니다. 하지만 flow { ... } 빌더 내부의 코드는 컨텍스트 보존 특성을 지켜야하기 때문에 다른 컨텍스트에서 값을 방출하는 것이 허용되지 않습니다.

다음 코드를 수행하면,

다음과 같은 예외가 발생합니다.

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated: flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@428e044, BlockingEventLoop@79f430ab], but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@4a536f6b, DefaultDispatcher]. Please refer to 'flow' documentation or use 'flowOn' instead
at kotlinx.coroutines.flow.internal.SafeCollector.checkContext (SafeCollector.kt:93)
at kotlinx.coroutines.flow.internal.SafeCollector.emit (SafeCollector.kt:30)
at FileKt$foo$1$1.invokeSuspend (File.kt:9)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith (ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run (Dispatched.kt:241)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely (CoroutineScheduler.kt:594)
at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely (CoroutineScheduler.kt:60)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run (CoroutineScheduler.kt:740)위 예제에서 우리는 이 예외를 확인해보기 위해서 kotlinx.coroutines.withContext 와 같이 Fully qualified name 을 사용했습니다. withContext 와 같이 짧은 이름은 우리가 이러한 문제를 겪지 않도록 컴파일 오류를 내줄 다른 함수에 연결됩니다.

flowOn 연산자 (flowOn operator)

위 예외의 스택 트레이스를 보면 플로우 실행 컨텍스트를 변경할 수 있는 flowOn 연산자에 대해서 언급하고 있습니다. 다음의 예는 플로우에서 컨텍스트를 변경하기 위한 올바른 방법을 보여줍니다. 그리고 flowOn 연산자를 이용하는 다음 예제는 현재 스레드 이름을 출력해서 정상적으로 컨텍스트 전환이 동작하는지 보여 줍니다.

출력 결과

[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1 
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3

메인 스레드에서 수집 연산을 수행할 때 flow { ... } 가 백그라운드 스레드에서 동작하는 모습을 확인해 봅시다.

여기서 한가지 더 확인해 보아야 할 것은 flowOn 연산자가 플로우의 기본적인 특성인 순차성을 일부 포기했다는 점 입니다. 위 예제에서 값 들의 수집이 특정 코루틴(“coroutine#1”) 에서 발생하고 방출은 다른 스레드에서 실행되는 특정 코루틴(“coroutine#2”)에서 발생합니다. flowOn 연산자는 컨텍스트 내에서 CoroutineDispatcher 를 변경해야 할 경우 업스트림 플로우를 위한 다른 코루틴을 생성합니다.

버퍼링 (Buffering)

플로우의 로직을 다른 코루틴에서 수행하는 것은 해당 플로우를 수집하는데 걸리는 전체 시간의 관점으로 보면 도움이 될 수 있습니다. 특히, 오래걸리는 비동기 연산이 관련되어 있다면 더욱 그렇습니다.
예를 들어, foo() 에 의한 방출된 플로우가 느릴 경우를 고려해 봅시다. 이것은 값을 방출하기까지 100ms 가 걸립니다. 그리고 수집하는 쪽도 느립니다. 방출 된 값을 처리하는데 300 ms 가 걸립니다. 이 경우 세개의 수를 처리하는데 얼마나 걸리는지 확인해 봅시다.

위 예제는 다음과 같은 출력 결과를 보이며, 전체 수집 시간은 1200ms 가 걸렸습니다 (세 개 숫자 각각 400ms).

1 
2
3
Collected in 1220 ms

우리는 플로우에 buffer 연산자를 사용함으로써 foo() 의 방출 코드가 수집 코드와 동시에 수행되도록 만들 수 있습니다.

위 예제는 동일한 수들을 더 빠르게 처리합니다. 이것은 첫번째 수를 위해서만 100ms 를 기다리고 각각의 수 처리를 위해서 300ms 씩만 기다리도록 프로세싱 파이프라인을 효율화 함으로써 가능했습니다.

1 
2
3
Collected in 1071 ms
flowOn 연산자가 CoroutineDispatcher 를 변경할 경우 동일한 버퍼링 매커니즘을 사용함을 알아둡시다. 여기서 우린 buffer 연산자를 사용함으로써 실행 컨텍스트의 전환 없이 버퍼링을 수행했습니다.

병합 (conflation)

어떤 플로우가 연산의 일부분이나 연산 상태의 업데이트를 방출하는 경우 방출되는 각각의 값을 처리하는 것은 불필요 하며, 대신에 최신의 값 만을 처리하는 것이 필요할 것입니다. 이 경우, conflate 연산자를 사용하여 수집기의 처리가 너무 느릴 경우 방출 된 중간 값들을 스킵 할 수 있습니다.

코드를 보면 처음 수가 여전히 처리 중인데 두 번째와 세 번째 수가 방출 됨을 확인할 수 있습니다. 그래서 두 번째 수는 스킵되고 가장 최근 값인 세 번째 값이 수집기로 전달 됩니다.

1 
3
Collected in 758 ms

최신 값 처리 (Processing the latest value)

값의 병합 (Conflation) 은 방출과 수집이 모두 느릴 경우 처리 속도를 높이는 방법 중 하나 입니다. 이것은 중간 값들을 삭제 함으로써 이를 수행합니다.

또 다른 방법은 새로운 값이 방출될 때 마다 느린 수집기를 취소하고 재시작하는 것입니다. 이를 위해서 xxx 연산자들을 위해 연산자마다 xxxLatest 연산자가 존재하며, 이 연산자들은 새로운 값이 방출되면 그들의 코드블록을 취소합니다. 이전 예제에서conflatecollectLatest 로 변경해 봅시다.

collectLatest 의 코드 블럭이 300ms를 소모하고 새로운 값들은 매 100ms 마다 방출되기 때문에 우리는 이 블럭이 매 값에 대해서 실행 됨을 확인할 수 있으며, 마지막 값에 대해서만 끝까지 수행 됨을 확인할 수 있습니다.

Collecting 1 
Collecting 2
Collecting 3
Done
3 Collected in 741 ms

다중 플로우 합성 (Composing multiple flows)

다중 플로우를 합성하는 방법은 여러가지가 있습니다.

Zip

코틀린 표준 라이브러리의 Sequence.zip 확장 함수와 동일하게, 플로우에도 두개의 플로우들의 값들을 병합하는 zip 연산자가 있습니다.

위 예제의 출력결과는 다음과 같습니다.

1 -> one 
2 -> two
3 -> three

Combine

어떤 플로우가 어떤 연산이나 상태의 최근 값을 나타낼 때, 우리는 그 플로우의 최근 값에 추가 연산을 수행하거나 또는 별도의 업스트림 플로우가 값을 방출할 때마다 다시 그 추가 연산을 수행해야 할 수 있습니다. 이와 관련된 연산자들을 combine 이라 부릅니다.

예를 들어, 이전 예제에서의 수들을 300ms 마다 업데이트 하고, 문자열들은 400ms 마다 업데이트 되도록 변경한 후 zip 연산자를 이용하여 이들을 하나의 결과로 만들면 여전히 동일한 결과를 400ms 마다 출력합니다
(가장 느린 Flow 에 맞추어서 수행됨).

우리는 이 예제에서 onEach 중간 연산자를 사용하여 각각의 항목들을 지연시킴으로써 방출 코드를 더 가독성 있고 간결하게 만들 것 입니다.

이 예제에서 우리가 zip 연산자 대신에 다음과 같이 combine 연산자를 사용한다면,

우리는 zip 연산자를 사용했을 때와는 상당히 달라진 다음과 같은 결과를 볼 수 있습니다. 결과를 보면 numsstrs 플로우로부터 방출이 일어날 때마다 다른 플로우의 최신값을 가지고 병합하여 출력이 되었습니다.

1 -> one at 452 ms from start 
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start

플로우 플래트닝 (Flattening flows)

플로우는 비동기로 수신되는 값 들의 시퀀스를 나타냅니다. 그러므로 어떤 플로우에서 수신되는 일련의 값 들이 다른 값 들의 시퀀스를 요청하는 플로우가 되는 일은 자주 발생합니다.
예를 들어, 우리는 다음과 같이 500ms 간격으로 두개의 문자열을 방출하는 플로우를 정의 할 수 있습니다.

이제 우리는 세개의 정수를 방출하는 플로우를 가지고 각각의 정수가 방출될 때마다 다음과 같이 requestFlow 를 호출합니다.

우리는 위 작업의 결과로 수신한 값 들에 추가 처리를 위해서 플래트닝이 필요한 플로우들의 플로우를 얻게 됩니다 (Flow<Flow<String>>). 컬렉션들과 스퀀스들은 이를 위해flattenflatMap 연산자들을 가지고 있습니다. 하지만 플로우는 그 자체의 비동기 특성으로 인해서 다른 모드의 플래트닝이 필요하며 플로우를 위한 플래트닝 연산자들이 별도로 정의 되어 있습니다.

flatMapConcat

연결(Concatenating) 모드는 flatMapConcatflattenConcat 연산자들에 의해 구현됩니다. 이 연산자들은 시퀀스에 정의된 비슷한 유형의 연산자들과 가장 유사하게 동작하는 연산자들 입니다. 이 연산자들은 다음 예제가 보여주듯이 다음 플로우의 수집을 시작하기 전에 현재 플로우가 완료될 때까지 기다립니다.

flatMapConcat 연산자의 순차적 특성은 출력 결과를 통해 확실히 확인할 수 있습니다.

1: First at 121 ms from start 
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start

flatMapMerge

다른 플래트닝 모드로는 모든 들어오는 플로우들을 동시에 수집하고 그 값들을 단일 플로우로 합쳐서 값들이 가능한 빨리 방출되도록 하는 모드가 있습니다. 이것은 flatMapMergeflattenMerge 연산자들로 구현됩니다. 이 두 연산자는 모두 옵셔널하게 concurrency 파라미터를 지원하며 이를 통해 동시에 수집 가능한 플로우의 개수를 제한할 수 있습니다. (기본 모드는 DEFAULT_CONCURRENCY 입니다.)

flatMapMerge 의 동시성 특성은 출력 결과에 명백히 드러납니다.

1: First at 136 ms from start 
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
위 출력 결과를 통해 flatMapMerge 가 코드 블록(예제에서는 { requestFlow(it) })은 순차적으로 호출하지만 그 결과 플로우들은 동시에 수집한다는 것을 확인할 수 있습니다. 이것은 순차적으로 map { requestFlow(it) } 를 호출하고, 그 결과에 flattenMerge 를 호출하는 것과 동일합니다.

flatMapLatest

Processing the latest value 섹션에서 살펴 본collectLatest 연산자와 유사한 방식으로 플래트닝 모드가 정의 된 연산자가 있습니다. 이것은 flatMapLatest 연산자로 구현되어 있으며 새로운 플로우가 방출될 때마다 직전 플로우를 취소시킵니다.

이 예제의 다음과 같은 출력 결과를 통해 flatMapLatest 가 동작하는 원리를 이해할 수 있습니다.

1: First at 142 ms from start 
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start

flatMapLatest 는 새 값이 방출되면 그 실행 블록(이 예제에서는 { requestFlow(it) }) 전체를 취소합니다. 이번 예제 에서는 별 차이를 확인할 수 없는데 그 이유는 requestFlow 호출 그 자체는 빠르며, 중단되지 않고, 취소될 수도 없기 때문입니다. 만약 우리가 이 부분에 delay 와 같은 중단 함수를 사용한다면 flatMapLatest 의 특성을 더 명확히 확인해 볼 수 있습니다.

플로우 예외 (Flow exceptions)

플로우 수집은 방출 로직이나 연산자 안의 코드가 예외를 발생시키면 예외 발생 상태로 종료될 수 있습니다. 이러한 예외들을 다루기 위한 다양한 방법이 있습니다.

수집기의 try & catch

수집기(collector)는 예외를 다루기 위해 코틀린의 try/catch 블록을 사용할 수 있습니다.

위 코드는 다음의 출력결과에서 볼 수 있듯이 collect 종단 연산자에서 성공적으로 예외를 잡아내며 그 후에는 어떠한 값도 방출되지 않습니다.

Emitting 1 
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2

모든 예외 처리(Everything is caught)

위 예제는 사실 방출 로직이나 중간 혹은 종단 연산자 등에서 발생하는 모든 예외를 잡아냅니다. 예를들어 방출된 수를 문자열로 변환 하도록 다음과 같이 코드를 변경하고, 해당 변환 코드에서 예외를 발생 시키도록 해 봅시다.

이 예외 또한 동일하게 처리되며 수집이 중단 됩니다.

Emitting 1 
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

예외 투명성 (Exception transparency)

그렇다면 방출 코드의 예외 처리 로직은 어떻게 캡슐화 할 수 있을까요?

플로우는 예외에 있어서 반드시 투명해야 합니다. flow { ... } 빌더 코드 블록 안에서 try/catch 블록으로 예외를 처리한 후 값을 방출하는 것은 예외 투명성을 위반하는 것 입니다. 우리는 예외 투명성을 지킴으로써 이전 예제에서 처럼 예외가 발생하는 수집기는 try/catch 를 이용하여 그 예외를 잡아 처리 할 수 있게됩니다.

방출 로직은 이러한 예외 투명성을 보존하기 위해서 catch 연산자를 사용할 수 있으며 이를 통해 그 예외 처리 로직의 캡슐화가 가능합니다. catch 연산자의 구현 블록은 예외를 분석하고 발생한 예외의 타입에 따라 각기 다른 대응이 가능합니다.

  • throw 연산자를 통한 예외 다시 던지기
  • catch 로직에서 emit 을 사용하여 값 타입으로 방출
  • 다른 코드를 통한 예외 무시, 로깅, 기타 처리

예를들어 예외 발생 시 문자열을 방출하도록 해 봅시다.

이 예제의 출력 결과는 우리가 이전 예제처럼 코드 블록을 try/catch 로 감싸지 않았지만 동일합니다.

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

catch 예외 투명성 (Transparent catch)

예외 투명성을 지키는 catch 중간 연산자는 오직 업 스트림에서 발생하는 예외(catch 연산자 위의 모든 연산자)들에 대해서만 동작하며 다운 스트림에서 발생한 예외에 대해서는 처리하지 않습니다.

Emitting 1 1 Emitting 2 Exception in thread "main" java.lang.IllegalStateException: Collected 2
at FileKt$main$1$invokeSuspend$$inlined$collect$1.emit (Collect.kt:136)
at kotlinx.coroutines.flow.FlowKt__ErrorsKt$catchImpl$$inlined$collect$1.emit (Collect.kt:137)
at kotlinx.coroutines.flow.internal.SafeCollector.emit (SafeCollector.kt:33)
at FileKt$foo$1.invokeSuspend (File.kt:7)
at FileKt$foo$1.invoke (File.kt:-1)
at kotlinx.coroutines.flow.SafeFlow.collect (Builders.kt:53)
at kotlinx.coroutines.flow.FlowKt__ErrorsKt.catchImpl (Errors.kt:232)
at kotlinx.coroutines.flow.FlowKt.catchImpl (:1)
at kotlinx.coroutines.flow.FlowKt__ErrorsKt$catch$$inlined$unsafeFlow$1.collect (SafeCollector.kt:121)
at FileKt$main$1.invokeSuspend (File.kt:19)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith (ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run (Dispatched.kt:241)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent (EventLoop.common.kt:270)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking (Builders.kt:79)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking (Builders.kt:54)
at kotlinx.coroutines.BuildersKt.runBlocking (:1)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default (Builders.kt:36)
at kotlinx.coroutines.BuildersKt.runBlocking$default (:1)
at FileKt.main (File.kt:11)
at FileKt.main (File.kt:-1)

출력 결과를 보면 catch 연산자가 있음에도 “Caught … “ 메시지가 출력되지 않았음을 알 수 있습니다.

플로우 종료 (Flow completion)

플로우의 수집이 종료(정상 종료 혹은 예외 발생)되면 그 이후 동작을 수행해야 할 수 있습니다. 이미 눈치 채셨겠지만 이는 두 가지 방식으로 가능합니다.
: Imperative / Declarative

Imperative finally block

수집 시에 try/catch 에 추가적으로 수집 종료 시 실행할 코드를 finally 블록을 통해 정의할 수 있습니다.

출력 결과

1 
2
3
Done

Declarative handling

선언적인 접근으로는 플로우에 onCompletion 중간 연산자를 추가해서 플로우가 완전히 수집되었을 때 실행 될 로직을 정의할 수 있습니다.

이전 예제는 onCompletion 연산자를 이용하여 다음과 같이 다시 작성할 수 있고 이는 동일한 출력 결과를 보입니다.

onCompletion 을 사용 함으로써 얻을 수 있는 최대 이점은 람다에 nullable 로 정의되는 Throwable 파라미터를 통해 플로우 수집이 성공적으로 종료되었는지 혹은 예외가 발생 되었는지 알 수 있다는 것입니다. 다음 예제는 foo() 플로우가 숫자 1을 방출한 후 예외를 던집니다.

출력 결과는 예상한 것과 동일합니다.

1 
Flow completed exceptionally
Caught exception

onCompletion 연산자는 catch 와는 달리 예외를 처리하지는 않습니다. 위 예제 코드에서 볼 수 있듯이 예외는 여전히 다운 스트림으로 전달 됩니다. 결국 예외는 onCompletion 연산자를 거쳐 catch 연산자로 처리됩니다.

업 스트림 예외에 국한됨 (Upstream exceptions only)

catch 연산자와 동일하게 onCompletion 연산자도 업 스트림에서 전달되는 예외만 식별하고 처리할 수 있으며 다운 스트림의 예외는 알지 못합니다. 다음 코드를 실행해 봅시다.

출력 결과

1 Flow completed with null Exception in thread "main" java.lang.IllegalStateException: Collected 2
at FileKt$main$1$invokeSuspend$$inlined$collect$1.emit (Collect.kt:136)
at kotlinx.coroutines.flow.FlowKt__ErrorsKt$catchImpl$$inlined$collect$1.emit (Collect.kt:137)
at kotlinx.coroutines.flow.FlowKt__BuildersKt$asFlow$$inlined$unsafeFlow$9.collect (SafeCollector.kt:123)
at kotlinx.coroutines.flow.FlowKt__ErrorsKt.catchImpl (Errors.kt:232)
at kotlinx.coroutines.flow.FlowKt.catchImpl (:1)
at kotlinx.coroutines.flow.FlowKt__EmittersKt$onCompletion$$inlined$unsafeFlow$1.collect (SafeCollector.kt:123)
at FileKt$main$1.invokeSuspend (File.kt:14)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith (ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run (Dispatched.kt:241)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent (EventLoop.common.kt:270)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking (Builders.kt:79)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking (Builders.kt:54)
at kotlinx.coroutines.BuildersKt.runBlocking (:1)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default (Builders.kt:36)
at kotlinx.coroutines.BuildersKt.runBlocking$default (:1)
at FileKt.main (File.kt:6)
at FileKt.main (File.kt:-1)

우리는 이 결과를 통해 completion 의 cause 가 null 이지만 수집은 예외로 인해 실패 했음을 알 수 있습니다.

Imperative versus declarative

이제 우리는 플로우를 수집하는 방법을 알고 그 종료와 예외를 명령적(Imperative) 혹은 선언적(declarative) 방식으로 다룰 수 있습니다. 여기서 나올 수 있는 질문 중 하나는 “어떠한 방식이 더 선호되며 그 이유는 무엇일까?” 입니다. 라이브러리로써 우리는 특정 접근이 더 낫다고 말할 수 없으며 두 가지 방식 모두 유효하기 때문에 여러분이 선호하는 코딩 스타일에 따라 선택해도 좋다고 생각합니다.

플로우 실행 (Launching flow)

어떤 소스로부터 발생하는 비동기 이벤트는 플로우를 통해 쉽게 표현할 수 있습니다. 이러한 경우를 위해서 우리는 일반적으로 들어오는 이벤트들에 대응하는 처리 코드를 addEventListener 를 통해 등록하고 이후 필요한 일을 진행해 가는 방식을 사용하곤 합니다.

플로우 에서는onEach 연산자가 이 역할을 담당 합니다. 하지만 onEach 는 중간 연산자입니다. 우리는 플로우 수집을 시작 시키기 위해서 종단 연산자가 필요합니다. 그렇지 않고 단지 onEach 를 호출하는 것으로는 아무런 효과가 없습니다.

만일 우리가 onEach 연산자 이후에 collect 종단 연산자를 사용하면 그 이후 코드는 플로우가 수집될 때까지 대기하게 될 것입니다.

출력 결과

Event: 1 
Event: 2
Event: 3
Done

launchIn 종단 연산자가 이부분에서 유용하게 사용될 수 있습니다. collect 연산자를 launchIn 으로 변경함으로써 우리는 플로우의 수집을 다른 코루틴에서 수행할 수 있으며 이를 통해 이후에 작성된 코드들이 곧바로 실행되도록 할 수 있습니다.

출력 결과

Done Event: 1 Event: 2 Event: 3

launchIn 연산자에 꼭 필요한 파라미터는 플로우를 수집할 코루틴의 CoroutineScope 입니다. 위 예제에서 이 스코프는 runBlocking 코루틴 빌더로 부터 전달 되었습니다. 그로인해 플로우가 실행되는 동안 runBlocking 스코프는 그 자식 코루틴의 종료를 기다리게 되고 결국 메인 함수가 반환되어 프로그램이 종료되는 것을 방지합니다.

실제 애플리케이션에서 스코프는 제한된 생명 주기를 갖는 엔터티로부터 전달 될 수 있습니다. 이 엔터티의 생명 주기가 종료되면 그에 속한 스코프는 취소되며 이 스코프에 속한 플로우 또한 수집을 취소하게 됩니다. 이러한 방식으로 onEach { ... }.launchIn(scope)addEventListener 와 동일하게 수행 됩니다. 하지만 여기에는 차이점이 존재하는데 그것은 더이상 removeEventListener 가 필요하지 않다는 것 입니다. 취소와 구조화된 동시성이 이것을 대신 수행해 주기 때문입니다.

launchIn 도 역시 Job 을 반환한다는 것을 기억하세요. 우리는 이를 통해 전체 스코프를 취소하거나 특정 Job 에 대한 조인 구문을 사용 할 필요 없이 그에 속한 플로우 수집 코루틴을 취소할 수 있습니다.

플로우와 리액티브 스트림 (Flow and Reactive Streams)

Reactive Stream 이나 Reactor 나 RxJava 같은 Reactive Framework 에 친숙한 분들은 아마 플로우의 디자인이 아주 친숙해 보일 것 입니다.

사실 플로우의 디자인은 리액티브 스트림과 그 안의 다양한 구현에 영향을 받았습니다. 하지만 플로우의 주요 목표는 구조화된 동시성을 따르며 코틀린과 중단함수를 이용하여 가능한 한 단순화 된 디자인을 갖는 것입니다. 이 목표를 달성하는 것은 리액티브 개척자과 그들의 방대한 성과 없이는 불가능 했을 것입니다. 여러분은 그에 대한 전체 이야기를 여기에서 읽어볼 수 있습니다.

개념적으로 다르긴 하지만 플로우는 리액티브 스트림이며 여러분은 플로우를 리액티브 퍼블리셔로 변환하거나 그 반대로 변환할 수 있습니다. 그러한 변환기는 kotlinx.coroutines 에 의해 제공되며 그에 대응하는 리액티브 모듈에서 찾아볼 수 있습니다.

  • kotlin-coroutines-reactive : Reactive Streams
  • kotlin-coroutines-reactor : Project Reactor
  • kotlin-coroutines-rx2 : RxJava2

통합 모듈은 플로우와의 상호 변환과 리액터 컨텍스트와의 통합, 다양한 리액티브 엔터티들과 중단 함수 친화적인 방법들을 포함합니다.

--

--