CompletableFuture – async and non-blocking callbacks

 

package com.bawi.completable.future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

public class MyCompletableFuture {
    private static final Logger LOGGER = LoggerFactory.getLogger(MyCompletableFuture.class);

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Thread.currentThread().getName() + ": started");
        Arrays.asList("a", "ab", "abc", "abcd")
            .stream()
            .forEach(text -> {
                CompletableFuture<String> stringCF = CompletableFuture.supplyAsync(() -> produceText(text));
                CompletableFuture<Integer> integerCF = stringCF.thenApply(MyCompletableFuture::calcStringLength);
                CompletableFuture<Double> doubleCF = integerCF.thenApply(MyCompletableFuture::calcCirclePerimeter);
                CompletableFuture<Void> voidCF = doubleCF.thenAccept(MyCompletableFuture::print);
                CompletableFuture<Void> voidCF2 = voidCF.thenRun(() -> System.out.println("DONE"));
                }
            );
        System.out.println(Thread.currentThread().getName() + ": created completable future, about to sleep");
        Thread.sleep(5000);
        System.out.println(Thread.currentThread().getName() + ": finished");
    }

    static String produceText(String text)  {
        LOGGER.info("[daemon={}] 1: Producing text: {}", Thread.currentThread().isDaemon(), text);
        sleepMillis(1000);
        return text;
    }

    static int calcStringLength(String text) {
        int length = text.length();
        LOGGER.info("[daemon={}] 2: Calculating string length: {}", Thread.currentThread().isDaemon(), length);
        sleepMillis(1000);
        return length;
    }

    static double calcCirclePerimeter(int r) {
        double perimeter = 2 * Math.PI * r;
        LOGGER.info("[daemon={}] 3: Calculating circle perimeter: {}", Thread.currentThread().isDaemon(), perimeter);
        sleepMillis(1000);
        return perimeter;
    }

    static void print(Double d) {
        LOGGER.info("[daemon={}] 4: Printing: {}", Thread.currentThread().isDaemon(), d);
        System.out.println("d=" + d);
    }

    private static void sleepMillis(int millis) {
        try {
            Thread.sleep(millis);
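        } catch (InterruptedException e) {
            // restore the interrupt flag so callers can still see the interruption
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }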
    }
}

Output (filtered) – note 4 things:
1) the first CompletableFuture is created here via the supplyAsync method, which runs the supplier on a thread pool: either the implicit ForkJoinPool.commonPool() (if no executor is specified, as above) or an explicitly passed executor
2) the CompletableFuture creation and execution is async and non-blocking (the main thread immediately moves on)
3) as soon as the CompletableFuture is created, the supplied task starts executing (worker-1 started producing text before the main thread started to sleep)
4) if the subsequent methods, e.g. thenApply, are the non-async variants, then the callback typically runs on the thread that completed the previous stage (here the same thread, worker-1, executed all the steps); if the stage is already complete when the callback is registered, the callback runs on the registering thread instead
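
Point 1 mentions that supplyAsync can also take an explicit executor. A minimal sketch of that overload follows; the class name, the single-thread pool and the thread name "my-pool-thread" are made up for illustration and are not part of the example above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExplicitExecutorSketch {

    // Runs a supplier on a custom pool and returns the name of the thread that ran it.
    static String supplyOnCustomPool() {
        // Hypothetical pool: a single daemon thread with a recognizable name.
        ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "my-pool-thread");
            thread.setDaemon(true);
            return thread;
        });
        try {
            // Second argument: the executor to use instead of ForkJoinPool.commonPool().
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), executor)
                    .join(); // block only to hand the result back to the caller
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("supplier ran on: " + supplyOnCustomPool());
    }
}
```

With an explicit executor the thread name in the log would be the pool's own name rather than ForkJoinPool.commonPool-worker-N.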

main: started
19:13:56.653 [ForkJoinPool.commonPool-worker-1] INFO [daemon=true] 1: Producing text: a
main: created completable future, about to sleep
19:13:57.664 [ForkJoinPool.commonPool-worker-1] INFO [daemon=true] 2: Calculating string length: 1
19:13:58.664 [ForkJoinPool.commonPool-worker-1] INFO [daemon=true] 3: Calculating circle perimeter: 6.283185307179586
19:13:59.667 [ForkJoinPool.commonPool-worker-1] INFO [daemon=true] 4: Printing: 6.283185307179586
main: finished
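
In the output above worker-1 ran every step because each stage was still running when the next non-async callback was attached. The other case can be sketched as well: when the future is already complete at registration time, a non-async callback runs on the registering thread. The class and method names below are hypothetical, and the behaviour shown is what the standard JDK implementation does in practice.

```java
import java.util.concurrent.CompletableFuture;

class CallerThreadSketch {

    // Registers a non-async callback on an already completed future and
    // returns the name of the thread that executed the callback.
    static String callbackThreadForCompletedFuture() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("a");
        // No worker thread is involved: the function runs right here, in the caller.
        return done.thenApply(text -> Thread.currentThread().getName()).join();
    }

    public static void main(String[] args) {
        System.out.println("caller:   " + Thread.currentThread().getName());
        System.out.println("callback: " + callbackThreadForCompletedFuture());
    }
}
```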

If we change all the methods to their async variants (thenApplyAsync, thenAcceptAsync and thenRunAsync), then there is no guarantee that the same thread will execute all subsequent processing for that future. Note below that worker-1 executed the first 2 tasks here and worker-2 the other 2 tasks:

main: started
19:04:37.022 [ForkJoinPool.commonPool-worker-1] [daemon=true] 1: Producing text: a
19:04:38.031 [ForkJoinPool.commonPool-worker-1] [daemon=true] 2: Calculating string length: 1
19:04:39.045 [ForkJoinPool.commonPool-worker-2] [daemon=true] 3: Calculating circle perimeter: 6.283185307179586
19:04:40.049 [ForkJoinPool.commonPool-worker-2] [daemon=true] 4: Printing: 6.283185307179586
DONE
main: finished
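
For reference, a compact sketch of the same pipeline shape written with the async variants. It is an illustration only: the class and method names are invented, the logging and sleeps are left out, and join() is used to wait for the result instead of Thread.sleep in main.

```java
import java.util.concurrent.CompletableFuture;

class AsyncChainSketch {

    // text -> length -> circle perimeter, each step submitted as a separate async task.
    static double perimeterOfTextLength(String text) {
        return CompletableFuture
                .supplyAsync(() -> text)               // step 1: produce the text
                .thenApplyAsync(String::length)        // step 2: may run on another worker
                .thenApplyAsync(r -> 2 * Math.PI * r)  // step 3: may run on yet another worker
                .join();                               // wait for the final value
    }

    public static void main(String[] args) {
        System.out.println("d=" + perimeterOfTextLength("a"));
    }
}
```

Which worker runs each step can differ from run to run, but the computed value does not depend on that.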

 
