Deep dive into Coroutine Flow 2

Coroutine flow under the hood (hot flow)

Myungpyo Shim
8 min readJul 30, 2021
unsplash.com

[Korean] English

Table of contents

In the earlier posting (Deep dive into Coroutine Flow 1), I analyzed ❄️ cold flow ❄️ which is one of the Flows in coroutine. Cold flow doesn’t make any stream until it is collected. And it creates and starts new stream for each collector every time it is collected.

In this posting, I am going to analyze SharedFlow and Stateflow that are one of the 🔥 hot flows 🔥 in the coroutine world.

For the tips to understand how to use SharedFlow and StateFlow, see below links.

In the Below diagram that I’d used in the last posting, you can see classes that are burning on the right-bottom. Those are hot flows in which I want to deep dive today. More specifically, They are SharedFlow and StateFlow. 🔥
(Frozen ones are classes for cold flow and I’ve covered it in the last posting.)

First of all, All of the flows including cold & hot flows implement Flow interface which you can find in the top of the digram. But there is a difference between cold flow and hot flow. Hot flows implement both CancellableFlow and FusibleFlow interfaces while Cold flows only implement former one.

FusibleFlow

So, What is FusibleFlow? Following is the code of FusibleFlow interface.

FusibleFlow interface define a function which is named fuse(). According to dictionary, Fuse means joining or blending to form a single entity. Then, what is the meaning of fuse in this coroutine flow world? Due to the fact taht flow is handling data streams, fuse means combining two data streams into one.

When you deal with streams and related operators, you can use the technic called “Stream Fusion” to enhance the performance of whole stream chain. Stream chain is composed of many streams and operators and sometimes those operators would create unnecessary data model or switch its context. In this case, you can optimize the stream chain as applying the Stream Fusion.

On the comment of fuse function in the kotlin source code, it says this function is called from stream operators like flowOn and buffer.
Let’s check the code of flowOn operator.

As you can see in the above code, flowOn operator uses fuse functionality when the type of current flow is FusibleFlow.

the ChannelFlowOperatorImpl creation logic in the else block is used when the type of current flow is cold flow. ❄️

You could also notice that some kind of channel will be created when you use flowOn operator on the cold flow. However if the flow is hot 🔥, the channel will not be created as using Stream Fusion.

I’ve already covered that flowOn operator creates channel buffer internally in order to support context switching in the article about ColdFlow.

Now, Let’s see the fuse implementation of SharedFlow which is one of the implementation of FusibleFlows. More specifically, we are going to see the SharedFlowImpl class which implements SharedFlow.

When we call flowOn operator on the SharedFlow, this function is called and basically following parameters passed.

  • context =Which is passed in the flowOn(context) operator or EmptyCoroutineContext
  • capacity = Channel.OPTIONAL_CHANNEL
  • onBufferOverflow = BufferOverflow.SUSPEND

In the line number 6, when capicity is either RENDEZVOUS or OPTIONAL_CHANNEL and onBufferOverflow is BufferOverflow.SUSPEND, then just return current flow at it is. This is because the flow doesn’t need additional buffer in that case. So, there won’t be any unnecessary chennel creation. 👍

- RENDEZVOUS is the option for transferring data when Provider and Consumer are met. For instance, when a Provider wants to send(emit) data and there aren’t any Consumer, the Provider must wait for a customer and vice versa.
- OPTIONAL_CHANNEL is never used for channel creation but for the implementation of operators like flowOn. It means channel would be created if it is necessary.
- BufferOverflow.SUSPEND is the option for making Emitters wait (be suspended) until there are room for data transmission in the buffer. For example, when a Provider wants to send data to a buffer (capacity = 5) and the buffer is already full of data, the provider must wait until there are any room for the data.

In short, FusibleFlow optimize stream chain when the conditions are met as skipping unnecessary buffer creation so that it makes stream chain performant. As you can see in the class diagram, both SharedFlow and StateFlow are sort of FusibleFlow.

SharedFlow

You can see that StateFlow inherits SharedFrom in the top-right of the class diagram. This means that once you understand how SharedFlow works, you can understand StateFlow easily and all you need to do is checking minor details of SharedFlow.

SharedFlow : Collectors only collect data that are emitted right after its collection is started. You can use SharedFlow to declare actions that should be executed when some events are occurred after collection.
StateFlow : It has default value and it makes you can receive the value all the time. For this feature of StateFlow, you can use StateFlow when you define a state machine and use it to transfer the state to the collectors.

