코틀린 플로우 내부 살펴보기 2

Coroutine flow under the hood (hot flow)

Myungpyo Shim
12 min readJul 17, 2021
unsplash.com

Korean [English]

(목차로 돌아가기)

지난 포스팅(코틀린 플로우 내부 살펴보기 1)에서 코틀린 플로우 유형 중에서 기본 유형인 ❄️콜드❄️ 플로우의 내부를 살펴보았습니다. 콜드 플로우는 수집(collect)되기 전에는 스트림 데이터가 발생하지 않고, 매 구독마다 새로운 스트림이 생성/시작되어 데이터가 수집하는 쪽(Collector)으로 전달 됩니다.

이번 포스팅에서는 🔥핫 플로우🔥인 StateFlowSharedFlow 의 내부를 살펴보겠습니다.

StateFlow 와 SharedFlow 의 사용을 위한 팁들은 다음을 참고하세요.

지난 포스팅에서 사용한 아래와 같은 다이어그램에서 오른쪽에 불길이 치솟고 있는 클래스 들이 핫플로우, 그 중에서도 오늘의 주인공 SharedFlow 와 StateFlow 구현체들 입니다. (얼음으로 표시 된 콜드 플로우는 지난 포스팅에서 알아보았습니다.)

우선 당연히 콜드 플로우, 핫 플로우 모두 제일 위에 보이는 Flow 인터페이스를 구현하고 있습니다. 하지만 콜드 플로우 구현체가 CancellableFlow 인터페이스만 구현하는 것과 달리 핫 플로우 구현체는 FusibleFlow(??) 도 추가로 구현하고 있습니다.

FusibleFlow

FusibleFlow 란 무엇일까요? 다음은 FusibleFlow 인터페이스 코드 입니다.

FusibleFlow 라는 인터페이스는 fuse() 라는 함수를 정의하네요. Fuse 는 “융합, 결합하다”라는 뜻이 있습니다. 그럼 여기서는 무엇을 결합한다는 것 일까요? 우리는 지금 플로우(스트림) 에 대한 이야기를 하고 있으니 플로우의 결합일 것이라고 예상할 수 있겠죠? 😃

스트림과 스트림에 적용되는 다양한 연산자를 다루는 경우 Stream Fusion 이라는 기법이 있습니다. 스트림을 다룰 때 최 상위 스트림(Root) 에서부터 최 하위 스트림(Leaf) 사이에 많은 연산자(operator)들이 적용될 수 있는데, 이러한 연산자들이 적용되면서 불필요하게 동일한 데이터 모델을 생성하거나 또는 불필요한 스레드 전환 등이 일어나기도 합니다. 그래서 이러한 스트림 연산을 최적화하는 기법으로 Stream Fusion 이라는 것이 등장하게 되었습니다.

fuse 함수의 주석에는 이 함수는 flowOn, buffer 같은 스트림 오퍼레이터들에 의해서 호출 된다고 쓰여있습니다. 정말 그런지 flowOn 오퍼레이터 코드를 잠시 살펴볼까요?

flowOn 이 적용되는 플로우가 FusibleFlow 일 경우 정말 fuse 를 사용하고 있네요! 지금 우리가 알아 볼 SharedFlow, StateFlow 도 FusibleFlow 였으므로 flowOn 이 호출되면 fuse 가 사용되겠네요!

else 부분의 ChannelFlowOperatorImpl 을 생성하는 부분은 콜드 플로우일 경우 사용됩니다.

다시 말하면, 일반적인 Flow 에 flowOn 이 사용되면 뭔가가 추가로 생성되는 것이고, FusibleFlow 인 경우에는 최적화를 시킬 수 있다는 이야기가 되네요.

콜드 플로우에 대한 포스팅에서 flowOn 연산자 적용 시 내부적으로 채널 버퍼를 생성하여 컨텍스트(디스패칭 스레드) 전환을 지원 한다는 것을 알아보았습니다.

그러면 앞서 보여드린 다이어그램에서 FusibleFlow 구현체 중 하나인 SharedFlow, 정확히는 SharedFlowImpl 이 어떻게 fuse 를 구현하고 있는지 살펴봅시다.

우리가 SharedFlow 에 flowOn 을 호출하면 위 함수가 불리게 되고 기본적으로는 다음과 같은 파라미터로 전달됩니다.

  • context = flowOn(context) 에 전달 된 컨텍스트 또는 EmptyCoroutineContext
  • capacity = Channel.OPTIONAL_CHANNEL
  • onBufferOverflow = BufferOverflow.SUSPEND

6 번 라인을 보면 수용량(capicity)이 RENDEZVOUS 또는 OPTIONAL_CHANNEL 이고 onBufferOverflow 가 BufferOverflow.SUSPEND 이면 현재 Flow 를 그대로 반환하도록 되어 있습니다. 이것은 위 조건일 경우 이 플로우에 부가적인 버퍼가 필요하지 않기 때문에 채널 생성이 불필요하기 때문 입니다.

