Deep dive into Coroutine Flow 1

Coroutine flow under the hood (cold flow)

Myungpyo Shim
Jul 30, 2021

These days more and more Android projects are migrating their code base from Java to Kotlin, and along the way many of them are also moving their stream processing logic from RxJava/RxKotlin to Coroutine Flow. Coroutine Flow offers fewer built-in operators and less windowing functionality than the powerful Reactive Streams extensions, but for the same reason it is relatively lightweight. Moreover, Coroutine Flow lets you use suspending functions as operators in the stream chain, which makes your code easier to read. 🌟
(One more benefit of Coroutine Flow is that most Jetpack libraries support coroutines and Flow out of the box.)

You can learn the basics of Coroutine Flow on the official website.

In this post, we look into the internals of Flow. First, let’s look at the class diagram below to get an overview of the members of the Flow world.

The diagram above omits minor details and shows the relationships among the classes and interfaces in the coroutine flow package.

It may look complicated at first glance, but this time we are only going to look at the ❄️ Cold Flow part, which is marked with the ice icon. I will cover Hot Flow in a later post.

❄️ Cold Stream: Each collector gets its own stream, and the stream does not emit any data until it is collected.
🔥 Hot Stream: The stream can emit data without any collector, and two or more collectors can share the data on the stream.
(Of course, the stream can also be started lazily or replay previous data.)

See the Flow interface at the top center of the diagram. The Flow interface declares only one function, “collect()”. We can create a Flow by using the flow { } builder or by creating a MutableStateFlow or MutableSharedFlow directly. After that, when we call “collect()” on it with a FlowCollector<T> as the parameter, the flow starts and the emit() function of the FlowCollector is called for each value. This is how we receive data from a Flow.
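To make this contract concrete, here is a minimal sketch that passes an explicit FlowCollector to collect(). It assumes kotlinx-coroutines 1.6 or newer, where this member function is stable (in the 1.5.x releases that were current when this post was written, calling it directly required an opt-in). The names `numbers` and `collectExplicitly` are made up for illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical sample flow: emits 1, 2, 3 every time it is collected.
val numbers: Flow<Int> = flow<Int> {
    for (i in 1..3) emit(i)
}

// Hypothetical helper: collects with an explicit FlowCollector instead of a lambda.
suspend fun collectExplicitly(source: Flow<Int>): List<Int> {
    val received = mutableListOf<Int>()
    source.collect(object : FlowCollector<Int> {
        // Called once for every value the flow emits.
        override suspend fun emit(value: Int) {
            received.add(value)
        }
    })
    return received
}

fun main() = runBlocking {
    println(collectExplicitly(numbers)) // [1, 2, 3]
}
```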

However, I have never called collect() with a FlowCollector myself. This is because we usually use the inline function “collect()”, whose declaration you can find at the bottom of the code snippet above.
(Actually, it is a suspending inline extension function 😅.)

Look at this inline function. It creates a FlowCollector internally and calls the “action” lambda every time “emit” is called.

The “action” lambda is marked as crossinline to tell callers that they cannot do a “non-local return” inside their action lambda. In short, the inline function collect calls the action lambda inside an internally created FlowCollector, so this lambda block cannot contain a bare return statement.

Thanks to this inline function, we can collect data from this flow like below.
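A minimal sketch of such a call, assuming kotlinx-coroutines 1.6 or newer (there the lambda is converted to a FlowCollector directly, so no extra import is needed). The function name `collectedValues` is hypothetical.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: builds a cold flow of 1..5 and collects it with a lambda.
suspend fun collectedValues(): List<Int> {
    val numbers: Flow<Int> = flow<Int> {
        for (i in 1..5) emit(i)
    }
    val received = mutableListOf<Int>()
    // The lambda body runs once per emitted value.
    numbers.collect { value -> received.add(value) }
    return received
}

fun main() = runBlocking {
    println(collectedValues()) // [1, 2, 3, 4, 5]
}
```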

In the example above, I created the Flow with the flow { } builder, which is declared like below.

The flow { } builder takes a lambda block parameter that has no parameters and no return value, and has FlowCollector as its receiver type. In other words, this block is called in the context of a FlowCollector. Let’s look at the FlowCollector interface again. It has only one function, “emit()”. For this reason, we can create a flow with the flow { } builder and call the suspending emit function inside the lambda block.

This process is described in the picture below.

  1. When you pass a suspending lambda block that emits the values 1 to 5 to the flow { } builder, an object called SafeFlow is created, and it holds the suspending lambda block as a member property.
  2. When you call “collect { value -> … }” to collect data from the Flow, the collecting lambda block { value -> … } is wrapped in a FlowCollector (orange). It is then wrapped again in a SafeCollector object (purple), which is composed of two properties: the coroutine context of the collector and the FlowCollector.
  3. Finally, the suspending lambda block from step 1 is called to emit its values, with the SafeCollector created in step 2 as its receiver.
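The three steps can be modeled with a toy re-implementation. This is only a sketch: `MiniCollector`, `MiniFlow`, `MiniSafeFlow` and `miniFlow` are hypothetical stand-ins, not the library classes, and the context checks that the real SafeCollector performs are left out.

```kotlin
import kotlinx.coroutines.runBlocking

// Simplified stand-ins for FlowCollector and Flow (hypothetical names).
fun interface MiniCollector<T> {
    suspend fun emit(value: T)
}

interface MiniFlow<T> {
    suspend fun collect(collector: MiniCollector<T>)
}

// Step 1: the flow object only stores the emitting block; nothing runs yet.
class MiniSafeFlow<T>(
    private val block: suspend MiniCollector<T>.() -> Unit
) : MiniFlow<T> {
    // Step 3: the stored block runs with the collector as its receiver.
    override suspend fun collect(collector: MiniCollector<T>) {
        collector.block()
    }
}

fun <T> miniFlow(block: suspend MiniCollector<T>.() -> Unit): MiniFlow<T> =
    MiniSafeFlow(block)

fun main() = runBlocking {
    val numbers = miniFlow<Int> { for (i in 1..5) emit(i) }
    // Step 2: the lambda is wrapped in a MiniCollector (SAM conversion).
    numbers.collect { value -> println(value) }
}
```

Because the block is stored and only invoked inside collect(), every collector re-runs it from the start, which is what makes the flow cold.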

The following is the source code of SafeFlow.

The “block” property is the data-emitting function passed to the flow { } builder (step 1), and the “collector” parameter of the collectSafely function is the SafeCollector (step 2).

When the data emission block is executed, the emit calls (emit(1), …, emit(5)) send their values sequentially to the SafeCollector.
As a result, the SafeCollector relays the data from the emitter to the suspending lambda block that was passed to collect().

Now, let’s summarize the whole process of cold flow collection.

A Flow can be created anywhere, but collect { } must be called from a coroutine because it is a suspending function. When collect { } is called in a coroutine, that coroutine does not move past the call until the flow completes. In the meantime, the collecting lambda runs every time the flow block delivers a value through emit(1), emit(2), emit(3), and so on.

Well, it’s time to modify this Flow a little.

First off, let’s assume the data emission code is an I/O-bound task and make it run on a worker thread.

The flow { } builder emits the values 1 to 5 (the same as before), and the flowOn operator has been added right after it, so the upstream (data emission) uses the I/O dispatcher.

If you use the flowOn() operator in a flow chain, it affects only the upstream.
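Here is a small sketch of that setup. It assumes a plain JVM program, so runBlocking stands in for the Android main dispatcher, and `ioNumbers` is a hypothetical name. Each value carries the name of the thread that emitted it, so you can compare it with the collecting thread.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

// Emits each value together with the name of the thread that emitted it.
fun ioNumbers(): Flow<Pair<Int, String>> = flow<Pair<Int, String>> {
    for (i in 1..5) emit(i to Thread.currentThread().name)
}.flowOn(Dispatchers.IO) // only the block above moves to Dispatchers.IO

fun main() = runBlocking {
    ioNumbers().collect { (value, emitter) ->
        // The collector keeps running on the caller's thread.
        println("$value emitted on $emitter, collected on ${Thread.currentThread().name}")
    }
}
```

The emitting thread should be one of the shared worker threads, while the collecting thread stays the one that called runBlocking.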

Now, the Flow looks like the picture above. You can see that a ChannelCoroutine has been added between the Collector and the Flow. If you use the flowOn operator to change the dispatcher of the upstream, a ChannelCoroutine that has the specified dispatcher as its coroutine context element is created. When you call collect { } on the Flow, you receive data from this channel.

In other words, emit(1), emit(2), emit(3) are sent to the ChannelCoroutine on Dispatchers.IO, and the FlowCollector receives the data from the ChannelCoroutine on Dispatchers.Main
(the coroutine context switch happens at this point).

The flowOn operator detects the following cases and optimizes the chain so that no channel is needed. After the context passed to flowOn is combined with the context of the collector, the result is
- the same as the context of the collector
OR
- different from the context of the collector, but with the same dispatcher
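A hedged way to observe this optimization from the outside: if flowOn only adds a CoroutineName and leaves the dispatcher alone, the emitting block should stay on the collector's thread. This sketch assumes a plain JVM program, and `namedNumbers` is a hypothetical name.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

// Emits the thread that runs the emitting block, three times.
fun namedNumbers(): Flow<Thread> = flow<Thread> {
    repeat(3) { emit(Thread.currentThread()) }
}.flowOn(CoroutineName("emitter")) // adds a name, keeps the dispatcher

fun main() = runBlocking {
    namedNumbers().collect { emitterThread ->
        // Should print true: emitter and collector share one thread.
        println(emitterThread === Thread.currentThread())
    }
}
```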

Now, I’ve added an odd-number filter to the chain.
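A sketch of what the chain could look like, assuming the filter is placed above flowOn (the function name `oddNumbers` is hypothetical):

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

fun oddNumbers(): Flow<Int> = flow<Int> {
    for (i in 1..5) emit(i)
}
    .filter { it % 2 == 1 }  // sits above flowOn, so it also runs on Dispatchers.IO
    .flowOn(Dispatchers.IO)

fun main() = runBlocking {
    oddNumbers().collect { value -> println(value) }
}
```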

And the block diagram changes like below.

When collect() is called, the ChannelCoroutine created by the flowOn operator collects the upstream data on the specified dispatcher. As a result, both the filter operation of the FilterCollector and the emission of the Flow run on that same dispatcher. Below is the result.

1
3
5

Let’s convert each number to stars (*) by modifying the code like below.
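One possible version of the modified chain, again as a sketch with a hypothetical function name `starLines`:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

fun starLines(): Flow<String> = flow<Int> {
    for (i in 1..5) emit(i)
}
    .filter { it % 2 == 1 }    // keep 1, 3, 5
    .map { "*".repeat(it) }    // turn each number into that many stars
    .flowOn(Dispatchers.IO)

fun main() = runBlocking {
    starLines().collect { line -> println(line) }
}
```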

And you can see that a MapCollector has been added to the chain like below.

Below is the result.

*
***
*****

This is the internal operation of Coroutine Flow. When you add operators to the flow chain, one or more Flows (SafeFlow) may be added to the chain. After that, when you call collect() on the flow, the collect() calls cascade upward until they reach the root stream, and the data from the root stream is then passed down to the leaf stream.
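The cascade is easy to see in a hand-written operator. The sketch below is not the library's implementation of filter (that one uses internal helpers); `keepIf` is a hypothetical operator built only from the public flow { } builder to show the same idea.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator: returns a new flow that wraps the upstream flow.
fun <T> Flow<T>.keepIf(predicate: suspend (T) -> Boolean): Flow<T> = flow {
    // Collecting the returned flow first collects the upstream (this@keepIf)...
    collect { value ->
        // ...and each upstream value is passed downstream only if it matches.
        if (predicate(value)) emit(value)
    }
}

fun main() = runBlocking {
    println(flowOf(1, 2, 3, 4, 5).keepIf { it % 2 == 1 }.toList()) // [1, 3, 5]
}
```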

You can use the launchIn(CoroutineScope) operator instead of the collect() terminal operator, and use the onEach operator to consume the collected data, like below. This is not a new mechanism, just a helper function that launches a new coroutine in the given scope and collects the target flow in it.
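A minimal sketch of this style, where `collectInBackground` and `sink` are hypothetical names used only for the example:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Starts collecting in a new coroutine of the receiver scope and returns its Job.
fun CoroutineScope.collectInBackground(sink: MutableList<Int>): Job =
    flowOf(1, 2, 3)
        .onEach { value -> sink.add(value) } // handle each value here
        .launchIn(this)                      // launches a coroutine that collects the flow

fun main() = runBlocking {
    val sink = mutableListOf<Int>()
    collectInBackground(sink).join() // wait until the flow completes
    println(sink) // [1, 2, 3]
}
```

Unlike collect(), launchIn does not suspend the caller, so the returned Job is what you use to wait for or cancel the collection.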

Thanks. The end. 😀
