Parallel Map in Kotlin

written in collections, coroutines, kotlin, parallel

Ever wonder how to run map in parallel using coroutines? This is how you do it.

import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.runBlocking

//sampleStart
fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> = runBlocking {
    map { async { f(it) } }.map { it.await() }
}
//sampleEnd

Confused? Let’s unpack it.

First we have the function signature, which is pretty similar to the signature of the actual map extension function on Iterable. The only thing we added was the suspend keyword on the parameter, which lets us call suspend functions inside f (as we’re going to see in a moment).

Then we have runBlocking, which lets us bridge blocking code with the coroutine world. As the name suggests, it blocks the current thread until everything inside the block finishes executing. Which is exactly what we want.
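To see the blocking behaviour on its own, here is a minimal sketch. It assumes the same pre-1.0 kotlinx.coroutines.experimental package as the rest of the post, and blockingAnswer is a made-up name just for illustration.

```kotlin
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.runBlocking

// Hypothetical helper, not part of pmap: the calling thread is blocked
// until the suspending work inside runBlocking has finished.
fun blockingAnswer(): Int = runBlocking {
    delay(100) // suspends the coroutine; the caller stays blocked meanwhile
    42         // the last expression becomes runBlocking's return value
}

fun main(args: Array<String>) {
    println(blockingAnswer()) // runs only after the block above completes
}
```

The caller never sees a coroutine here, just a regular function that returns an Int. That is the same trick pmap uses to look like a plain map from the outside.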

Finally we have the actual execution, which is divided into two steps. The first step launches a new coroutine for each function application using async. This effectively wraps each result in a Deferred, so at this point we hold a list of Deferred values. In the second step we wait for all function applications to complete and unwrap each result with await (see footnote 1).
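If the one-liner is hard to read, the two steps can be pulled apart. This is only a sketch with the intermediate list named and typed. It assumes the same experimental package as above, and pmapExplicit is a hypothetical name used just here.

```kotlin
import kotlinx.coroutines.experimental.Deferred
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.runBlocking

// Same logic as pmap, written out step by step.
// pmapExplicit is a hypothetical name used only for this sketch.
fun <A, B> Iterable<A>.pmapExplicit(f: suspend (A) -> B): List<B> = runBlocking {
    // Step 1: start one coroutine per element; nothing is awaited yet.
    val deferreds: List<Deferred<B>> = map { element -> async { f(element) } }
    // Step 2: await the results in list order, so the output order
    // matches the input order regardless of which coroutine finishes first.
    deferreds.map { it.await() }
}

fun main(args: Array<String>) {
    println(listOf(1, 2, 3).pmapExplicit { it * 2 }) // prints [2, 4, 6]
}
```

Note that all coroutines are started before the first await, which is what lets them overlap. Awaiting inside the first map would start and finish them one at a time.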

How to use it

Easy! Just like you use map:

import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.runBlocking

fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> = runBlocking {
    map { async { f(it) } }.map { it.await() }
}

//sampleStart
fun main(args: Array<String>) {
    println((1..100).pmap { it * 2 })
}
//sampleEnd

(Psst! I’m using Kotlin Playground so you can actually run this code!)

Prove that it’s running in parallel

Ok so let’s resort to the good old delay to prove that this is actually running in parallel. We are going to add a delay of 1 second on each multiplication and measure the time it takes to run.

With 100 elements, the total time should be close to 1,000 milliseconds if it’s running in parallel and close to 100,000 milliseconds if it’s running sequentially.

import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.runBlocking
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.experimental.delay

fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> = runBlocking {
    map { async { f(it) } }.map { it.await() }
}

//sampleStart
fun main(args: Array<String>) {
    val time = measureTimeMillis {
        val output = (1..100).pmap {
            delay(1000)
            it * 2
        }
        println(output)
    }
    println("Total time: $time")
}
//sampleEnd
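For a side-by-side comparison, here is a rough sketch that times a sequential counterpart against pmap. The smap function is hypothetical (it is not part of the post's code), and the numbers are scaled down to 10 elements at 100 milliseconds each so the sequential run stays short.

```kotlin
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.runBlocking
import kotlin.system.measureTimeMillis

fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> = runBlocking {
    map { async { f(it) } }.map { it.await() }
}

// Hypothetical sequential counterpart: each call to f finishes
// before the next one starts.
fun <A, B> Iterable<A>.smap(f: suspend (A) -> B): List<B> = runBlocking {
    map { f(it) }
}

fun main(args: Array<String>) {
    // Scaled down from the post: 10 elements, 100 ms delay each.
    val sequential = measureTimeMillis { (1..10).smap { delay(100); it * 2 } }
    val parallel = measureTimeMillis { (1..10).pmap { delay(100); it * 2 } }
    println("Sequential: $sequential ms") // expected around 10 x 100 ms
    println("Parallel: $parallel ms")     // expected around 100 ms plus overhead
}
```

The exact figures will vary from run to run, but the sequential total should grow with the number of elements while the parallel one should stay close to a single delay.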

  1. Since I’m not explicitly passing any CoroutineContext to async, the DefaultDispatcher will be used.
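The footnote describes the experimental package used in this post. As a sketch only, this is roughly how the same function could look on kotlinx.coroutines 1.x, which is an assumption since the post does not target it. There the package is kotlinx.coroutines (no .experimental), and async is an extension on CoroutineScope that inherits the context of the enclosing scope. Inside a plain runBlocking that means the calling thread's event loop, so the dispatcher is passed explicitly here.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

// Sketch for kotlinx.coroutines 1.x (an assumption; the post itself uses
// the older experimental package). Dispatchers.Default is passed explicitly
// so the coroutines run on the shared background thread pool.
fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> =
    runBlocking(Dispatchers.Default) {
        map { async { f(it) } }.map { it.await() }
    }

fun main() {
    println((1..5).pmap { it * 2 }) // prints [2, 4, 6, 8, 10]
}
```

Without the explicit dispatcher the 1.x version would still interleave suspending calls such as delay, but every coroutine would run on the one blocked thread instead of being spread across a pool.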

