import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val sourceFlow: Flow<Int> = flowOf(1, 2, 3)
val transformedFlow: Flow<String> = sourceFlow.flatMapConcat { number ->
flow {
emit("Start $number")
delay(100)
emit("End $number")
}
}
transformedFlow.collect { value ->
println(value)
}
}
In this example, we start with a source Flow of integers (sourceFlow
). We use the flatMapConcat
operator to transform each emitted integer into a new Flow.
- Each integer generates a new Flow using the
flow {}
builder.
- Inside this nested Flow, two strings are emitted with a delay between them.
- Since
flatMapConcat
ensures sequential execution, the next integer’s Flow only starts after the previous one finishes.
Output:
Start 1
End 1
Start 2
End 2
Start 3
End 3
As you can see, the emitted values keep their original order, and all nested Flows are combined into one continuous stream.
The flatMapConcat
operator is helpful when you need to run asynchronous tasks for each item in a Flow while keeping the order of the results.
flatMapMerge – Merging Multiple Flows
The flatMapMerge
operator in Kotlin Flow combines multiple Flows into a single Flow, but unlike flatMapConcat
, it does not preserve the order of emissions. Each item from the source Flow is transformed into a new Flow, and all the elements from these nested Flows are merged together in parallel. The final output may have interleaved results, depending on the execution timing.
How It Works:
- Source Flow: The original Flow where
flatMapMerge
is applied.
- Transformation Function: A function that takes each item from the source Flow and returns a new Flow.
- Resulting Flow: A merged Flow that emits all values from the nested Flows. Since the nested Flows run in parallel, their order is not guaranteed.
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val sourceFlow: Flow<Int> = flowOf(1, 2, 3)
val transformedFlow: Flow<String> = sourceFlow.flatMapMerge { number ->
flow {
emit("Start $number")
delay(100)
emit("End $number")
}
}
transformedFlow.collect { value ->
println(value)
}
}
In this example, we start with a Flow of integers (sourceFlow
). We use the flatMapMerge
operator on sourceFlow
to create a new Flow for each emitted number. Each of these nested Flows emits two strings with a short delay.
When we collect and print the values from transformedFlow
, the output may look like this:
Start 1
Start 2
Start 3
End 1
End 2
End 3
As you can see, the elements from different nested Flows are mixed together and may not follow the original order. This happens because each Flow runs independently, and their results appear as they finish.
The flatMapMerge
operator is helpful when you need to process multiple items at the same time and combine the results into one Flow. It’s useful when order doesn’t matter, and you want to improve performance by running tasks in parallel.
flatMapLatest – Keeping Only the Latest Value
The flatMapLatest
operator in Kotlin Flow merges multiple Flows into a single Flow but ensures that only the latest emitted value is processed. When a new item arrives from the source Flow, any previous transformation is canceled, and only the latest item is processed.
How it works:
- Source Flow: The original Flow that emits values.
- Transformation Function: A function that takes an item from the source Flow and returns another Flow.
- Cancels Previous Transformations: If a new item arrives, any ongoing processing for previous items is stopped.
- Resulting Flow: Emits values only from the most recent transformation.
Here’s a simple example in code:
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val sourceFlow: Flow<Int> = flowOf(1, 2, 3)
val transformedFlow: Flow<String> = sourceFlow.flatMapLatest { number ->
flow {
emit("Start $number")
delay(100)
emit("End $number")
}
}
transformedFlow.collect { value ->
println(value)
}
}
In this example, we have a Flow of integers (sourceFlow
). We use the flatMapLatest
operator to create a new Flow for each emitted number. This new Flow generates two strings with a delay between them.
The key difference with flatMapLatest
is that when a new number is emitted, it cancels the ongoing process for the previous number. This ensures that only the latest number’s transformation continues.
When we collect and print the values from transformedFlow
, the output looks like this:
Start 1
Start 2
Start 3
End 3
As you can see, when item 3 is emitted, the transformations for items 1 and 2 are canceled. Only item 3’s transformation is processed and emitted.
The flatMapLatest
operator is helpful when you need to focus on the latest data and cancel any previous work when new data arrives. This is useful for keeping your processing in sync with the most recent updates.
take – Limit the Number of Emissions
The take
operator limits how many items a Flow emits. You provide a number that specifies the maximum items you want to receive. Once this limit is reached, the Flow stops emitting more items.
How It Works:
- Source Flow: The Flow you apply
take
on.
- Parameter: An integer that sets the maximum number of items to receive.
- Resulting Flow: A new Flow that emits only the first
n
items. After that, it stops.
Here’s an example in code:
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val sourceFlow: Flow<Int> = flowOf(1, 2, 3, 4, 5)
val takenFlow: Flow<Int> = sourceFlow.take(3)
takenFlow.collect { value ->
println(value)
}
}
In this example, we have a Flow of integers called sourceFlow
. We use the take
operator with a limit of 3, meaning we only want the first three items.
When we collect and print the values from takenFlow
, the output looks like this:
The take
operator helps you control the amount of data a Flow emits. It is useful when you only need the first few items or want to avoid processing too much data and using too many resources.
drop – Skipping Initial Flow Items
The drop
operator in Kotlin Flow lets you skip a set number of initial elements and emit only the remaining ones. This is useful when you want to ignore the first few items before processing the rest.
How It Works:
- Source Flow: The original Flow you apply
drop
to.
- Parameter: An integer specifying how many initial items to skip.
- Resulting Flow: A new Flow that emits everything except the first n items.
Here’s an example in code:
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val sourceFlow: Flow<Int> = flowOf(1, 2, 3, 4, 5)
val droppedFlow: Flow<Int> = sourceFlow.drop(2)
droppedFlow.collect { value ->
println(value)
}
}
In this example, we start with a Flow of integers (sourceFlow
). We use the drop
operator with a value of 2
, meaning we skip the first two items.
When we collect and print the values from droppedFlow
, the output looks like this:
As you can see, the drop
operator removes the first two items from the source Flow and only emits the rest.
This operator is helpful when you want to ignore initial data that isn’t needed. It lets you focus on the remaining elements without extra processing.
onEach – Performing Actions on Flow Items
The onEach
operator in Kotlin Flow lets you perform actions on each emitted item without changing its value. This is useful for logging, debugging, or monitoring data as it flows through.
How it Works:
- Source Flow: The original Flow where
onEach
is applied.
- Lambda Function: A function that runs for each item. It doesn’t change the item but performs a side effect (like printing or logging).
- Resulting Flow: A new Flow that emits the same values as the original, but also runs the provided function on each item.
Here’s a simple example in code:
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val sourceFlow: Flow<Int> = flowOf(1, 2, 3, 4, 5)
val modifiedFlow: Flow<Int> = sourceFlow.onEach { value ->
println("Element: $value")
}
modifiedFlow.collect { value ->
}
}
In this example, we have a Flow of integers (sourceFlow
). We use the onEach
operator to print each item as it passes through.
When we run the code, the output is:
Element: 1
Element: 2
Element: 3
Element: 4
Element: 5
The onEach
operator helps you observe or log data without changing it. It’s useful for debugging, logging, or monitoring while keeping the Flow’s original values intact.
catch – Handling Errors in Flows
The catch
operator helps handle errors that may occur in a Flow, allowing you to manage exceptions and recover gracefully.
How It Works:
- Source Flow: The Flow where you apply the
catch
operator.
- Lambda Function: A function that receives the exception and defines how to handle it.
- Resulting Flow: The Flow continues normally, but if an error occurs,
catch
intercepts it and executes the provided function. You can choose to log the error, emit a fallback value, or recover in another way.
Here’s an example in code:
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val sourceFlow: Flow<Int> = flow {
emit(1)
emit(2)
throw RuntimeException("Error occurred!")
emit(3)
emit(4)
}
val recoveredFlow: Flow<Int> = sourceFlow.catch { exception ->
println("Caught exception: ${exception.message}")
emit(5)
}
recoveredFlow.collect { value ->
println("Received: $value")
}
}
In this example, sourceFlow
emits integers but encounters an error during execution.
- We use the
catch
operator to handle the exception.
- When an error occurs, the lambda function prints an error message and emits a recovery value (
5
) to keep the Flow running.
Received: 1
Received: 2
Caught exception: Error occurred!
Received: 5
Instead of stopping due to an error, the Flow continues with a fallback value. You can customize the error handling based on your needs.
flowOn – Controlling Execution Thread
The flowOn
operator changes the thread or context where a Flow runs. This is useful when you want to move heavy or time-consuming tasks to a different thread.
How It Works:
- Source Flow: The Flow that emits data.
- Coroutine Dispatcher: A dispatcher (like
Dispatchers.IO
) that tells Flow which thread to use.
- Resulting Flow: A new Flow that runs its upstream operations (like
map
or filter
) on the specified thread. However, flowOn
does not change the thread of the collector (collect
).
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
fun main() = runBlocking {
val sourceFlow: Flow<Int> = flow {
for (i in 1..5) {
println("Emitting $i in thread ${Thread.currentThread().name}")
emit(i)
delay(100)
}
}
val transformedFlow: Flow<String> = sourceFlow.map { value ->
"Transformed $value in thread ${Thread.currentThread().name}"
}.flowOn(Dispatchers.IO)
withContext(Dispatchers.Default) {
transformedFlow.collect { value ->
println("Received: $value in thread ${Thread.currentThread().name}")
}
}
}
- We have a Flow that emits integers with delays.
- The
map
operator transforms each number into a string.
- We use
flowOn(Dispatchers.IO)
before map
, so the mapping runs on the IO thread. This helps handle heavy tasks efficiently.
- The
collect
operation runs in the Default dispatcher (using withContext
).
- This shows that
flowOn
only changes the execution thread for upstream operations (before collect
).
When we run this code, we see that:
- The
map
operation runs on the IO dispatcher.
- The
collect
operation runs on the Default dispatcher.
Emitting 1 in thread DefaultDispatcher-worker-2
Received: Transformed 1 in thread DefaultDispatcher-worker-2 in thread DefaultDispatcher-worker-1
Emitting 2 in thread DefaultDispatcher-worker-2
Received: Transformed 2 in thread DefaultDispatcher-worker-2 in thread DefaultDispatcher-worker-2
Emitting 3 in thread DefaultDispatcher-worker-2
Received: Transformed 3 in thread DefaultDispatcher-worker-2 in thread DefaultDispatcher-worker-2
Emitting 4 in thread DefaultDispatcher-worker-2
Received: Transformed 4 in thread DefaultDispatcher-worker-2 in thread DefaultDispatcher-worker-1
Emitting 5 in thread DefaultDispatcher-worker-1
Received: Transformed 5 in thread DefaultDispatcher-worker-1 in thread DefaultDispatcher-worker-1
The map
operation runs on the IO dispatcher (as set by flowOn
), while collect
runs on the Default dispatcher. This helps improve concurrency and makes better use of system resources in Flow-based code.
zip – Combining Flow Items
The zip
operator in Kotlin Flow combines elements from multiple Flows into pairs (or tuples). It waits for each Flow to emit an element and then pairs those elements together before emitting them.
How it Works:
- Input Flows: You provide two or more Flows to the
zip
operator.
- Resulting Flow: It emits pairs of elements, where each pair consists of one element from each input Flow. It only emits when all input Flows have emitted a value.
Here’s an example in code:
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val numbersFlow: Flow<Int> = flowOf(1, 2, 3, 4, 5)
val lettersFlow: Flow<String> = flowOf("A", "B", "C", "D", "E")
val zippedFlow: Flow<Pair<Int, String>> = numbersFlow.zip(lettersFlow) { number, letter ->
Pair(number, letter)
}
zippedFlow.collect { pair ->
println("Received: ${pair.first} - ${pair.second}")
}
}
In this example, we have two Flows: numbersFlow
, which emits integers, and lettersFlow
, which emits strings. We use the zip
operator to combine them into pairs, where each pair consists of one number and one letter.
When we collect and print the values from zippedFlow
, we get:
Received: 1 - A
Received: 2 - B
Received: 3 - C
Received: 4 - D
Received: 5 - E
The zip
operator ensures that elements are paired based on their emission order, and it only emits a pair when both Flows have produced a value.
When to Use zip
This operator is useful when you need to combine and process data from multiple Flows in sync, such as matching related data from different sources.
distinctUntilChanged – Remove Consecutive Duplicates
The distinctUntilChanged
operator in Kotlin Flow removes consecutive duplicate values from a Flow. It ensures that only unique, non-repeating values (in sequence) are emitted.
How It Works
- Source Flow: The original Flow emitting values.
- Resulting Flow: A new Flow that only emits a value if it is different from the previous one. If the same value appears consecutively, it gets filtered out.
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val sourceFlow: Flow<Int> = flow {
emit(1)
emit(2)
emit(2)
emit(3)
emit(3)
emit(3)
emit(4)
}
val distinctFlow: Flow<Int> = sourceFlow.distinctUntilChanged()
distinctFlow.collect { value ->
println("Received: $value")
}
}
In this example, sourceFlow
emits a sequence of integers. We apply the distinctUntilChanged
operator to remove consecutive duplicates. The output will be:
Received: 1
Received: 2
Received: 3
Received: 4
The distinctUntilChanged
operator filters out repeated values that appear consecutively (like extra 2
s and 3
s) and only emits the first occurrence of each unique value.
Why Use distinctUntilChanged
?
This operator is helpful when you want to ignore consecutive duplicates, such as preventing unnecessary updates in a UI or reducing redundant processing in a Flow.
retry – Handling Errors and Retrying in Flows
The retry
operator in Kotlin Flow helps recover from errors by retrying the Flow when an exception occurs.
How It Works:
- Source Flow: The Flow where you apply
retry
.
- Parameter: You can specify the number of retries or a condition for retrying.
- Resulting Flow: If an error happens,
retry
will restart the Flow based on the given conditions, either a set number of times or with custom logic.
Here’s an example in code:
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val sourceFlow: Flow<Int> = flow {
emit(1)
emit(2)
throw RuntimeException("Error occurred!")
emit(3)
}
val retriedFlow: Flow<Int> = sourceFlow.retry(2)
retriedFlow.collect { value ->
println("Received: $value")
}
}
In this example, sourceFlow
emits integers but throws an exception in the middle. We use the retry
operator with a value of 2
, meaning the Flow will retry up to two times if an error occurs. Output will be:
Received: 1
Received: 2
Received: 1
Received: 2
Received: 1
Received: 2
When an exception happens, retry
restarts the Flow up to two times. Each retry starts the Flow from the beginning.
You can also pass a lambda function to retry
to customize retry behavior, such as retrying only for certain exceptions or adding conditions. Here’s an example:
val retriedFlow: Flow<Int> = sourceFlow.retry { cause, _ ->
cause is MyCustomException
}
This lets you control when to retry the Flow based on specific errors.
The retry
operator is helpful for handling temporary errors and ensuring the Flow can recover and continue running when an exception happens.
conflate – Handling Data Efficiently in Flow
The conflate
operator helps when the producer (upstream) emits data faster than the consumer (downstream) can process it. It manages backpressure by skipping intermediate values and keeping only the latest one when the consumer is slow.
How conflate
works:
- It lets the consumer process items at its own speed.
- If a new item is emitted while the previous one is still being processed,
conflate
drops the older value and keeps only the most recent one.
Here’s an example:
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val flow = flow {
emit(1)
delay(100)
emit(2)
delay(100)
emit(3)
}
flow
.conflate()
.collect { value ->
delay(200)
println(value)
}
}
In this example, the upstream Flow emits values 1, 2, and 3 with delays. However, the downstream (consumer) processes them slowly due to the delay(200)
in the collect
block.
The output will be:
Without conflate
, all values would queue up, which could cause memory and performance issues. But with conflate
, only the latest value is kept, and intermediate values are dropped.
Conclusion
Flow operators in Kotlin help you work efficiently with asynchronous data streams. By learning these operators, you can manage data flows easily, making your code more responsive and easier to maintain. Whether you need to filter, transform, or combine data, Flow operators make handling asynchronous tasks simpler.
Leave a Reply