Issue
I have a callback based API like this:
class CallbackApi {
    fun addListener(callback: Callback) {
        // todo
    }

    fun removeListener(callback: Callback) {
        // todo
    }

    interface Callback {
        fun onResult(result: Int)
    }
}
and an extension function which converts the API into a hot flow (edit: it is actually a cold flow, see the answer below):
fun CallbackApi.toFlow() = callbackFlow<Int> {
    val callback = object : CallbackApi.Callback {
        override fun onResult(result: Int) {
            trySendBlocking(result)
        }
    }
    addListener(callback)
    awaitClose { removeListener(callback) }
}
Would you mind suggesting how to write a unit test which ensures that the API is correctly converted to a hot flow?
Here is my attempt. By trial and error, I came up with this solution.
@Test
fun callbackFlowTest() = runBlocking {
    val callbackApi = mockk<CallbackApi>()
    val callbackSlot = slot<CallbackApi.Callback>()
    every { callbackApi.addListener(capture(callbackSlot)) } just Runs
    every { callbackApi.removeListener(any()) } just Runs
    val list = mutableListOf<Int>()
    val flow: Flow<Int> = callbackApi.toFlow().onEach { list.add(it) }
    val coroutineScope = CoroutineScope(this.coroutineContext + SupervisorJob())
    flow.launchIn(coroutineScope)
    yield()
    launch {
        callbackSlot.captured.onResult(10)
        callbackApi.removeListener(mockk()) // this was a misunderstanding
    }.join()
    assert(list.single() == 10)
}
But I don't understand two pieces of this solution.
1- In the absence of that SupervisorJob(), it appears that test will never end. Maybe collecting the flow never ends for some reason, which I don't understand. I'm feeding captured callback in a separate coroutine.
2- If I remove the launch body which callbackSlot.captured.onResult(10) is inside it, test will fail with this error UninitializedPropertyAccessException: lateinit property captured has not been initialized. I would think that yield should start the flow.
Solution
> and an extension function which converts the API into a hot flow
This extension looks correct, however the flow is not hot (nor should it be). It only registers a callback when an actual collection starts, and unregisters when the collector is cancelled (this includes when the collector uses terminal operators that limit the number of items, such as .first() or .take(n)).
This is quite an important note to keep in mind for your other questions.
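To make the cold behaviour concrete, here is a minimal sketch. The question leaves the `CallbackApi` methods as `// todo`, so the implementation below is a hypothetical stand-in: the `listeners` set and the `emit` helper are assumptions added purely for illustration. It checks that building the flow registers nothing, and that a limiting terminal operator such as `first()` unregisters the callback once it has its value.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical stand-in for the real API: it only tracks registered listeners.
class CallbackApi {
    val listeners = mutableSetOf<Callback>()

    fun addListener(callback: Callback) { listeners += callback }
    fun removeListener(callback: Callback) { listeners -= callback }

    // Illustration-only helper that simulates the API producing a result.
    fun emit(value: Int) = listeners.toList().forEach { it.onResult(value) }

    interface Callback {
        fun onResult(result: Int)
    }
}

// Same extension as in the question.
fun CallbackApi.toFlow() = callbackFlow<Int> {
    val callback = object : CallbackApi.Callback {
        override fun onResult(result: Int) {
            trySendBlocking(result)
        }
    }
    addListener(callback)
    awaitClose { removeListener(callback) }
}

fun main(): Unit = runBlocking {
    val api = CallbackApi()
    val flow = api.toFlow()

    // Cold: merely creating the flow has not registered any listener.
    check(api.listeners.isEmpty())

    launch {
        // Wait until a collector has registered its callback, then emit.
        while (api.listeners.isEmpty()) yield()
        api.emit(10)
    }

    // first() starts a collection, takes one value, then cancels the collector.
    check(flow.first() == 10)

    // Cancelling the collector ran the awaitClose block.
    check(api.listeners.isEmpty())
}
```

Each new collector would register its own callback in the same way, which is the usual sign of a cold flow.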
> In the absence of that SupervisorJob(), it appears that test will never end. Maybe collecting the flow never ends for some reason, which I don't understand
As mentioned above, due to how the flow is constructed (and how the CallbackApi works), the flow collection cannot end by decision of the producer (the callback API). It can only stop by cancellation of the collector, which will also unregister the corresponding callback (which is good).
The reason your custom job allows the test to end is probably that you're escaping structured concurrency: you replace the job in the context with a custom one that doesn't have the current job as its parent, so runBlocking no longer waits for the collection. However, you're likely still leaking that never-ending coroutine, because its scope is never cancelled.
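A small sketch of that effect, independent of flows and assuming only `kotlinx-coroutines-core`. A coroutine launched in a scope built with a fresh `SupervisorJob()` is not a child of `runBlocking`, so `runBlocking` does not wait for it, whereas a plain `launch` is a child and must finish or be cancelled before `runBlocking` can return. The variable names are illustrative.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancel
import kotlinx.coroutines.job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

fun main(): Unit = runBlocking {
    // Same construction as in the question: the new SupervisorJob replaces
    // runBlocking's job in the context and has no parent.
    val detachedScope = CoroutineScope(coroutineContext + SupervisorJob())
    val detached = detachedScope.launch { awaitCancellation() } // never completes by itself

    // A regular child coroutine, for comparison.
    val child = launch { awaitCancellation() }

    yield() // let both coroutines start

    val children = coroutineContext.job.children.toList()
    check(child in children)        // runBlocking would wait for this one
    check(detached !in children)    // runBlocking ignores this one
    check(detached.isActive)        // still running: leaked unless cancelled

    // Without this, runBlocking would never return.
    child.cancel()
    // Without this, the detached coroutine would simply be leaked.
    detachedScope.cancel()
}
```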
> I'm feeding captured callback in a separate coroutine.
And that's correct, although I don't understand why you call removeListener from this separate coroutine. What callback are you unregistering here? Note that this also cannot have any effect on the flow, because even if you could unregister the callback that was created in the callbackFlow builder, it wouldn't magically close the channel of the callbackFlow, so the flow wouldn't end anyway (which I'm assuming is what you tried to do here).
Also, unregistering the callback from outside would prevent you from checking it was actually unregistered by your production code.
> 2- If I remove the launch body which callbackSlot.captured.onResult(10) is inside it, test will fail with this error UninitializedPropertyAccessException: lateinit property captured has not been initialized. I would think that yield should start the flow.
yield() is quite brittle. If you use it, you must be very conscious about how the code of each concurrent coroutine is currently written. The reason it's brittle is that it will only yield the thread to other coroutines until the next suspension point. You can't predict which coroutine will be executed when yielding, neither can you predict which one the thread will resume after reaching the suspension point. If there are a couple suspensions, all bets are off. If there are other running coroutines, all bets are off too.
A better approach is to use kotlinx-coroutines-test which provides utilities like advanceUntilIdle which makes sure other coroutines are all done or waiting on a suspension point.
Now how to fix this test? I can't test anything right now, but I would probably approach it this way:
- use runTest from kotlinx-coroutines-test instead of runBlocking to control better when other coroutines run (and wait for instance for the flow collection to do something)
- start the flow collection in a coroutine (just launch/launchIn(this) without custom scope) and keep a handle to the launched Job (return value of launch/launchIn)
- call the captured callback with a value, advanceUntilIdle() to ensure the flow collector's coroutine can handle it, and then assert that the list got the element (note: since everything is single threaded and the callback is not suspending, this would deadlock if there was no buffer, but callbackFlow uses a default buffer so it should be fine)
- optional: repeat the above several times with different values and confirm they are collected by the flow
- cancel the collection job, advanceUntilIdle(), and then test that the callback was unregistered (I'm not a Mockk expert, but there should be something to check that removeListener was called)
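The steps above could be sketched roughly as follows. This is untested and rests on several assumptions: `kotlinx-coroutines-test` (1.6 or later, for `runTest`), MockK, and `kotlin.test` are on the test classpath; the `CallbackApi` and `toFlow()` definitions are copied from the question so the block stands alone; and the test class and method names are made up. MockK's `verify` is used for the final step.

```kotlin
import io.mockk.Runs
import io.mockk.every
import io.mockk.just
import io.mockk.mockk
import io.mockk.slot
import io.mockk.verify
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest

// Copied from the question so the sketch is self-contained.
class CallbackApi {
    fun addListener(callback: Callback) { /* todo */ }
    fun removeListener(callback: Callback) { /* todo */ }
    interface Callback { fun onResult(result: Int) }
}

fun CallbackApi.toFlow() = callbackFlow<Int> {
    val callback = object : CallbackApi.Callback {
        override fun onResult(result: Int) { trySendBlocking(result) }
    }
    addListener(callback)
    awaitClose { removeListener(callback) }
}

class CallbackFlowTest {
    @OptIn(ExperimentalCoroutinesApi::class) // advanceUntilIdle() is experimental
    @Test
    fun forwardsResultsAndUnregistersOnCancel() = runTest {
        val callbackApi = mockk<CallbackApi>()
        val callbackSlot = slot<CallbackApi.Callback>()
        every { callbackApi.addListener(capture(callbackSlot)) } just Runs
        every { callbackApi.removeListener(any()) } just Runs

        // Collect in a child of the test scope and keep the Job handle.
        val received = mutableListOf<Int>()
        val job = callbackApi.toFlow().onEach { received.add(it) }.launchIn(this)
        advanceUntilIdle() // lets the collector start and register its callback

        // Simulate the API producing a value, then let the collector handle it.
        callbackSlot.captured.onResult(10)
        advanceUntilIdle()
        assertEquals(listOf(10), received)

        // Cancelling the collector should unregister the very same callback.
        job.cancel()
        advanceUntilIdle()
        verify(exactly = 1) { callbackApi.removeListener(callbackSlot.captured) }
    }
}
```

Because the collection job is a child of the test scope, forgetting `job.cancel()` would keep `runTest` waiting until it times out, which is a useful signal compared with the silently leaked coroutine of the custom scope.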
Note: maybe I'm old school but if your CallbackApi is an interface (it's a class in your example, but I'm not sure to which extent it reflects the reality), I'd rather implement a mock manually using a channel to simulate events and assert expectations. I find it easier to reason about and to debug. Here is an example of what I mean
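A minimal sketch of that idea (not the original example), assuming `CallbackApi` can be turned into an interface and that `kotlinx-coroutines-test` and `kotlin.test` are available. `FakeCallbackApi`, its two channels, and the test names are hypothetical. The fake records registrations and unregistrations in channels, so the test can simply suspend on `receive()` until the production code has done its part, without `yield()` or `advanceUntilIdle()`.

```kotlin
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertSame
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest

// Assumption: the API is an interface here, unlike the class in the question.
interface CallbackApi {
    fun addListener(callback: Callback)
    fun removeListener(callback: Callback)
    interface Callback { fun onResult(result: Int) }
}

// Same body as the extension in the question.
fun CallbackApi.toFlow() = callbackFlow<Int> {
    val callback = object : CallbackApi.Callback {
        override fun onResult(result: Int) { trySendBlocking(result) }
    }
    addListener(callback)
    awaitClose { removeListener(callback) }
}

// Hand-written fake: every call is recorded as an event in a channel.
class FakeCallbackApi : CallbackApi {
    val registered = Channel<CallbackApi.Callback>(Channel.UNLIMITED)
    val unregistered = Channel<CallbackApi.Callback>(Channel.UNLIMITED)

    override fun addListener(callback: CallbackApi.Callback) {
        registered.trySend(callback)
    }

    override fun removeListener(callback: CallbackApi.Callback) {
        unregistered.trySend(callback)
    }
}

class FakeCallbackApiTest {
    @Test
    fun forwardsResultsAndUnregistersOnCancel() = runTest {
        val api = FakeCallbackApi()
        val results = Channel<Int>(Channel.UNLIMITED)
        val job = launch { api.toFlow().collect { results.send(it) } }

        // Suspends until the flow has registered its callback.
        val callback = api.registered.receive()

        callback.onResult(10)
        assertEquals(10, results.receive())

        // Cancelling the collector must unregister the same callback.
        job.cancel()
        assertSame(callback, api.unregistered.receive())
    }
}
```

If the production code never registered or unregistered the callback, the corresponding `receive()` would suspend until `runTest` times out, so a missing call shows up as a failing test rather than a passing one.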
Answered By - Joffrey