- RENDEZVOUS 는 만남이란 뜻을 갖고 있는데 Provider 와 Consumer 가 만나야 데이터 교환이 이루어지는 옵션 입니다. 예를들어 Provider 가 데이터를 전달(emit) 하는데 Consumer 가 없다면 전달을 대기하게되고, 반대로 Consumer 가 데이터를 수신하려는데 Provider 가 없다면 역시 대기하게 됩니다.
- OPTIONAL_CHANNEL 은 Channel 생성시에는 사용되지 않으며 내부적으로 flowOn 등의 연산자 구현에서 사용되며, 필요한 경우 채널을 생성할 수 있지만 필수는 아님을 의미합니다.
- BufferOverflow.SUSPEND 는 버퍼가 가득찬 상태로 Provider 가 추가 데이터를 전달하려는 경우 (ex> capacity = 5 인데 5개의 데이터가 전달되고 아직 수신되지 않았으며 6번째 데이터를 전달하는 상황)에 Provider 는 버퍼에 여유공간이 생길때 까지 대기(Suspend)하게 됩니다.

이렇듯 FusibleFlow 는 추가 버퍼가 필요한 오퍼레이터에 대해서 조건이 맞는다면 추가 버퍼를 생성하지 않음으로써 불필요한 리소스 낭비를 줄여줍니다. 다이어그램에서 볼 수 있듯 SharedFlow 와 StateFlow 는 모두 FusibleFlow 입니다.

SharedFlow

다이어그램의 오른쪽 부분에서 SharedFlow 와 StateFlow 의 관계를 살펴보면 StateFlow interface 가 SharedFlow interface 를 상속하고 있음을 알 수 있습니다. 이것은 우리가 SharedFlow 의 동작원리를 이해 하고나면 StateFlow 에 대한 것들은 StateFlow 만이 제공하는 추가 기능들에 대해서만 분석하면 된다는 것을 의미합니다.

SharedFlow : Collector 는 수집을 시작한 시점부터 전달 된 값들만 수집합니다. 수집 시점을 기준으로 발생한 이벤트들에 따른 동작을 정의할 때 사용합니다.
StateFlow : 기본 값이 있기 때문에 수집을 하면 항상 값을 전달 받습니다. 이러한 특성 때문에 스테이트 머신을 정의하고 상태 변화를 전파하기 위해 사용합니다.

자, 콜드 플로우와 비교해서 SharedFlow 나 StateFlow 같은 핫 플로우가 갖는 중요한 특징은 무엇일까요? 그것은 콜드 플로우는 수집(Collect)이 시작될 때마다 새로운 스트림이 생성되지만, 핫 플로우는 하나 이상의 Collector 와 Emitter 가 공유된 스트림을 통해 데이터를 교환할 수 있다는 점 입니다. 핫 플로우에서는, 더 정확히는 SharedFlow 에서는 이것이 어떻게 가능한 것일까요?

그것은 SharedFlow 가 내부적으로 다음과 같은 버퍼를 운용하고 있기 때문입니다.

Inside of SharedFlow

SharedFlow 는 내부적으로 버퍼를 갖습니다. 이 버퍼에는 플로우에 전달되는 값들과 값의 전달을 대기중인 Emitter 들이 들어가게 됩니다. 이 때, Emitter 들의 버퍼에 존재하는 값들의 가장 마지막에 추가됩니다. 위 그림에서 버퍼 사이즈(값이 저장될 수 있는 크기)는 8 입니다. 또한 버퍼에는 Replay size 를 지정할 수 있는데 현재 버퍼는 그 크기가 3으로 잡혀있고, 만약 새로운 Collector 가 현재 플로우에 값의 수집을 시작한다면 6 부터 값을 수신하게 됩니다.

Collectors

플로우에 값을 수집하기 위해 Collector 가 추가되면 Collector 는 SlotArray 에서 Slot 을 하나 할당 받고, Slot 에 Collector 코루틴의 Continuation 을 저장합니다. 바로 수신 가능한 값이 있다면 수신하지만 (Fast-path), 수신 가능한 값이 없다면 중단 상태로 들어갑니다. 이후에 버퍼에서 값이 수신되면 Continueation 이 재개되면서 값이 전달되게 됩니다.

코루틴은 Continuation 을 통해 중단 함수의 중단과 재개를 수행합니다.

위 그림에서는 총 3개의 Collector 가 Slot 을 할당 받아 데이터를 수집하고 있는데, 초록색 Collector 가 값의 수집이 느려 아직 데이터 1을 수신하지 못했기 때문에 버퍼에서 1을 제거할 수 없고 버퍼는 8까지 가득 찬 상태입니다. 빨간색 Collector 와 파란색 Collector 는 각각 7, 8 까지의 값을 수신하고 있습니다.

