Introduction - simple (and global) coroutine

FILE : Part1KoroutinesBasics.kt

Demo

Infrastructure :

We are going to use a simple object that simulates a data repository with blocking operations.

typealias Image = String
typealias ImageData = Int

object ImageRepoExample1 {

    fun process(data: ImageData): Image {
        displayThread("start generating image data for $data")
        Thread.sleep(1000)
        displayThread("image data generated $data")
        return "image$data"
    }

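    // Accepts one or more images, because the examples below save two images in one call.
    fun save(vararg images: Image) {
        val names = images.joinToString()
        displayThread("start saving image $names")
        Thread.sleep(1000)
        displayThread("image saved $names")
    }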

}
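
The displayThread helper used by the repository is not listed in this section. A minimal sketch of what it could look like is shown here; the output format is an assumption taken from the log lines in Example 4 ("thread : main : ..."), and formatThreadMessage is a hypothetical name introduced only for illustration.

```kotlin
// Hypothetical helper: the real displayThread is not shown in this section.
// The format mirrors the log lines from Example 4, e.g. "thread : main : main thread unblocked".
fun formatThreadMessage(message: String): String =
    "thread : ${Thread.currentThread().name} : $message"

// Prints the message together with the name of the thread that called it.
fun displayThread(message: String) {
    println(formatThreadMessage(message))
}

fun main() {
    displayThread("start code")
}
```

Logging the thread name this way makes it easy to see in the later examples which thread actually runs each piece of code.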

There is also a helper function that measures execution time in a very straightforward way:

fun withTimeMeasurement(title: String, isActive: Boolean = true, code: () -> Unit) {
    if (!isActive) return

    val timeStart = System.currentTimeMillis()
    displayThread("start code")
    code()
    displayThread("end code")
    val timeEnd = System.currentTimeMillis()

    println("operation in '$title' took ${timeEnd - timeStart} ms")
}

Example 1 :

This is standard blocking, sequential code. The three operations (the first two are independent of each other) take 3 seconds in total to execute.

For the whole time the main thread is blocked and cannot process other requests.

val img1=ImageRepoExample1.process(1)
val img2=ImageRepoExample1.process(2)
ImageRepoExample1.save(img1,img2)

Example 2 :

To avoid blocking the main thread, let's start our first coroutine in the global scope (GlobalScope). We will talk about contexts later.

 val job=GlobalScope.launch(Dispatchers.IO) {//TODO: remove dispatchers io and check what will happen
        withTimeMeasurement("Not blocking main thread") {
            val img1=ImageRepoExample1.process(1)
            val img2=ImageRepoExample1.process(2)
            ImageRepoExample1.save(img1,img2)
        }
    }



    runBlocking {
        job.join()
    }

This is an improvement: the main thread is no longer blocked and can handle other requests. However, all three operations are still executed sequentially and still take 3 seconds.

Example 3 :

This time we will use the async builder to run coroutines concurrently:

val img1: Deferred<Image> =GlobalScope.async{
            displayThread("async block1")
            ImageRepoExample1.process(1)
}

val job=GlobalScope.launch {
            ImageRepoExample1.save(img1.await(),img2.await())
}

This way the images are processed at the same time and the program finishes after 2 seconds.

This example also shows how to declare and use a custom thread pool as a dispatcher:

val workersDispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()

...
val img2 = GlobalScope.async {
    displayThread("async block2 start")
    withContext(workersDispatcher) {
        displayThread("async block2 switched context")
        ImageRepoExample1.process(2)
    }
}

Observe in the logs how easily you can switch the thread on which a coroutine executes.
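
The pieces of Example 3 can be put together into one self-contained sketch. It is an illustration under stated assumptions, not the code from the demo file: slowProcess is a hypothetical stand-in for ImageRepoExample1.process, and processInParallel is a name introduced here. It starts two async coroutines in GlobalScope, moves the second one to a custom pool with withContext, and measures the total time.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for the blocking ImageRepoExample1.process call.
fun slowProcess(data: Int): String {
    Thread.sleep(1000)
    return "image$data"
}

// Runs two blocking calls in parallel. Returns the results, the name of the
// thread used inside withContext, and the elapsed time in milliseconds.
fun processInParallel(): Triple<List<String>, String, Long> {
    val workersDispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
    try {
        var workerThread = ""
        var images = emptyList<String>()
        val elapsed = measureTimeMillis {
            // GlobalScope is used only to mirror the example above;
            // recent kotlinx.coroutines versions flag it as a delicate API.
            val img1 = GlobalScope.async(Dispatchers.IO) { slowProcess(1) }
            val img2 = GlobalScope.async {
                withContext(workersDispatcher) {
                    // This part runs on a thread from the custom pool.
                    workerThread = Thread.currentThread().name
                    slowProcess(2)
                }
            }
            images = runBlocking { listOf(img1.await(), img2.await()) }
        }
        return Triple(images, workerThread, elapsed)
    } finally {
        workersDispatcher.close() // shut the pool down so the JVM can exit
    }
}

fun main() {
    val (images, workerThread, elapsed) = processInParallel()
    println("images=$images, worker thread=$workerThread, took $elapsed ms")
}
```

Because both calls sleep at the same time, the measured time should be close to one second rather than two, and the reported worker thread should belong to the custom pool rather than to the default dispatcher.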

Example 4 :

Although we were able to execute our computation in parallel, the solution we created has a serious problem. It works as expected on the happy path, but in case of any error we can easily leak resources. For example, suppose we have two parallel tasks whose results are combined into the final result:

 val img1:Deferred<Image> = GlobalScope.async{
        println("job 1 fails")
        throw RuntimeException("error")
    }

    val img2=GlobalScope.async{
        ImageRepoExample1.process(2)
    }

    val job=GlobalScope.launch {
        ImageRepoExample1.save(img1.await(),img2.await())
    }

In that case there is no point in continuing one task when the other one has already failed. Unfortunately, in our sample code the second task keeps occupying a thread until it finishes, no matter whether its result will ever be used.

job 1 fails
thread : main : main thread unblocked
thread : DefaultDispatcher-worker-2 : start generating image data for 2
Exception in thread "DefaultDispatcher-worker-3" java.lang.RuntimeException: error
    at lodz.jug.kotlin.coroutines.Part1SimpleCoroutineKt$example4ResourceLeak$img1$1.invokeSuspend(Part1SimpleCoroutine.kt:98)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:241)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:594)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:740)
thread : main : main thread still unblocked
thread : DefaultDispatcher-worker-2 : image data generated 2

We will see how to solve this problem in the section about structured concurrency.

Example 5 :

This time let's try to cancel a coroutine by calling the cancel method. There are two critical lines in this example: Thread.sleep and delay. Both pause execution for 200 milliseconds, but how exactly do they differ?

val job=GlobalScope.launch(Dispatchers.IO) {
        repeat(100){iteration ->
            println("iteration number : $iteration")
            Thread.sleep(200)
//            delay(200)
        }
    }


    runBlocking {
        delay(900)
        job.cancelAndJoin()
    }

If you run the code with Thread.sleep active and delay commented out, you may be surprised that there is no cancellation at all: cancelAndJoin has to wait until the loop has run all 100 iterations (about 20 seconds). However, if you switch to delay, the coroutine is cancelled after about 4 loops.

It works with delay because it is a "framework method", a so-called suspending function, which doesn't block the current thread but returns control to the framework. The framework can then handle the cancellation signal and cancel the coroutine by throwing a CancellationException.

So coroutines won't cancel just like that. In the official docs you can find the term "cooperative cancellation", which means that both sides need to be prepared for such a signal.
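
Cooperation does not have to come from delay. As a hedged sketch (runCancellableLoop is a name made up for this illustration), blocking code can also cooperate by checking the isActive property of the coroutine scope between units of work:

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Runs a loop that still blocks with Thread.sleep, but checks isActive before
// each iteration. The loop is cancelled after about 900 ms.
// Returns the number of iterations that were completed.
fun runCancellableLoop(): Int {
    val completed = AtomicInteger(0)
    val job = GlobalScope.launch(Dispatchers.IO) {
        repeat(100) {
            if (!isActive) return@launch // cooperate: stop once the job is cancelled
            Thread.sleep(200)            // blocking call, not affected by job.cancel()
            completed.incrementAndGet()
        }
    }
    runBlocking {
        delay(900)
        job.cancelAndJoin() // waits for the iteration in progress, then returns
    }
    return completed.get()
}

fun main() {
    println("completed iterations: ${runCancellableLoop()}")
}
```

The iteration that is already sleeping when the cancellation arrives still runs to its end, so only a handful of the 100 iterations complete; the check happens only between iterations.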

Example 6 :

Time to look closely at how exactly threads are managed when suspending functions are called. We have a calculation that calls three functions directly (and a fourth one indirectly):

suspend fun blockingFunctionA():Int= withContext(Dispatchers.IO){
fun standardFunction():Int
suspend fun blockingFunC()=withContext(Dispatchers.IO){
suspend fun dependandFunD():Int = 4 + blockingFunC()

runBlocking {
        val result= blockingFunctionA() + standardFunction() + dependandFunD()
        println("result after all suspensions is : $result")
}

Critical for this example is standardFunction, which doesn't change the execution context and may be blocking.

If it is not blocking, then the main thread is not blocked and "waits in suspension" for the suspending functions.

Status Park means that the thread is not doing anything and is waiting for a task/job. Violet means that the thread is sleeping and cannot handle any other task/job.

https://stackoverflow.com/questions/7497793/understanding-java-lang-thread-state-waiting-parking
https://stackoverflow.com/questions/27406200/visualvm-thread-states

If standardFunction blocks, then the status is different.

But there is a more subtle problem with this code. Technically we don't know whether functions A, C, or D block! That suspending functions should not block is only a convention; nothing enforces it.

Example 7 :

To gain more intuition about the difference between blocking and suspension, let's look at the last example in this section.

Here we are using our own "one-thread-dispatcher":

 val singleThread = Executors.newFixedThreadPool(1).asCoroutineDispatcher()

Then we start multiple iterations in which we execute the 4 functions mentioned in the previous example.

    val jobs=(1 .. 10).map{iterationNumber ->
        Thread.sleep(200)
        GlobalScope.launch(singleThread) {
            println("starting iteration $iterationNumber")
            val result= standardFunction() + blockingFunctionA() + dependandFunD()
            println("iteration $iterationNumber : result after all suspensions is : $result")
        }
    }

    runBlocking {
        jobs.forEach{it.join()}
    }

And again, if standardFunction doesn't block, then our single thread can handle all 10 "requests", and each starting log appears as soon as its iteration is launched, without waiting for earlier iterations to finish:

starting iteration 1
starting iteration 2
starting iteration 3
starting iteration 4
starting iteration 5
starting iteration 6
starting iteration 7
starting iteration 8
starting iteration 9
starting iteration 10

However, add Thread.sleep to standardFunction and the iterations start slowly, one after another. Moreover, because our thread is blocked, none of them can finish until the thread is free again, which happens only after all 10 iterations have started.
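
The same effect can be reproduced in isolation. The sketch below is an illustration only (runOnSingleThread is a hypothetical helper, not part of the demo file): it launches several coroutines on a one-thread dispatcher and lets each of them wait 300 ms, either by blocking with Thread.sleep or by suspending with delay.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors
import kotlin.system.measureTimeMillis

// Launches `count` coroutines on a single thread. Each one waits 300 ms, either
// by blocking (Thread.sleep) or by suspending (delay).
// Returns the total time in milliseconds until all of them have finished.
fun runOnSingleThread(count: Int, blocking: Boolean): Long {
    val singleThread = Executors.newFixedThreadPool(1).asCoroutineDispatcher()
    try {
        return measureTimeMillis {
            val jobs = (1..count).map {
                GlobalScope.launch(singleThread) {
                    if (blocking) Thread.sleep(300) else delay(300)
                }
            }
            runBlocking {
                jobs.forEach { it.join() }
            }
        }
    } finally {
        singleThread.close() // shut the pool down so the JVM can exit
    }
}

fun main() {
    println("suspending: ${runOnSingleThread(5, blocking = false)} ms")
    println("blocking:   ${runOnSingleThread(5, blocking = true)} ms")
}
```

With delay the single thread is released during each wait, so the five waits overlap and the total stays close to one wait. With Thread.sleep the thread is held for each wait in turn, so the total is roughly five times longer.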

EXERCISES :

FILE : Exercise1SimpleCoroutineExercises

Exercise1

You have a Downloader object which can return an "Image result" for 3 predefined URLs. If a URL is not defined, it returns an empty result.

object Downloader {
        private val images = mapOf(
                "http://image1" to Image("img1"),
                "http://image2" to Image("img2"),
                "http://image3" to Image("img3")
        )


        fun download(url: String): Option<Image> {
            Thread.sleep(500)
            return images[url]?.let { Option.just(it) } ?: Option.empty()
        }
    }

The test expects that you will download all three images, plus a missing image for the unknown URL, within one second.

val result = CDownloaderExercise1.downloadAllInMax1Second()

        result.asIterable() shouldContainSame arrayOf(
                Image("img1"),
                Image("img2"),
                Image("img3"),
                MISSING_IMAGE
        )

        ...
 return runBlocking {
            withTimeout(1000L) {
                TODO("within 1 second perform all 4 downloads and return sequence of images")
            }

 }

Exercise 2

There is a Calculator object which can calculate the sum of all numbers from the range defined in the calculation task. This method blocks for 0.5 seconds.

 object Calculator {
        suspend fun calculateSumFor(t: CalculationTask1): BigInteger =
                withContext(TODO("choose proper context : remember that  ")) {
                    Thread.sleep(500)
                    ExerciseLogger.log("calculating task for : $t")
                    TODO("calculate sum from 't.from' to 't.to'")
                }
    }
}

You need to calculate the sums for all predefined ranges and then add them up to get the final result. You have two seconds for that.

val tasks = listOf(
                (1 to 5000),
                (5001 to 10000),
                (10001 to 20000),
                (20001 to 30000),
                (30001 to 40000),
                (40001 to 50000),
                (50001 to 60000)
        ).map { (from, to) -> CalculationTask1(BigInteger.valueOf(from.toLong()), BigInteger.valueOf(to.toLong())) }

        return withContext(TODO("choose proper context") ) {
            ExerciseLogger.log("starting main calculations")
            TODO("run all tasks and calculate sum")
        }
