Issue
I am rewriting very basic webflux application into kotlin coroutines for comparison purposes, meaning instead of reactor API, I am using suspend functions with Flows.
I am using webflux with postgres r2dbc running on the netty.
The code that is meant to be rewritten:
override fun run(vararg args: String) {
val sql = StreamUtils.copyToString(schema.inputStream, Charsets.UTF_8)
val insertProducts = Flux.range(0, 1000)
.map { Product(null, "product-$it", ThreadLocalRandom.current().nextDouble(10.0, 1000.0)) }
.collectList()
.flatMap { productRepository.saveAll(it).then() }
template.databaseClient.sql(sql)
.then()
.then(insertProducts)
.doFinally { logger.info { "Db data setup is finished." } }
.subscribe()
}
What I managed to implement:
override fun run(vararg args: String) {
val sqlInit = StreamUtils.copyToString(schema.inputStream, Charsets.UTF_8)
runBlocking(Dispatchers.IO) {
val products = mutableListOf<Product>()
for (i in 1..1000) {
Product(null, "product-$i", ThreadLocalRandom.current().nextDouble(10.0, 1000.0))
.also { products.add(it) }
}
val sqlExecutionFlow =
template.databaseClient.sql(sqlInit)
.then()
.asFlow()
val productsFlow = productRepository.saveAll(products) //CoroutineCrudRepository
sqlExecutionFlow.onCompletion { logger.info { "Database initialized." } }.collect()
productsFlow.onCompletion { logger.info { "Database filled with products." } }.collect()
}
}
It works, however I'd like to know if this is a good way how to do it.
It's a fire-and-forget job that should be triggered on the application startup.
Should I use runBlocking with IO dispatcher or without? Should I rather use GlobalScope for it?
I read it is in general discouraged to use, but this seems like a valid case.
Thanks for any input.
Solution
Should I use
runBlocking
with IO dispatcher or without?
Your code doesn't actually exercise the dispatcher at all, so you can safely remove that with no change in behavior. The dispatcher comes into action only when you launch more coroutines from runBlocking
. The coroutine that starts automatically to execute the block of code you pass to runBlocking
runs in the ad-hoc dispatcher it creates on the calling thread, which it occupies until completion.
Should I rather use
GlobalScope
for it?
By this I think you mean "use GlobalScope.launch
", and for your exercise I think that would be a step in the right direction.
If I compare your current solution to the original, I notice that the original is asynchronous in nature and override fun run()
completes immediately, before the work is done. On the other hand, your usage of runBlocking
results in synchronous operation.
If you replace runBlocking
with GlobalScope.launch
, you'll get the same asynchronous behavior.
One more point, which could be crucial in your phase of learning: coroutines do not need the declarative programming style to achieve their non-blocking behavior, and that is their main point. This gives you the freedom to write regular, procedural-looking code with if
and for
, and it will still be non-blocking — as long as you use suspendable API for IO.
This point relates to the initial one as well: as long as you use suspendable APIs, you never need Dispatchers.IO
, which is there only as a workaround for the situations where you use a blocking API.
Answered By - Marko Topolnik
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.