Kotlin Flows: Exception transparency

Kotlin Flows: Exception transparency

Introduction

A flow is a stream of data that can be computed asynchronously. It’s useful when we need to emit multiple values sequentially. For example, when fetching data from the server, we can emit different results (Loading, Success, Error…) depending on the result of the network call.

A flow can be completed successfully or with an exception (error). Some exceptions are expected, such as reading data from the network and a failure can happen if something is wrong with the connection or the server. However, some exceptions are unexpected and should never happen in the correct code. How can we handle these exceptions appropriately? This question sparked my mind recently when I encountered an unusual crash report:

From the report, I know that I have a NullPointerException, which is of course an unexpected error and should be resolved. After de-obfuscating the stack trace, I know exactly which object is null and causes the crash.

But the interesting part for me is the line “Flow exception transparency is violated”, what does it mean by that? Why is it not just a simple NullPointerException?

Let's find out.

Exception transparency

Suppose that we are writing a simple application that fetches data from a server and displays it in UI. We can use flow like this:

sealed interface Result {
    data class Success(val data: Data) : Result
    data class Error(val exception: Throwable? = null) : Result
    object Loading : Result
}

fun dataFlows(): Flow<Result> = flow {
    emit(Result.Loading)
    try {
        val data = repository.fetchData()
        emit(Result.Success(data))
    } catch (e: Exception){
        emit(Result.Error(e))
    }
}

fun main(args: Array<String>) {
    runBlocking {
        dataFlows().collect { result ->
            when(result) {
                is Result.Loading -> showLoading()
                is Result.Success -> showData(result.data)
                is Result.Error -> showError(result.exception)
            }
        }
    }
}

fun showLoading() {
    println("### Loading...")
}

fun showData(data: Data) {
    println("### Result: ")
    println(data.value)
}

fun showError(e: Throwable?) {
    println("### Error: ")
    println(e)
}

In happy cases, we might have a result like this:

### Loading...

### Result:

Today weather:

Severe heat is expected in this area.

But what if the connection is unstable and the network request fails? The try-catch block helps us handle any errors that occur.

fun dataFlows(): Flow<Result> = flow {
    emit(Result.Loading)
    try {
        val data = repository.fetchData()
        emit(Result.Success(data))
    } catch (e: Exception){ 
        emit(Result.Error(e))
    }
}

class Repository() {
    suspend fun fetchData(): Data {
        // simulate request fail
        throw NetworkException()
    }
}

### Loading...

### Error:

NetworkException: Connection error. Please try again later.

So far so good. We have handled the network exception and displayed the proper UI. Using a try-catch block like this, all the exceptions that happen in the repository can be propagated to the UI.


But imagine what will happen if we have errors in the code that update the UI? With the try-catch block inside the flow, all exceptions in the collect block (downstream) will be caught as well. The consumer of the flow will not be notified about the errors.

Kotlin Flows are designed to allow modular reasoning about data streams. Every flow implementation has to ensure exception transparency - a downstream exception must always be propagated to the collector.

Let’s simulate that scenario.

fun main(args: Array<String>) {
    runBlocking {
        dataFlows().collect { result ->
            when(result) {
                is Result.Loading -> showLoading()
                is Result.Success -> showData(result.data)
                is Result.Error -> showError(result.exception)
            }
        }
    }
}

fun showData(data: Data) {
   // simulate error when updating UI
   throw NullPointerException()
}

What happened here is that the flow has caught a downstream exception when the consumer is processing the result of emit(Result.Success(data)). The flow then emits a different error to the consumer, which violates the exception transparency and can lead to unspecified behavior.

Fortunately, the log also tells us how to fix this, by using Flow.catch() operator instead.

Handling exceptions

We want to handle exceptions transparently and predictably. Any consumer throwing an exception can always catch it using try/catch. One simplified version to handle errors can be written like this:

public fun <T> Flow<T>.handleErrors(action: (Throwable) -> Unit): Flow<T> =
   flow {
      val exception = handleErrorsImpl(this)
       if (exception != null) action(exception)
   }

suspend fun <T> Flow<T>.handleErrorsImpl(collector: FlowCollector<T>): Throwable? {
   var fromDownstream: Throwable? = null

   try {
       collect {
           try {
               collector.emit(it)
           } catch (e: Throwable) {
               fromDownstream = e
               throw e
           }
       }
   } catch (e: Throwable) {
       if (e == fromDownstream) {
           throw e // If exception is from downstream, re-throw it.
       } else  {
           return e // Return exception from upstream
       }
   }

   return null
}

Notice we only catch exceptions in the flow completion and call a specified [action] with the caught exception. Any exception that comes from downstream will be re-thrown to the collector.

Here are the results of that code when we have errors from downstream and upstream. Now the errors are very clear and we know exactly where they occurred.

Upstream errors:

fun main(args: Array<String>) {
    runBlocking {
        dataFlows()
            .handleErrors {
                println(it.message)
            }
            .collect { result ->
                try {
                    when(result) {
                        is Result.Loading -> showLoading()
                        is Result.Success -> showData(result.data)
                        is Result.Error -> showError(result.exception)
                    }
                } catch (e: Exception) {
                    println("Error when updating UI: $e")
                }
        }
    }
}

fun dataFlows(): Flow<Result> = flow {
    emit(Result.Loading)
    val data = repository.fetchData()
    emit(Result.Success(data))
}

class Repository() {
    suspend fun fetchData(): Data {
        // simulate network error
        throw NetworkException()
    }
}

### Loading...

### Error:

NetworkException: Connection error. Please try again later.

Downstream errors:

fun main(args: Array<String>) {
    runBlocking {
        dataFlows()
            .handleErrors {
                println(it.message)
            }
            .collect { result ->
                try {
                    when(result) {
                        is Result.Loading -> showLoading()
                        is Result.Success -> showData(result.data)
                        is Result.Error -> showError(result.exception)
                    }
                } catch (e: Exception) {
                    println("Error when updating UI: $e")
                }
        }
    }
}

fun showData(data: Data) {
    // simulate error when display result in the UI
    throw NullPointerException()
}

### Loading...

Error when updating UI: java.lang.NullPointerException

But we don’t have to write all the logic to handle errors ourselves, Kotlin Flows provides several exception-handling operators that ensure exception transparency. In this case, we can use the Flow.catch() operator. The catch intermediate operator, honoring exception transparency, catches only upstream exceptions (that is an exception from all the operators above catch, but not below it). You can find the full source code of the catch operator here.

The final code in our example when using Flow.catch() is shown below.

fun main(args: Array<String>) {
    runBlocking {
        dataFlows()
            .collect { result ->
                try {
                    when(result) {
                        is Result.Loading -> showLoading()
                        is Result.Success -> showData(result.data)
                        is Result.Error -> showError(result.exception)
                    }
                } catch (e: Exception) {
                    println("Error when updating UI: $e")
                }
        }
    }
}

fun dataFlows(): Flow<Result> = flow {
    emit(Result.Loading)
    val data = repository.fetchData()
    emit(Result.Success(data))
}.catch {
    emit(Result.Error(it))
}

References

Exceptions in Kotlin Flows - Roman Elizarov

Exception transparency - kotlinlang.org/docs