Well, what is the key difference between Cold Flow ❄️ and Hot Flow 🔥 (e.g. SharedFlow and StateFlow)? That is when the stream of flow will be created. Cold Flows create new data stream every time it is collected. On the other hand, Hot Flows have its stream already and various of emitters and collectors share the stream to transfer data each other.

What makes Hot Flow — more specifically SharedFlow — can share its stream among all the Emitters and Collectors ? That is because SharedFlows operate internal buffer mechanism like below.

Inside of SharedFlow

As you can see, SharedFlow has a buffer internally and this buffer can hold values that will be collected by collectors and emitters suspended. All the emitters suspended are inserted at the end of the buffer. In the picture above, the buffer size is 8 — means eight values can be inserted. You can also set the replay size of the buffer. The buffer in the picture has 3 as its replay cache size. When a new collector starts collecting from this buffer, it will receive value from 6.

Collectors

When a collector starts to collect data from this buffer, 1 dedicated slot is allocated to the collector and the slot is positioned in the Slot array. The Slot has only 2 properties — the position in the Slot array and the continuation to resume suspended collector. After taking the Slot, if there are values can be taken immediately, the collectors takes those values right away. If there aren’t any values available, the collector is suspended and save its continuation to the allocated Slot and it will be resumed when some values are added to the buffer.

A coroutine controls resume and pause of suspending function through the continuation.

In the above picture, there are 3 collectors in total and those have its own Slot and are collecting data from the buffer. The green collector is relatively slow and haven’t collect the value 1 yet and it makes the buffer cannot remove the value 1 from it. The buffer is full of values (8 values) and red and blue collectors have collected 7 values and 8 values respectively.

The aforementioned Slot is implemented like below.

As you can see, it has two properties — index on Slot array and continuation of bound collector.

Next code snippet is collect() function of SharedFlow.

I added numbered comments in order.

  1. allocate 1 slot to the collector
  2. check if there are value available or not
  3. when there aren’t any values available, suspend the collector
  4. when receive the value, transfer it to the collection block.

The collection logic of SharedFlow is composed of simple 4 steps like above.

Emitters

Emitters emit new values at the end of the buffer — but right before the suspended emitters. In the aforementioned scenario, 4 emitters are suspended since target buffer is full of values. These suspended emitters are added at the end of the buffer. Suspended emitters will be able to emit their values when green collector collects the value 1 successfully or is cancelled by exception. (Resuming suspended emitters will get values on first-come-first-serve basis.

Those emitters that — cannot emit their data because of lack of space in the buffer — insert to the buffer as the form of below.

The property cont is the continuation which is used to resume the suspended emitter and the property value is what emitter was going to send originally.

SharedFlows (or StateFlow) can make many-to-many relationship among Collectors and Emitters with this mechanism. In addition, SharedFlows can optimize operators which involve additional buffer as using the technic called ‘Stream Fusion” since it already has internal buffer.

StateFlow

StateFlow that — inherits from SharedFlow — has been designed to have only one state with all the functionality of SharedFlow.

SharedFlow interface declares replay cache in the above code snippet. StateFlow interface declares value: T on top of that. You can see that StateFlow updates its internal state when its value is set as calling updateState() function in the StateFlowImpl class. SharedFlow also overrides replayCache and makes it have only one value all the time.

When collectors call collect() function to StateFlow in order to get recent state , the code for the process like below.

  1. Get one dedicated slot from the Slot array
  2. If there is new state which is differ from the old state, transfer the new state to the Collection block.
  3. Check whether there is pending value or not.
  4. If there aren’t any pending values, set the cont property of the Slot as the continuation of the Collector and wait for the resuming signal.

You can find the sentence “ set the cont property of the Slot as the continuation of the Collector” on step 4. This is hard to understand with the knowledge of SharedFlow. In fact, the slot used by StateFlow is differ from the slot used by SharedFlow. The slot of StateFlow only has _state property and it keeps change the property as time goes by.

Let’s take a look the code snippet of StateFlowSLot briefly.

The _state property can have 4 states at a time and each of the states is meaning

  • null : Not be allocated to any Collector yet.
  • NONE : allocated to a Collector and not in the state of suspended and pending.
  • PENDING : in the pending state processing new value.
  • CancellableContinuationImpl : waiting for a new value being suspended.

Take a look at the awaitPending() funtion. The function make the collector be suspended and set the state of Slot as the continuation of the collector. This continuation will be resumed when there are any values can be collected.

This is internal mechanism of SharedFlow and StateFlow to share its value of state among many Collectors and Emitters.

The end. 🙂

--

--

No responses yet