Multiple approaches to concurrent processing in Java

Goal: Let's assume we want to run two long-running operations, downloading a resource and then parsing it, for every resource in a list.

This post compares several approaches:
1) Thread(s)
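2) ExecutorService: submit a Callable and call get() on the returned Future, which blocks until the result is available
3) ExecutorCompletionService, an implementation of CompletionService: submit Callables and receive their Futures in completion order. take() waits for the next completed task, while poll() returns it or null if none has completed yet (poll(timeout, unit) waits up to the given time). The returned Future is already done, so its get() does not block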
4) Parallel streams
5) CompletableFuture

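import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestConcurrent {

    private static final Logger LOGGER = LoggerFactory.getLogger(TestConcurrent.class);

    public static void main(String[] args) {
        LOGGER.info("Started");
        long startMillis = System.currentTimeMillis();

        // For each resource: download it, parse it and aggregate the results into a list.
        // Plug in one of the approaches below here.
        List<Integer> results = ...

        long stopMillis = System.currentTimeMillis();
        LOGGER.info("Finished in {} seconds, results={}", (stopMillis - startMillis) / 1000, results);
    }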

Let's define the download and parse methods, simulating long-running work with a configurable TimeUnit.SECONDS.sleep(seconds):

static int download(int id, int seconds) {
    LOGGER.info("Downloading {} and sleeping {} second(s)", id, seconds);
    try {
        TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
        // restore the interrupt status instead of swallowing it
        Thread.currentThread().interrupt();
    }
    LOGGER.info("Downloaded {}", id);
    return id;
}

static int parse(int id, int seconds) {
    LOGGER.info("Parsing {} and sleeping {} second(s)", id, seconds);
    try {
        TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
        // restore the interrupt status instead of swallowing it
        Thread.currentThread().interrupt();
    }
    LOGGER.info("Parsed {}", id);
    return id;
}

To make the comparison between approaches more representative, let's use three resources, each of which must be downloaded before it can be parsed. Downloading takes 3 seconds for the 1st resource, 2 seconds for the 2nd and 1 second for the 3rd. Conversely, parsing takes 1 second for the 1st resource, 2 seconds for the 2nd and 3 seconds for the 3rd.
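These durations already tell us what each strategy can achieve at best. The sketch below (the class ExpectedDurations and its methods are illustrative names, not part of the original code) mirrors the post's timings, where downloading resource n sleeps numbers.size() - n + 1 seconds and parsing it sleeps n seconds, and computes three figures: a single thread doing everything, "download all, then parse all" with a barrier in between, and a per-resource pipeline where each parse starts as soon as its own download finishes.

```java
import java.util.stream.IntStream;

public class ExpectedDurations {

    static final int RESOURCES = 3;

    // Download time of resource n, as in the post: numbers.size() - n + 1
    static int downloadSeconds(int n) {
        return RESOURCES - n + 1;
    }

    // Parse time of resource n, as in the post: n
    static int parseSeconds(int n) {
        return n;
    }

    // One thread does everything: the sum of all downloads and all parses.
    static int sequential() {
        return IntStream.rangeClosed(1, RESOURCES)
            .map(n -> downloadSeconds(n) + parseSeconds(n))
            .sum();
    }

    // All downloads in parallel, then all parses in parallel:
    // the slowest download plus the slowest parse.
    static int downloadAllThenParseAll() {
        int slowestDownload = IntStream.rangeClosed(1, RESOURCES)
            .map(ExpectedDurations::downloadSeconds).max().getAsInt();
        int slowestParse = IntStream.rangeClosed(1, RESOURCES)
            .map(ExpectedDurations::parseSeconds).max().getAsInt();
        return slowestDownload + slowestParse;
    }

    // Every resource runs its own download -> parse chain, all chains in parallel:
    // the longest single chain.
    static int pipelined() {
        return IntStream.rangeClosed(1, RESOURCES)
            .map(n -> downloadSeconds(n) + parseSeconds(n))
            .max()
            .getAsInt();
    }

    public static void main(String[] args) {
        System.out.println("sequential=" + sequential()
            + "s, barrier=" + downloadAllThenParseAll()
            + "s, pipelined=" + pipelined() + "s");
    }
}
```

This prints sequential=12s, barrier=6s, pipelined=4s: (3+1) + (2+2) + (1+3) = 12, 3 + 3 = 6, and every chain takes exactly 4 seconds.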

Obviously we do not want to execute the tasks sequentially on a single thread, as that would take too long (3 + 2 + 1 seconds of downloading plus 1 + 2 + 3 seconds of parsing = 12 seconds):

private static List<Integer> mapSequentiallyWithOnlyMainThread() {
    List<Integer> numbers = Arrays.asList(1, 2, 3);

    return numbers
        .stream()
        .map(n -> download(n, numbers.size() - n + 1))
        .map(n -> parse(n, n))
        .collect(Collectors.toList());
}
17:54:06.281 [main] Started
17:54:06.391 [main] Downloading 1 and sleeping 3 second(s)
17:54:09.394 [main] Downloaded 1
17:54:09.394 [main] Parsing 1 and sleeping 1 second(s)
17:54:10.394 [main] Parsed 1
17:54:10.394 [main] Downloading 2 and sleeping 2 second(s)
17:54:12.394 [main] Downloaded 2
17:54:12.394 [main] Parsing 2 and sleeping 2 second(s)
17:54:14.394 [main] Parsed 2
17:54:14.394 [main] Downloading 3 and sleeping 1 second(s)
17:54:15.394 [main] Downloaded 3
17:54:15.395 [main] Parsing 3 and sleeping 3 second(s)
17:54:18.395 [main] Parsed 3
17:54:18.395 [main] Finished in 12 seconds, results=[1, 2, 3]

So let's use multiple threads.

  1. Threads
    private static List<Integer> threads() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
    
        return numbers
            .stream()
            .map(n -> startThreadAndJoin(() -> download(n, numbers.size() - n + 1)))
            .map(n -> startThreadAndJoin(() -> parse(n, n)))
            .collect(Collectors.toList());
    }
    
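    private static Integer startThreadAndJoin(Supplier<Integer> supplier) {
        // AtomicInteger serves as a thread-safe holder for the result
        AtomicInteger value = new AtomicInteger();
        Thread thread = new Thread(() -> value.set(supplier.get()));
        thread.start();
        try {
            // join() blocks the caller until this thread finishes,
            // so the next thread is only created afterwards
            thread.join();
            return value.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for " + thread.getName(), e);
        }
    }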
    18:02:35.132 [main] Started
    18:02:35.244 [Thread-0] Downloading 1 and sleeping 3 second(s)
    18:02:38.246 [Thread-0] Downloaded 1
    18:02:38.248 [Thread-1] Parsing 1 and sleeping 1 second(s)
    18:02:39.248 [Thread-1] Parsed 1
    18:02:39.249 [Thread-2] Downloading 2 and sleeping 2 second(s)
    18:02:41.250 [Thread-2] Downloaded 2
    18:02:41.251 [Thread-3] Parsing 2 and sleeping 2 second(s)
    18:02:43.251 [Thread-3] Parsed 2
    18:02:43.252 [Thread-4] Downloading 3 and sleeping 1 second(s)
    18:02:44.252 [Thread-4] Downloaded 3
    18:02:44.253 [Thread-5] Parsing 3 and sleeping 3 second(s)
    18:02:47.253 [Thread-5] Parsed 3
    18:02:47.253 [main] Finished in 12 seconds, results=[1, 2, 3]
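The log shows why this version is no faster than the sequential one: startThreadAndJoin() calls join() right after start(), so the main thread waits for each thread before creating the next, and only one thread works at a time. A minimal sketch of a fix, assuming it is acceptable to download and parse the same resource on one thread, is to start every thread first and join them all afterwards. To stay self-contained and quick, the class ThreadsStartThenJoin and its simulate() method are hypothetical; simulate() stands in for download()/parse() and sleeps 100 ms per "second" instead of logging.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ThreadsStartThenJoin {

    // Hypothetical stand-in for download()/parse(): sleeps `units` x 100 ms and returns id.
    static int simulate(int id, int units) {
        try {
            TimeUnit.MILLISECONDS.sleep(units * 100L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return id;
    }

    static List<Integer> threadsStartThenJoin() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
        Integer[] results = new Integer[numbers.size()];
        List<Thread> threads = new ArrayList<>();

        // Start one thread per resource; each downloads and then parses its own resource.
        for (int n : numbers) {
            Thread thread = new Thread(() -> {
                int downloaded = simulate(n, numbers.size() - n + 1);
                results[n - 1] = simulate(downloaded, n);
            });
            thread.start();
            threads.add(thread);
        }

        // Only now wait for the threads, so their work overlaps.
        // join() also makes each thread's write to `results` visible here.
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return Arrays.asList(results);
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<Integer> results = threadsStartThenJoin();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("results=" + results + " in " + elapsedMs + " ms");
    }
}
```

Running main() should report results=[1, 2, 3] after roughly 400 ms, the scaled-down equivalent of 4 seconds, because each thread's download and parse together take 4 units.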
  2. ExecutorService
    private static List<Integer> executorServiceGet() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
    
        ExecutorService executorService = Executors.newFixedThreadPool(numbers.size());
        try {
            // all downloads are submitted up front and run concurrently
            List<Future<Integer>> downloadedFutures = numbers
                .stream()
                .map(n -> {
                    Callable<Integer> callable = () -> download(n, numbers.size() - n + 1);
                    return executorService.submit(callable);
                })
                .collect(Collectors.toList());
    
            // future.get() blocks and the futures are visited in list order, so no parse
            // is submitted until download 1 (the slowest, 3 seconds) has finished
            List<Future<Integer>> parsedFutures = numbers
                .stream()
                .map(n -> {
                    Future<Integer> future = downloadedFutures.get(n - 1);
                    try {
                        Integer value = future.get();
                        return executorService.submit(() -> parse(value, n));
                    } catch (InterruptedException | ExecutionException e) {
                        throw new IllegalStateException(e);
                    }
                })
                .collect(Collectors.toList());
    
            return parsedFutures
                .stream()
                .map(future -> {
                    try {
                        return future.get();
                    } catch (InterruptedException | ExecutionException e) {
                        throw new IllegalStateException(e);
                    }
                })
                .collect(Collectors.toList());
        } finally {
            // the pool's threads are non-daemon; without shutdown() the JVM would not exit
            executorService.shutdown();
        }
    }
    
    18:10:11.988 [main] Started
    18:10:12.106 [pool-1-thread-1] Downloading 1 and sleeping 3 second(s)
    18:10:12.110 [pool-1-thread-2] Downloading 2 and sleeping 2 second(s)
    18:10:12.111 [pool-1-thread-3] Downloading 3 and sleeping 1 second(s)
    18:10:13.111 [pool-1-thread-3] Downloaded 3
    18:10:14.111 [pool-1-thread-2] Downloaded 2
    18:10:15.109 [pool-1-thread-1] Downloaded 1
    18:10:15.111 [pool-1-thread-3] Parsing 1 and sleeping 1 second(s)
    18:10:15.112 [pool-1-thread-2] Parsing 2 and sleeping 2 second(s)
    18:10:15.112 [pool-1-thread-1] Parsing 3 and sleeping 3 second(s)
    18:10:16.112 [pool-1-thread-3] Parsed 1
    18:10:17.113 [pool-1-thread-2] Parsed 2
    18:10:18.112 [pool-1-thread-1] Parsed 3
    18:10:18.112 [main] Finished in 6 seconds, results=[1, 2, 3]
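The 6 seconds come from a barrier: executorServiceGet() visits the download futures in list order and get() blocks, so no parse is submitted until download 1, the slowest at 3 seconds, has finished (3 + 3 = 6 seconds). One way to keep a plain ExecutorService and still pipeline, sketched here as an alternative rather than the post's code, is to submit a single Callable per resource that downloads and then parses. The class ExecutorServicePipelined and its simulate() method are hypothetical; simulate() sleeps 100 ms per "second".

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class ExecutorServicePipelined {

    // Hypothetical stand-in for download()/parse(): sleeps `units` x 100 ms and returns id.
    static int simulate(int id, int units) {
        try {
            TimeUnit.MILLISECONDS.sleep(units * 100L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return id;
    }

    static List<Integer> downloadAndParsePerTask() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
        ExecutorService executorService = Executors.newFixedThreadPool(numbers.size());
        try {
            // One Callable per resource: its parse runs right after its own download,
            // on the same pool thread, without waiting for the other resources.
            List<Future<Integer>> futures = numbers.stream()
                .map(n -> executorService.submit(() -> {
                    int downloaded = simulate(n, numbers.size() - n + 1);
                    return simulate(downloaded, n);
                }))
                .collect(Collectors.toList());

            // get() still blocks, but by now every task is already running.
            return futures.stream()
                .map(future -> {
                    try {
                        return future.get();
                    } catch (InterruptedException | ExecutionException e) {
                        throw new IllegalStateException(e);
                    }
                })
                .collect(Collectors.toList());
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(downloadAndParsePerTask());
    }
}
```

The trade-off is coarser tasks: download and parse can no longer be scheduled independently, which is what the CompletionService and CompletableFuture approaches below allow.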
  3. ExecutorCompletionService
    private static List<Integer> executionCompletionServiceTakeAndGet() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
    
        ExecutorService executorService = Executors.newFixedThreadPool(numbers.size());
        ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(executorService);
        try {
            numbers.forEach(n -> executorCompletionService.submit(() -> download(n, numbers.size() - n + 1)));
    
            // take() blocks until the next download completes (completion order, not
            // submission order), so each parse is submitted as soon as its download is done
            numbers.forEach(ignored -> {
                try {
                    Future<Integer> future = executorCompletionService.take();
                    Integer value = future.get(); // already completed, does not block
                    executorCompletionService.submit(() -> parse(value, value));
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            });
    
            // results are also collected in completion order, which can differ from [1, 2, 3]
            return numbers
                .stream()
                .map(ignored -> {
                    try {
                        return executorCompletionService.take().get();
                    } catch (InterruptedException | ExecutionException e) {
                        throw new IllegalStateException(e);
                    }
                })
                .collect(Collectors.toList());
        } finally {
            executorService.shutdown();
        }
    }
    
    18:18:16.548 [main] Started
    18:18:16.655 [pool-1-thread-2] Downloading 2 and sleeping 2 second(s)
    18:18:16.655 [pool-1-thread-3] Downloading 3 and sleeping 1 second(s)
    18:18:16.655 [pool-1-thread-1] Downloading 1 and sleeping 3 second(s)
    18:18:17.658 [pool-1-thread-3] Downloaded 3
    18:18:17.660 [pool-1-thread-3] Parsing 3 and sleeping 3 second(s)
    18:18:18.658 [pool-1-thread-2] Downloaded 2
    18:18:18.658 [pool-1-thread-2] Parsing 2 and sleeping 2 second(s)
    18:18:19.658 [pool-1-thread-1] Downloaded 1
    18:18:19.658 [pool-1-thread-1] Parsing 1 and sleeping 1 second(s)
    18:18:20.658 [pool-1-thread-2] Parsed 2
    18:18:20.658 [pool-1-thread-1] Parsed 1
    18:18:20.660 [pool-1-thread-3] Parsed 3
    18:18:20.660 [main] Finished in 4 seconds, results=[2, 1, 3]
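The overview above mentions both take() and poll(). A small sketch of the difference, using hypothetical tasks that sleep 300, 200 and 100 ms (the class TakeVersusPoll is illustrative): poll() with a short timeout returns null while nothing has finished, whereas take() waits and hands back futures in completion order, which is also why the results above came out as [2, 1, 3].

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class TakeVersusPoll {

    // Set by completionOrder(): whether the initial poll() found no completed task.
    static boolean firstPollWasEmpty;

    static List<Integer> completionOrder() {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(executorService);
        try {
            // Task n sleeps (4 - n) x 100 ms, so task 3 finishes first and task 1 last.
            for (int n = 1; n <= 3; n++) {
                int id = n;
                completionService.submit(() -> {
                    TimeUnit.MILLISECONDS.sleep((4 - id) * 100L);
                    return id;
                });
            }

            // poll(timeout, unit) waits at most 10 ms, then returns null: nothing is done yet.
            firstPollWasEmpty = completionService.poll(10, TimeUnit.MILLISECONDS) == null;

            // take() blocks until the next task completes; get() on that future does not block.
            List<Integer> order = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                Future<Integer> done = completionService.take();
                order.add(done.get());
            }
            return order;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> order = completionOrder();
        System.out.println(order + ", first poll empty: " + firstPollWasEmpty);
    }
}
```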
  4. Parallel streams
    private static List<Integer> mapParallel() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
    
        return numbers
            .parallelStream()
            .map(n -> download(n, numbers.size() - n + 1))
            .map(n -> parse(n, n))
            .collect(Collectors.toList());
    }
    18:23:51.300 [main] Started
    18:23:51.413 [main] Downloading 2 and sleeping 2 second(s)
    18:23:51.413 [ForkJoinPool.commonPool-worker-1] Downloading 1 and sleeping 3 second(s)
    18:23:51.413 [ForkJoinPool.commonPool-worker-2] Downloading 3 and sleeping 1 second(s)
    18:23:52.416 [ForkJoinPool.commonPool-worker-2] Downloaded 3
    18:23:52.416 [ForkJoinPool.commonPool-worker-2] Parsing 3 and sleeping 3 second(s)
    18:23:53.416 [main] Downloaded 2
    18:23:53.416 [main] Parsing 2 and sleeping 2 second(s)
    18:23:54.416 [ForkJoinPool.commonPool-worker-1] Downloaded 1
    18:23:54.416 [ForkJoinPool.commonPool-worker-1] Parsing 1 and sleeping 1 second(s)
    18:23:55.416 [main] Parsed 2
    18:23:55.416 [ForkJoinPool.commonPool-worker-1] Parsed 1
    18:23:55.416 [ForkJoinPool.commonPool-worker-2] Parsed 3
    18:23:55.416 [main] Finished in 4 seconds, results=[1, 2, 3]
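As the log shows, the work is shared between the calling main thread and the common ForkJoinPool, whose parallelism defaults to one less than the number of available processors (at least 1) and can be changed with the java.util.concurrent.ForkJoinPool.common.parallelism system property. On a machine with few cores, the three downloads might therefore not all overlap. The sketch below, with a hypothetical class ParallelStreamOrder and a simulate() stand-in sleeping 100 ms per "second", prints the pool's parallelism and shows that collect(Collectors.toList()) returns results in encounter order however the elements were scheduled.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class ParallelStreamOrder {

    // Hypothetical stand-in for download()/parse(): sleeps `units` x 100 ms and returns id.
    static int simulate(int id, int units) {
        try {
            TimeUnit.MILLISECONDS.sleep(units * 100L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return id;
    }

    static List<Integer> mapParallel() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
        return numbers.parallelStream()
            .map(n -> simulate(n, numbers.size() - n + 1))
            .map(n -> simulate(n, n))
            // The source list is ordered, so toList() keeps encounter order
            // even though the elements finish in a different order.
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println("available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("results: " + mapParallel());
    }
}
```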
  5. CompletableFuture
    private static List<Integer> completableFuture() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
    
        List<CompletableFuture<Integer>> cfs = numbers
            .stream()
            .map(n ->
                CompletableFuture.supplyAsync(() -> download(n, numbers.size() - n + 1))
                    .thenApply(id -> parse(id, n)))
            .collect(Collectors.toList());
    
        CompletableFuture<List<Integer>> allResultsCF = CompletableFuture.allOf(cfs.toArray(new CompletableFuture[numbers.size()]))
            .thenApply(v -> cfs
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
            );
    
        return allResultsCF.join();
    }

    18:25:25.722 [main] Started
    18:25:25.867 [ForkJoinPool.commonPool-worker-1] Downloading 1 and sleeping 3 second(s)
    18:25:25.869 [ForkJoinPool.commonPool-worker-3] Downloading 3 and sleeping 1 second(s)
    18:25:25.868 [ForkJoinPool.commonPool-worker-2] Downloading 2 and sleeping 2 second(s)
    18:25:26.871 [ForkJoinPool.commonPool-worker-3] Downloaded 3
    18:25:26.872 [ForkJoinPool.commonPool-worker-3] Parsing 3 and sleeping 3 second(s)
    18:25:27.871 [ForkJoinPool.commonPool-worker-2] Downloaded 2
    18:25:27.871 [ForkJoinPool.commonPool-worker-2] Parsing 2 and sleeping 2 second(s)
    18:25:28.871 [ForkJoinPool.commonPool-worker-1] Downloaded 1
    18:25:28.871 [ForkJoinPool.commonPool-worker-1] Parsing 1 and sleeping 1 second(s)
    18:25:29.871 [ForkJoinPool.commonPool-worker-1] Parsed 1
    18:25:29.871 [ForkJoinPool.commonPool-worker-2] Parsed 2
    18:25:29.872 [ForkJoinPool.commonPool-worker-3] Parsed 3
    18:25:29.874 [main] Finished in 4 seconds, results=[1, 2, 3]

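If we pass our own executor (an ExecutorService such as the fixed thread pool from approach 2) to both supplyAsync and thenApplyAsync, the work runs on its threads instead of the common ForkJoinPool: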

CompletableFuture.supplyAsync(() -> download(n, numbers.size() - n + 1), executorService)
    .thenApplyAsync(id -> parse(id, n), executorService)
18:31:47.902 [main] Started
18:31:47.995 [pool-1-thread-1] Downloading 1 and sleeping 3 second(s)
18:31:47.997 [pool-1-thread-3] Downloading 3 and sleeping 1 second(s)
18:31:47.997 [pool-1-thread-2] Downloading 2 and sleeping 2 second(s)
18:31:48.999 [pool-1-thread-3] Downloaded 3
18:31:48.999 [pool-1-thread-3] Parsing 3 and sleeping 3 second(s)
18:31:49.999 [pool-1-thread-2] Downloaded 2
18:31:49.999 [pool-1-thread-2] Parsing 2 and sleeping 2 second(s)
18:31:50.999 [pool-1-thread-1] Downloaded 1
18:31:50.999 [pool-1-thread-1] Parsing 1 and sleeping 1 second(s)
18:31:51.999 [pool-1-thread-1] Parsed 1
18:31:51.999 [pool-1-thread-3] Parsed 3
18:31:51.999 [pool-1-thread-2] Parsed 2
18:31:52.001 [main] Finished in 4 seconds, results=[1, 2, 3]
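Putting the custom-pool variant together, a self-contained sketch could look like this. The class CompletableFutureOwnPool and its simulate() stand-in (100 ms per "second") are hypothetical. Passing the executor explicitly keeps blocking work off the shared common ForkJoinPool, and the pool is shut down once the combined future has completed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class CompletableFutureOwnPool {

    // Hypothetical stand-in for download()/parse(): sleeps `units` x 100 ms and returns id.
    static int simulate(int id, int units) {
        try {
            TimeUnit.MILLISECONDS.sleep(units * 100L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return id;
    }

    static List<Integer> completableFutureWithPool() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
        ExecutorService executorService = Executors.newFixedThreadPool(numbers.size());
        try {
            // Each resource gets its own download -> parse chain, both stages on our pool.
            List<CompletableFuture<Integer>> cfs = numbers.stream()
                .map(n -> CompletableFuture
                    .supplyAsync(() -> simulate(n, numbers.size() - n + 1), executorService)
                    .thenApplyAsync(id -> simulate(id, n), executorService))
                .collect(Collectors.toList());

            // allOf() completes when every chain has; join() on each then returns at once.
            return CompletableFuture.allOf(cfs.toArray(new CompletableFuture[0]))
                .thenApply(v -> cfs.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList()))
                .join();
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<Integer> results = completableFutureWithPool();
        System.out.println(results + " in " + (System.nanoTime() - start) / 1_000_000 + " ms");
    }
}
```

With three pool threads, every download frees its thread before its parse is queued, so the chains overlap and the run should take roughly 400 ms, the scaled-down counterpart of the 4 seconds logged above.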

 
