Effective Coroutine Flow

A Safer way to collect flows from Android UIs

Myungpyo Shim
8 min read · Jul 30, 2021

From time to time when you develop an application, you need to handle a data stream, that is, a set of related data items that arrive over time. Examples include observing the continuously changing readings of a sensor, or receiving the chunks that make up a large file. In these cases, you can use one of the various stream APIs supported by the language itself or by third parties, such as RxJava/RxKotlin, which are ReactiveX implementations. In Kotlin you have another option: Flow, from the coroutines library.

This post is inspired by "A Safer way to collect flows from Android UIs" by Manuel Vivo. 👍

Flows (streams) come in two main types: cold flows and hot flows. This post explains how a cold flow can become inefficient when it is used carelessly, and suggests possible solutions, with a look at some of the internals along the way.

Cold Flow❄️ : Each collector gets its own stream every time it collects the flow.
Hot Flow🔥 : Collectors share one stream, and the stream can be started eagerly or lazily, depending on how the flow is configured.
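
Here is a minimal sketch of the cold case, assuming only kotlinx.coroutines is on the classpath. The names numbers, producerStarts, and collectTwice are made up for this example. Collecting the same flow twice runs the builder block twice:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Counts how many times the flow builder block has been started.
var producerStarts = 0

// A cold flow: the builder block runs from scratch for every collector.
fun numbers(): Flow<Int> = flow {
    producerStarts++
    for (i in 1..3) emit(i)
}

// Collects the same Flow instance twice and reports what each collector saw,
// plus how many times the builder block was started.
fun collectTwice(): Triple<List<Int>, List<Int>, Int> = runBlocking {
    producerStarts = 0
    val cold = numbers()
    val first = cold.toList()
    val second = cold.toList()
    Triple(first, second, producerStarts)
}
```

Calling collectTwice() returns ([1, 2, 3], [1, 2, 3], 2): both collectors receive the full sequence, and the producer block has started twice.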

If you have read my previous post, Coroutine Flow Internal 1, this post will be easier to follow.👍

We often apply various architecture patterns to our applications, or even create our own architecture, in order to loosen the coupling between UI and business logic. As a result, we end up with multiple layers between the domain and the UI. Data flows through these layers (we tend to prefer unidirectional data flow these days).

Some layers can act as a gateway for other layers, providing additional functionality such as authentication or thread control.
(But that is not the topic of this post 😅.)

You can see the data flow from the UI to the DataSource, and it’s a unidirectional round trip. As I mentioned earlier, we (you, I, and our coworkers) draw some boundaries along the way, such as the point where the context can be switched or the point where authentication should be checked.

In his post, Manuel Vivo also says that it is very natural to create a Flow in the data layer and offer it to the UI layer in our application architecture. In fact, many Jetpack components work this way when they need to provide a data stream to the UI layer.

In the picture above, the UI requests data from the data layer, and the data layer returns a Flow to the UI. The data layer starts sending its data to the UI layer only when the UI layer calls the collect function.

Now, let’s take a look at some edge cases where a cold flow can be inefficient, and find out how to improve the performance of our app.

When we talk about a Flow, we can separate it into two main parts: the Provider, which provides data through the stream, and the Collector, which collects data from the stream. By default, no buffer is involved when a collector collects from a cold flow, which means the provider and the collector each suspend until the other is ready (rendezvous). For this reason, there is no need to control a stream window, and no resources are wasted as long as the collector controls the collection timing appropriately.

In Android, we can collect from a flow in a coroutine created by one of the launchWhenXXX coroutine builders (e.g. launchWhenStarted, launchWhenResumed, …) so that the collector controls the collection timing. More specifically, in a coroutine bound to the onStart-onStop lifecycle, the collector does not start until the UI lifecycle reaches onStart, and it is paused once the UI lifecycle passes onStop. However, this is insufficient in some cases 😭.
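
As a rough sketch of this pattern (not the sample app's code), collection with launchWhenStarted could look like the following. The helper name collectWhenStarted and the parameters dustLevels and render are assumptions made for this example:

```kotlin
import androidx.fragment.app.Fragment
import androidx.lifecycle.lifecycleScope
import kotlinx.coroutines.flow.Flow

// Hypothetical helper: `dustLevels` and `render` stand in for your own flow and UI code.
// Call it from onViewCreated or later, when viewLifecycleOwner is available.
fun Fragment.collectWhenStarted(dustLevels: Flow<Int>, render: (Int) -> Unit) {
    viewLifecycleOwner.lifecycleScope.launchWhenStarted {
        // The block runs only while the view lifecycle is at least STARTED.
        // Below STARTED the coroutine is suspended, not cancelled.
        dustLevels.collect { level -> render(level) }
    }
}
```

The important detail is in the comment: the collecting coroutine is only suspended while the UI is stopped, so whatever sits upstream of it stays alive.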

When you use callbackFlow { }, which is one of the flow builders, or operators such as buffer, conflate, flowOn, and shareIn, the flow cannot react to the UI lifecycle. That’s because such a flow has a built-in❗️buffer❗️, which lets the provider keep emitting data until the buffer is full even though the collector is suspended.

Let’s take a look at the code snippet of the callbackFlow { } builder.

The code snippet above is the constructor of the callbackFlow { } builder. It receives the buffer capacity as a parameter and extends ChannelFlowBuilder, which means it uses a channel buffer internally.
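
To make the buffer visible from the outside, here is a small sketch that wraps a callback API with callbackFlow. DustSensor is a hypothetical class invented for this example, not an Android API, and readings and firstTwoReadings are made-up names. Values passed to trySend go into the channel buffer without suspending the caller:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical callback-based sensor API, invented for this example.
class DustSensor {
    private var listener: ((Int) -> Unit)? = null
    val isRegistered: Boolean get() = listener != null
    fun register(l: (Int) -> Unit) { listener = l }
    fun unregister() { listener = null }
    fun publish(value: Int) { listener?.invoke(value) }
}

// Adapts the callback API to a Flow. Each collector registers its own listener.
fun DustSensor.readings(): Flow<Int> = callbackFlow {
    register { value -> trySend(value) } // never suspends; the value goes to the channel
    awaitClose { unregister() }          // runs when the collector finishes or is cancelled
}

// Publishes two values and returns what the collector received,
// together with whether the listener is still registered afterwards.
fun firstTwoReadings(): Pair<List<Int>, Boolean> = runBlocking {
    val sensor = DustSensor()
    val collected = async { sensor.readings().take(2).toList() }
    while (!sensor.isRegistered) yield() // wait until the flow has registered its listener
    sensor.publish(10)
    sensor.publish(20)
    collected.await() to sensor.isRegistered
}
```

Calling firstTwoReadings() returns ([10, 20], false): both values reach the collector, and the listener is unregistered once collection ends.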

The flowOn operator changes the execution context of the upstream flow (mainly to change its dispatcher). When we apply flowOn to a flow in order to change the dispatcher, it creates a channel buffer internally.
We covered this in the previous post, “Coroutine Flow Internal 1”.
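
A short sketch of what flowOn does to the execution context, assuming a plain JVM program with kotlinx.coroutines. The function name threadsUsed is made up for this example. The upstream block runs on a Dispatchers.Default worker thread while the collector stays on the caller's thread:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

// Returns the thread names seen by the upstream block and by the collector.
fun threadsUsed(): Pair<String, String> = runBlocking {
    var upstreamThread = ""
    var downstreamThread = ""
    flow {
        upstreamThread = Thread.currentThread().name // a Dispatchers.Default worker
        emit(1)
    }
        .flowOn(Dispatchers.Default) // affects only the operators above it
        .collect { downstreamThread = Thread.currentThread().name } // the runBlocking thread
    upstreamThread to downstreamThread
}
```

Because the two sides now run in different coroutines, the values have to cross a channel, and that channel is the buffer this post is about.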

You can see the differences between buffered and unbuffered flow collection.

When the provider emits data, the collector handles it in its suspending collection block. When collection is too slow or is temporarily paused, the provider cannot emit any more data and is suspended until the collector is available again.
(The gray emission and collection flows won’t happen here.)
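
The unbuffered case can be sketched like this. It is a minimal example with a made-up function name, unbufferedLog, in which a slow collector holds back the provider:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Records the order of emissions and collections for a flow without a buffer.
fun unbufferedLog(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow {
        for (i in 1..3) {
            log += "emit $i"
            emit(i) // does not return until the collector has handled the value
        }
    }.collect { value ->
        delay(10) // a slow collector
        log += "collect $value"
    }
    log
}
```

unbufferedLog() returns [emit 1, collect 1, emit 2, collect 2, emit 3, collect 3]: the provider never gets ahead of the collector.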

However, if you use the aforementioned callbackFlow { } builder or one of the operators that has an internal buffer, the provider can emit its data even while the collectors of the stream are suspended, as in the picture above. This is because the provider emits its data into the buffer that sits between the provider and the collector.

This also means providers can keep emitting data even when Android UI components (such as an Activity, Fragment, or View) are paused or stopped.
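
For comparison, here is the same kind of sketch with a buffer operator between the provider and the collector. The function name bufferedLog and the capacity of 3 are choices made for this example:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Records the order of emissions and collections when a buffer sits in between.
fun bufferedLog(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow {
        for (i in 1..3) {
            log += "emit $i"
            emit(i) // returns as soon as the value is handed over or stored in the buffer
        }
    }
        .buffer(capacity = 3)
        .collect { value ->
            delay(10) // a slow collector
            log += "collect $value"
        }
    log
}
```

In the returned log, all three emissions appear before the first collection, because the provider keeps going while the collector is still busy.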

I’ve made an example to illustrate this scenario.
The example measures the fine dust level using a sensor.
Let’s take a look at MainFragment in the example.

In the code snippet above, I log lifecycle events in the onStart and onStop callbacks and start collecting the flow in the onViewCreated callback. The collection is bound to the lifecycle of the fragment. You can see that the call to observeAirQuality2() is commented out. The only difference between observeAirQuality1() and observeAirQuality2() is that observeAirQuality2() uses the flowOn operator to switch the execution thread of the upstream flow.
(📌 As you may know, using the flowOn operator makes the flow use an internal buffer.)

You can see the result of using observeAirQuality1() function below.

As you can see, the provider and the consumer take turns sending to and receiving from the stream. While the UI is in the stopped state, both the provider and the consumer are suspended and no data is transmitted.

You can see the result of using observeAirQuality2() function below.

In the logs above, you can see that the provider is still running after MainFragment has been stopped. When MainFragment returns to the started state, it collects all the pending data at once and waits for the next event.
(This is because the observeAirQuality2() function uses the flowOn operator, which creates an internal buffer.)
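
The two results can be imitated without Android in a small simulation. This is not the sample app's code: the gate resumeGate stands in for the lifecycle returning to the started state, and producedWhilePaused is a made-up name. The function counts how many values the provider produces while the collector is held at the gate:

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns how many values were produced while the collector was "paused".
fun producedWhilePaused(useFlowOn: Boolean): Int = runBlocking {
    val produced = AtomicInteger(0)
    val resumeGate = CompletableDeferred<Unit>() // completed when the "UI" resumes
    val source = flow {
        for (i in 1..5) {
            produced.incrementAndGet()
            emit(i)
        }
    }
    val upstream = if (useFlowOn) source.flowOn(Dispatchers.Default) else source
    val job = launch {
        // The collector waits at the gate, like a coroutine paused while the UI is stopped.
        upstream.collect { resumeGate.await() }
    }
    delay(200) // give the provider time to run while the collector is paused
    val snapshot = produced.get()
    resumeGate.complete(Unit)
    job.join()
    snapshot
}
```

Without flowOn only the first value is produced before the provider is suspended, so producedWhilePaused(false) gives 1. With flowOn all five values fit into the internal buffer while the collector is still paused, so producedWhilePaused(true) gives 5.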

To prevent this unintended resource consumption with cold flows, a helper function is being released through Lifecycle KTX.
(Currently it’s in alpha: lifecycle:lifecycle-runtime-ktx:2.4.0-alpha01)

When you collect from a cold flow, you can use the repeatOnLifecycle extension function so that the collector is created and destroyed as the UI lifecycle changes (it is not just paused, but destroyed).
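
A minimal usage sketch, assuming a Fragment and a lifecycle-runtime-ktx version that includes repeatOnLifecycle. The helper name collectWhileStarted and the parameters dustLevels and render are made up for this example:

```kotlin
import androidx.fragment.app.Fragment
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.lifecycleScope
import androidx.lifecycle.repeatOnLifecycle
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch

// Hypothetical helper: `dustLevels` and `render` stand in for your own flow and UI code.
// Call it from onViewCreated or later, when viewLifecycleOwner is available.
fun Fragment.collectWhileStarted(dustLevels: Flow<Int>, render: (Int) -> Unit) {
    viewLifecycleOwner.lifecycleScope.launch {
        // The block is launched each time the lifecycle reaches STARTED
        // and cancelled each time it drops below STARTED.
        viewLifecycleOwner.lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
            dustLevels.collect { level -> render(level) }
        }
        // Execution continues here only after the lifecycle is destroyed.
    }
}
```

Because the collecting block is cancelled rather than suspended, the upstream provider is cancelled with it, buffer or no buffer.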

Below are the resulting logs.

The main part of repeatOnLifecycle looks like this.

You can see that the code starts with suspendCancellableCoroutine { } at line 2, which means the caller of repeatOnLifecycle is suspended at this point.

Lines 3 to 4 determine when to execute the block and when to cancel it.

After that, it subscribes to lifecycle events so that it can execute or cancel the block. In addition, it resumes the current coroutine, which was suspended at line 2, when the onDestroy lifecycle callback is called.

In other words,

The coroutine suspended at line 2 is resumed at line 9 (some clean-up tasks), and then the function block finishes.

You can also use the flowWithLifecycle operator instead of repeatOnLifecycle, as shown below.
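
A usage sketch for the operator form, under the same assumptions as before (a Fragment, a library version that includes flowWithLifecycle, and made-up names collectWithLifecycle, dustLevels, and render):

```kotlin
import androidx.fragment.app.Fragment
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.flowWithLifecycle
import androidx.lifecycle.lifecycleScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach

// Hypothetical helper: `dustLevels` and `render` stand in for your own flow and UI code.
// Call it from onViewCreated or later, when viewLifecycleOwner is available.
fun Fragment.collectWithLifecycle(dustLevels: Flow<Int>, render: (Int) -> Unit) {
    dustLevels
        // Upstream collection is cancelled below STARTED and restarted at STARTED.
        .flowWithLifecycle(viewLifecycleOwner.lifecycle, Lifecycle.State.STARTED)
        .onEach { level -> render(level) }
        .launchIn(viewLifecycleOwner.lifecycleScope)
}
```

This form reads well when there is a single flow to collect. With several flows, one repeatOnLifecycle block that launches a coroutine per flow may be easier to follow.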

The result is the same as the previous one, and you can see its implementation details below.

You can see that the flowWithLifecycle operator internally uses repeatOnLifecycle wrapped in callbackFlow.

When you collect a flow within a repeatOnLifecycle block, the collect call is controlled by external lifecycle events. On the other hand, when you use the flowWithLifecycle operator on a flow, the upstream collection is controlled inside the operator itself. That is why the flowWithLifecycle operator uses callbackFlow internally.

As I mentioned earlier, repeatOnLifecycle is resumed at line 5 when the onDestroy lifecycle callback is called, and it finishes at line 10 after cleaning up unused resources.

Here is one more important thing you should know. Both repeatOnLifecycle and flowWithLifecycle are not paused but cancelled when they meet the ending lifecycle callback (e.g. onPause, onStop). This means that you get a fresh data stream and read data from the beginning when the lifecycle becomes active again.
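
The cancel-and-restart behaviour can be modelled without Android. The following is a toy model, not the library's implementation: a MutableStateFlow<Boolean> named started plays the role of the lifecycle, and collectLatest cancels the running block whenever the value changes. All names are made up for this example:

```kotlin
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Simulates start -> stop -> start and records what the collector observes.
fun restartLog(): List<String> = runBlocking {
    val started = MutableStateFlow(true) // stands in for the lifecycle state
    val events = mutableListOf<String>()
    // A cold source that emits two values and then stays active until cancelled.
    val source = flow {
        emit(1)
        emit(2)
        awaitCancellation()
    }
    val job = launch {
        started.collectLatest { isStarted ->
            if (isStarted) {
                source.collect { events.add("value $it") } // cancelled when `started` changes
            } else {
                events.add("stopped")
            }
        }
    }
    while (events.size < 2) yield()            // wait for the first two values
    started.value = false                      // "onStop": the collection is cancelled
    while (events.last() != "stopped") yield()
    started.value = true                       // "onStart": a new collection begins
    while (events.size < 5) yield()
    job.cancel()
    events.toList()
}
```

restartLog() returns [value 1, value 2, stopped, value 1, value 2]. After the restart the source begins again from its first value, which is exactly the behaviour described above.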

For this reason, they are quite appropriate when you observe a physical quantity such as luminance or coordinates, because you don’t need the data emitted while the lifecycle is in the paused state. If you want to collect all of the stream data regardless of the UI lifecycle, flowWithLifecycle and repeatOnLifecycle are not the right tools. In that case, you can simply use a launchWhenXXX function.
(This doesn’t mean that launchWhenXXX ignores the UI lifecycle. The stream data is held back during the paused state and delivered after the lifecycle resumes.)

If you build and run the sample application, you will see the UI below on its first screen. It has a progress bar that mimics downloading a system update binary. It uses flowWithLifecycle to download the binary. When you go to the device home screen and come back to the app, the download starts again from the beginning. This is obviously not what you want.
(Of course, as an Android developer, you should use a foreground service or WorkManager for this kind of task. This is just an example.)

So far, we’ve covered scenarios in which collecting a cold flow involves unnecessary resource consumption.

In conclusion, you don’t always have to use repeatOnLifecycle or flowWithLifecycle instead of launchWhenXXX. You can choose depending on the circumstances.

Just keep in mind that repeatOnLifecycle and flowWithLifecycle do not pause the collection when they leave the desired lifecycle state; they cancel it. This is the key point of this post. 📌

Thanks. The end.
