
Java: calling parallel streams (ForkJoin pool) multiple times vs. number of threads started

Scenario 1. More tasks than CPU cores in parallel stream processing.

import java.util.Arrays;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParallelStreamMoreTasksThanThreads {

    private static final Logger LOGGER =
            LoggerFactory.getLogger(ParallelStreamMoreTasksThanThreads.class);

    public static void main(String[] args) throws InterruptedException {
        // n = 4 CPU cores on my notebook
        LOGGER.debug("Available cores: {}", Runtime.getRuntime().availableProcessors());

        // By default the common ForkJoin pool has n-1 worker threads; the thread that
        // calls parallelStream() also processes tasks, so n threads work in total.

        // Each parallel task sleeps 3 seconds. The list size is CPU core count + 1,
        // so there are more tasks than threads.
        List<Integer> sleepSecondsList = Arrays.asList(3, 3, 3, 3, 3);

        sleepSeconds("main", 15);

        Thread t0 = new Thread(createTask("t0", sleepSecondsList));
        t0.start();

        // forEach() on a parallel stream blocks until all tasks are done, so joining t0
        // also waits for the tasks processed by the ForkJoin pool workers
        t0.join();
        LOGGER.debug("Thread t0 finished");

        sleepSeconds("main", 100);
    }

    private static Runnable createTask(String id, final List<Integer> sleepSecondsList) {
        return () -> sleepSecondsList
                .parallelStream() // processed by the calling thread and ForkJoin pool workers
                .forEach(t -> sleepSeconds(id, t));
    }

    private static void sleepSeconds(String id, int sleepSeconds) {
        LOGGER.debug("{} about to sleep {}", id, sleepSeconds);
        try {
            Thread.sleep(1000L * sleepSeconds);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
    }
}

Output:
2015-10-25 10:47:51,073|main |Thread id=1 |Available cores: 4
2015-10-25 10:47:51,074|main |Thread id=1 |main about to sleep 15

2015-10-25 10:48:06,143|Thread-0 |Thread id=10|t0 about to sleep 3
2015-10-25 10:48:06,143|ForkJoinPool.commonPool-worker-3|Thread id=13|t0 about to sleep 3
2015-10-25 10:48:06,143|ForkJoinPool.commonPool-worker-1|Thread id=11|t0 about to sleep 3
2015-10-25 10:48:06,143|ForkJoinPool.commonPool-worker-2|Thread id=12|t0 about to sleep 3

2015-10-25 10:48:09,143|ForkJoinPool.commonPool-worker-1|Thread id=11|t0 about to sleep 3 <- completes work in next 3 seconds "slot"

2015-10-25 10:48:12,144|main |Thread id=1 |Thread t0 finished
2015-10-25 10:48:12,144|main |Thread id=1 |main about to sleep 100

[Screenshot: parallel-stream--fork-join-threads]

Note: the JVM initially started with 10 threads (the main thread and other daemon threads). The main thread started a new thread t0 to process 5 tasks using parallelStream(). Each task sleeps for 3 seconds. Internally parallelStream() used the common ForkJoinPool, which started 3 ForkJoinWorkerThreads. The parallel stream processed the first 4 tasks in parallel within 3 seconds using thread t0 and the 3 worker threads. After 3 seconds these 4 threads had finished the first 4 tasks, and the one remaining task was picked up by one of them (worker-1 in the log above). All tasks were processed in about 6 seconds by 4 threads started in addition to the main thread. See the JVisualVM screenshots for details:

[Screenshot: parallel-stream--fork-join-threads--threads-started]
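To check on your own machine which threads actually pick up the tasks, here is a minimal sketch that records the name of every thread running a task of a parallel stream and prints the common pool's target parallelism. The class `ParallelStreamThreads` and the helper `threadNamesUsed` are hypothetical names chosen for illustration; `ForkJoinPool.commonPool()`, `getParallelism()` and `ConcurrentHashMap.newKeySet()` are standard JDK APIs.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class ParallelStreamThreads {

    // Runs `tasks` sleeping tasks through a parallel stream and returns the
    // names of all threads that executed at least one of them.
    static Set<String> threadNamesUsed(int tasks, long sleepMillis) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, tasks).parallel().forEach(i -> {
            names.add(Thread.currentThread().getName());
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return names;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("Available cores: " + cores);
        // Target parallelism of the shared common pool (by default usually cores - 1)
        System.out.println("Common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());
        // One task more than there are cores, as in Scenario 1
        System.out.println("Threads used: " + threadNamesUsed(cores + 1, 500));
    }
}
```

On a 4-core machine like the one above one would typically expect a parallelism of 3 and up to four names (the calling thread plus `ForkJoinPool.commonPool-worker-…` threads), but the exact names and how tasks are split depend on the JDK version and on timing.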

Scenario 2. Reusing ForkJoin pool worker threads for subsequent processing

This time, after the first 3 threads (t0-t2) finished their parallelStream() processing, we started three new threads (t3-t5) to process new tasks. As the ForkJoin pool workers had not yet been stopped, the same workers were reused for the subsequent processing:

    // createTask() and sleepSeconds() are the same as in Scenario 1
    public static void main(String[] args) throws InterruptedException {
        // n = 4 CPU cores on my notebook
        LOGGER.debug("Available cores: {}", Runtime.getRuntime().availableProcessors());

        // By default the common ForkJoin pool has n-1 worker threads; the thread that
        // calls parallelStream() also processes tasks.

        // 3 manually started threads x 2 tasks each = 6 tasks, which the 3 callers
        // plus the 3 shared ForkJoin workers can process at the same time
        List<Integer> sleepSecondsList = Arrays.asList(5, 5);

        sleepSeconds("main", 15);

        Thread t0 = new Thread(createTask("t0", sleepSecondsList));
        Thread t1 = new Thread(createTask("t1", sleepSecondsList));
        Thread t2 = new Thread(createTask("t2", sleepSecondsList));
        t0.start();
        t1.start();
        t2.start();

        t0.join();
        t1.join();
        t2.join();
        LOGGER.debug("Threads t0, t1, t2 finished");

        // The ForkJoin pool keeps idle worker threads alive for a couple of seconds,
        // so they can be reused
        sleepSeconds("main", 1);

        Thread t3 = new Thread(createTask("t3", sleepSecondsList));
        Thread t4 = new Thread(createTask("t4", sleepSecondsList));
        Thread t5 = new Thread(createTask("t5", sleepSecondsList));
        t3.start();
        t4.start();
        t5.start();

        t3.join();
        t4.join();
        t5.join();

        LOGGER.debug("Threads t3, t4, t5 finished");
        sleepSeconds("main", 100);
    }

Output:
2015-11-02 21:09:55,822|main |Thread id=1 |Available cores: 4
2015-11-02 21:09:55,823|main |Thread id=1 |main about to sleep 15

2015-11-02 21:10:10,899|Thread-0 |Thread id=10|t0 about to sleep 5
2015-11-02 21:10:10,899|Thread-2 |Thread id=12|t2 about to sleep 5
2015-11-02 21:10:10,899|ForkJoinPool.commonPool-worker-1|Thread id=13|t2 about to sleep 5
2015-11-02 21:10:10,900|ForkJoinPool.commonPool-worker-3|Thread id=15|t0 about to sleep 5
2015-11-02 21:10:10,900|Thread-1 |Thread id=11|t1 about to sleep 5
2015-11-02 21:10:10,900|ForkJoinPool.commonPool-worker-2|Thread id=14|t1 about to sleep 5

2015-11-02 21:10:15,901|main |Thread id=1 |Threads t0, t1, t2 finished
2015-11-02 21:10:15,902|main |Thread id=1 |main about to sleep 1
2015-11-02 21:10:16,903|Thread-3 |Thread id=16|t3 about to sleep 5
2015-11-02 21:10:16,903|ForkJoinPool.commonPool-worker-3|Thread id=15|t3 about to sleep 5
2015-11-02 21:10:16,904|Thread-4 |Thread id=17|t4 about to sleep 5
2015-11-02 21:10:16,904|ForkJoinPool.commonPool-worker-2|Thread id=14|t4 about to sleep 5
2015-11-02 21:10:16,905|Thread-5 |Thread id=18|t5 about to sleep 5
2015-11-02 21:10:16,905|ForkJoinPool.commonPool-worker-1|Thread id=13|t5 about to sleep 5

Note that the ForkJoinPool threads were reused (thread ids 13, 14 and 15).

Notice on the JVisualVM screenshot below that the ForkJoin pool keeps its worker threads alive for a couple of seconds so that they can be reused:

[Screenshot: parallel-stream--fork-join-worker-pool-reuse]
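Worker reuse can also be checked programmatically by comparing thread ids, as the log above does by eye. A minimal sketch, assuming that a thread is a pool worker exactly when it is a `ForkJoinWorkerThread`: it runs two batches of parallel tasks separated by a configurable pause and returns the worker ids seen in both. The class and method names (`WorkerReuseCheck`, `workerIds`, `reusedWorkerIds`) are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.IntStream;

public class WorkerReuseCheck {

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs `tasks` sleeping tasks on a parallel stream and returns the ids of the
    // ForkJoin worker threads (not the calling thread) that executed them.
    // Thread.getId() is deprecated since JDK 19 in favour of threadId().
    static Set<Long> workerIds(int tasks, long sleepMillis) {
        Set<Long> ids = ConcurrentHashMap.newKeySet();
        IntStream.range(0, tasks).parallel().forEach(i -> {
            Thread current = Thread.currentThread();
            if (current instanceof ForkJoinWorkerThread) {
                ids.add(current.getId());
            }
            sleepQuietly(sleepMillis);
        });
        return ids;
    }

    // Runs two batches separated by `pauseMillis` and returns the worker ids
    // that appear in both batches, i.e. the workers that were reused.
    static Set<Long> reusedWorkerIds(int tasks, long sleepMillis, long pauseMillis) {
        Set<Long> reused = new HashSet<>(workerIds(tasks, sleepMillis));
        sleepQuietly(pauseMillis);
        reused.retainAll(workerIds(tasks, sleepMillis));
        return reused;
    }

    public static void main(String[] args) {
        int tasks = Runtime.getRuntime().availableProcessors() + 1;
        System.out.println("Reused after 1 s pause:  " + reusedWorkerIds(tasks, 500, 1_000));
        System.out.println("Reused after 13 s pause: " + reusedWorkerIds(tasks, 500, 13_000));
    }
}
```

With a short pause the two batches will typically share worker ids, as in this scenario; with a long enough pause the result may be empty because idle workers have terminated. How long an idle worker survives is a JDK implementation detail, so the numbers can differ between JDK versions.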

Java initially started with 9 daemon threads and the application's main thread. Then 6 threads were explicitly started to process 2 tasks each. Each of them called parallelStream(), which used the shared common ForkJoin pool; the pool created 3 worker threads shared by all callers. That makes 10 + 6 + 3 = 19 threads started in total. See screenshot:

[Screenshot: parallel-stream--fork-join-worker-pool-reuse--threads-started]
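The "threads started" count can also be read without JVisualVM through the standard `ThreadMXBean.getTotalStartedThreadCount()` counter, which is presumably the source of JVisualVM's "Total started" figure. A minimal sketch that measures the difference around a run of 6 caller threads with 2 parallel tasks each; the class and method names (`StartedThreadsCounter`, `threadsStartedDuring`, `runCallers`) are hypothetical, and any other thread the JVM happens to start in the same window is counted too.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class StartedThreadsCounter {

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns how many threads the JVM started while `action` was running.
    static long threadsStartedDuring(Runnable action) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        long before = mx.getTotalStartedThreadCount();
        action.run();
        return mx.getTotalStartedThreadCount() - before;
    }

    // Starts `callers` threads that each process `tasksPerCaller` sleeping tasks
    // with a parallel stream, then waits for all of them to finish.
    static void runCallers(int callers, int tasksPerCaller, long sleepMillis) {
        Thread[] threads = new Thread[callers];
        for (int c = 0; c < callers; c++) {
            threads[c] = new Thread(() -> IntStream.range(0, tasksPerCaller)
                    .parallel()
                    .forEach(i -> sleepQuietly(sleepMillis)));
            threads[c].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public static void main(String[] args) {
        // 6 callers x 2 tasks, as in Scenario 2
        long started = threadsStartedDuring(() -> runCallers(6, 2, 1_000));
        System.out.println("Threads started: " + started + " (6 callers plus up to "
                + ForkJoinPool.getCommonPoolParallelism() + " new common-pool workers)");
    }
}
```

If the common pool workers are already alive when the measurement starts, they are reused and the difference is correspondingly smaller.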

Scenario 3. New ForkJoin pool workers created after a longer delay between subsequent parallelStream() processing

This time, after the first 3 threads (t0-t2) finished, the main thread waited much longer (here 13 seconds) before starting 3 new threads (t3-t5). 13 seconds was apparently long enough for all 3 ForkJoin pool workers to stop, so new ForkJoin workers had to be created to handle the parallel processing.

        ...
        LOGGER.debug("Threads t0, t1, t2 finished");

        sleepSeconds("main", 13); // 13 seconds was apparently enough to stop all 3 fork-join workers,
                                  // so new fork-join workers had to be created for parallel processing

        Thread t3 = new Thread(createTask("t3", sleepSecondsList));
        ...

Output:
2015-11-02 21:15:36,527|main |Thread id=1 |Available cores: 4
2015-11-02 21:15:36,528|main |Thread id=1 |main about to sleep 15

2015-11-02 21:15:51,598|Thread-1 |Thread id=11|t1 about to sleep 5
2015-11-02 21:15:51,598|ForkJoinPool.commonPool-worker-2|Thread id=15|t0 about to sleep 5
2015-11-02 21:15:51,598|ForkJoinPool.commonPool-worker-3|Thread id=13|t2 about to sleep 5
2015-11-02 21:15:51,598|Thread-0 |Thread id=10|t0 about to sleep 5
2015-11-02 21:15:51,598|Thread-2 |Thread id=12|t2 about to sleep 5
2015-11-02 21:15:51,598|ForkJoinPool.commonPool-worker-1|Thread id=14|t1 about to sleep 5

2015-11-02 21:15:56,600|main |Thread id=1 |Threads t0, t1, t2 finished
2015-11-02 21:15:56,602|main |Thread id=1 |main about to sleep 13

2015-11-02 21:16:09,605|Thread-4 |Thread id=17|t4 about to sleep 5
2015-11-02 21:16:09,605|Thread-3 |Thread id=16|t3 about to sleep 5
2015-11-02 21:16:09,606|Thread-5 |Thread id=18|t5 about to sleep 5
2015-11-02 21:16:09,606|ForkJoinPool.commonPool-worker-1|Thread id=19|t4 about to sleep 5
2015-11-02 21:16:09,606|ForkJoinPool.commonPool-worker-3|Thread id=21|t3 about to sleep 5
2015-11-02 21:16:09,607|ForkJoinPool.commonPool-worker-2|Thread id=20|t5 about to sleep 5

2015-11-02 21:16:14,611|main |Thread id=1 |Threads t3, t4, t5 finished
2015-11-02 21:16:14,612|main |Thread id=1 |main about to sleep 100

Note: the ForkJoinPool threads were not reused; e.g. ForkJoinPool.commonPool-worker-1 had thread id 14 and later 19.

[Screenshot: parallel-stream--new-fork-join-worker-pool]

Again, Java initially started with 9 daemon threads and the application's main thread. Then 6 threads were explicitly started to call parallelStream(). In this case the ForkJoin pool workers (3 threads) were started, and stopped, twice. That makes 10 + 6 + 2 × 3 = 22 threads started in total. See screenshot:

[Screenshot: parallel-stream--new-fork-join-worker-pool--threads-started]
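The termination of idle workers can be observed directly with `ForkJoinPool.getPoolSize()`, which returns the number of worker threads that have started but not yet terminated. A minimal sketch, assuming nothing else uses the common pool in the meantime: it keeps the common pool busy once and then samples its size every second. The class `CommonPoolIdleWorkers` and the method `poolSizeWhileIdle` are hypothetical names.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CommonPoolIdleWorkers {

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Keeps the common pool busy once, then samples once per second how many of
    // its worker threads have started but not yet terminated (getPoolSize()).
    static int[] poolSizeWhileIdle(int tasks, long sleepMillis, int seconds) {
        IntStream.range(0, tasks).parallel().forEach(i -> sleepQuietly(sleepMillis));
        int[] sizes = new int[seconds + 1];
        for (int s = 0; s <= seconds; s++) {
            sizes[s] = ForkJoinPool.commonPool().getPoolSize();
            if (s < seconds) {
                sleepQuietly(1_000);
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        int tasks = Runtime.getRuntime().availableProcessors() + 1;
        int[] sizes = poolSizeWhileIdle(tasks, 500, 15);
        for (int s = 0; s < sizes.length; s++) {
            System.out.println("idle " + s + " s: " + sizes[s] + " live common-pool workers");
        }
    }
}
```

If the count drops to 0 within the sampled window, a parallel stream started after that point needs new worker threads, which matches the new thread ids seen in this scenario. The idle timeout is an implementation detail that differs between JDK versions, so the 13 seconds observed here may not carry over.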