Slot 은 SlotArray 에서 Slot 의 위치와 대응되는 Collector 코루틴의 재개를 위한 Continuation 에 대한 속성을 갖는 다음과 단순한 클래스로 정의되어 있습니다.

SharedFlow 에 collect() 함수는 다음과 같이 구현되어 있습니다.

순서에 따라 주석에 숫자를 붙였습니다.

  1. Slot 을 할당받고
  2. 바로 수신 가능한 값이 있는지 버퍼를 확인하고
  3. 바로 수신 가능한 값이 없다면 중단(Suspend) 에 들어가고
  4. 수신 받은 값을 실제 수집 블록으로 전달

위와같이 4개의 단순한 과정으로 이루어져 있습니다.

Emitters

Emitter 들은 버퍼의 가장 마지막 값 뒤에 새로운 값을 전달 합니다. 위 상황에서는 새로운 Emitter 들이 등장하여 값을 전달하고자 하는데 버퍼가 가득차 있는 상태이기 때문에 4개의 Emitter 모두 버퍼의 가장 마지막에 중단 상태로 추가되어 있습니다. 만일 초록색 Collecotor 가 성공적으로 값 1을 수신하거나 예외 등으로 취소된다면 보라색 Emitter 부터 그 다음 값을 전달 할 기회를 얻게됩니다.

버퍼가 가득차 Emitter 들이 값이 전달할 수 없는 경우 다음과 같은 형태로 버퍼에 삽입되어 값의 전달이 가능할 때 까지 대기하게 됩니다.

cont 는 중단 된 Emitter 를 재개시키기 위한 Continuation 이고, value 는 Emitter 가 전달하려던 값 입니다.

이러한 내부 매커니즘이 있기에 SharedFlow (or StateFlow) 는 여러 Collector 들과 Emitter 들이 값을 공유할 수있게 되는 것 입니다. 그리고 이런 버퍼가 내부에 존재하기에 앞서 알아보았던 것 처럼 추가 버퍼를 생성하는 연산자들에 대해서는 Stream Fusion 을 지원함으로써 불필요한 리소스 낭비를 방지합니다.

StateFlow

StateFlow 는 SharedFlow 의 구현을 상속하면서도 하나의 상태 값만 갖도록 설계 되었습니다.

코드를 보면 SharedFlow Interface 는 replayCache 를 정의하고, StateFlow 는 추가적으로 value: T 를 정의하고 있음을 알 수 있습니다. StateFlowImpl 을 보면 StateFlow 가 정의한 value 에 값이 설정(set) 되면 updateState 함수를 이용해 내부 상태를 업데이트 한다는 것과 SharedFlow 가 정의한 replayCache 를 재정의하여 오직 현재 값만 replayCache 로 사용하도록 하고 있다는 것을 알 수 있습니다.

StateFlow 에 Collector 가 수집하기 위해서 collect 함수를 호출하면 다음과 같이 최신 상태를 수신하게 됩니다.

  1. 먼저 Slot 을 할당 받고
  2. 기존 상태와 다른 새로운 상태가 존재한다면 Collection block 으로 전달
  3. 할당 예정인 새로운 값(Pending) 이 있는지 확인하고
  4. 없다면 Slot 의 상태를 Collector 의 Continuation 으로 만들어 중단하며 재개 시그널을 기다림

과 같이 동작합니다. 4번 과정에서 Slot 의 상태를 Collector 의 Continuation 으로 변경하다는 말이 있는데 SharedFlow 의 Slot 을 생각해보면 직관적으로 이해가 되지 않는 것 같습니다. 사실 StateFlow 의 Slot 은 SharedFlow 가 사용하는 Slot 과는 다르게 _state 라는 상태값 하나를 두고 여러가지 상태로 전환하며 사용하도록 되어 있기 때문에 설명도 조금 달라졌습니다.

StateFlowSlot 을 간략하게 살펴보면,

위 코드와 같이 _state 라는 상태가 4 가지 상태를 가질 수 있고 각각 다음을 의미합니다.

  • null : Collector 에 할당되지 않음
  • NONE : Collector 에 할당 되었으며, 중단되지 않았고 Pending 값도 없음
  • PENDING : 새 값을 처리하기 위해 PENDING 상태에 있음
  • CancellableContinuationImpl : 중단 상태로 새 값을 기다리는 중

가장 아래 awaitPending 함수를 보면 현재 상태가 NONE 이라면 Collector 를 중단시키고 해당 Continuation 을 Slot 의 상태로 만들어서 이후에 새 값이 전달되면 재개될 수 있도록 합니다.

지금까지 SharedFlow 와 StateFlow 의 내부 동작에 대해서 살펴보면서 어떻게 이벤트와 상태를 여러 Collector 들과 Emitter 들이 공유할 수 있는지 알아보았습니다.

끝. 🙂

--

--