Scenario 1. More tasks than CPU cores in parallel stream processing.
import java.util.Arrays;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParallelStreamMoreTasksThanThreads {

    private static final Logger LOGGER = LoggerFactory.getLogger(ParallelStreamMoreTasksThanThreads.class);

    public static void main(String[] args) throws InterruptedException {
        // n = 4 CPU cores on my notebook.
        // In general, CPU core count = ForkJoin pool size, which consists of
        // 1 current (calling) thread and n-1 worker threads.
        LOGGER.debug("Available cores: {}", Runtime.getRuntime().availableProcessors());

        // Each parallel task will sleep 3 seconds.
        // The list size is set to CPU core count + 1 to have more tasks than threads.
        List<Integer> sleepSecondsList = Arrays.asList(3, 3, 3, 3, 3);

        sleepSeconds("main", 15);
        Thread t0 = new Thread(createTask("t0", sleepSecondsList));
        t0.start();
        t0.join(); // wait until thread t0 finishes (and all ForkJoin pool workers)
        LOGGER.debug("Thread t0 finished");
        sleepSeconds("main", 100);
    }

    private static Runnable createTask(String id, final List<Integer> sleepSecondsList) {
        return () -> {
            sleepSecondsList
                .parallelStream() // parallel processing by the current thread and fork-join threads
                .forEach(t -> sleepSeconds(id, t));
        };
    }

    private static void sleepSeconds(String id, int sleepSeconds) {
        LOGGER.debug("{} about to sleep {}", id, sleepSeconds);
        try {
            Thread.sleep(1000 * sleepSeconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Output:
2015-10-25 10:47:51,073|main |Thread id=1 |Available cores: 4
2015-10-25 10:47:51,074|main |Thread id=1 |main about to sleep 15
2015-10-25 10:48:06,143|Thread-0 |Thread id=10|t0 about to sleep 3
2015-10-25 10:48:06,143|ForkJoinPool.commonPool-worker-3|Thread id=13|t0 about to sleep 3
2015-10-25 10:48:06,143|ForkJoinPool.commonPool-worker-1|Thread id=11|t0 about to sleep 3
2015-10-25 10:48:06,143|ForkJoinPool.commonPool-worker-2|Thread id=12|t0 about to sleep 3
2015-10-25 10:48:09,143|ForkJoinPool.commonPool-worker-1|Thread id=11|t0 about to sleep 3 <- completes work in next 3 seconds "slot"
2015-10-25 10:48:12,144|main |Thread id=1 |Thread t0 finished
2015-10-25 10:48:12,144|main |Thread id=1 |main about to sleep 100
Note: the JVM initially started with 10 threads (the main thread and other daemon threads). The main thread started a new thread t0 to process 5 tasks using parallelStream(). Each task slept for 3 seconds. Internally, parallelStream() started the common ForkJoinPool with 3 ForkJoinWorkerThreads. The parallel stream processed the first 4 tasks in parallel in 3 seconds using thread t0 (the thread that called parallelStream()) and the 3 worker threads. After those 3 seconds one task was left, and it was picked up by one of the four threads (worker-1 in the output above). All 5 tasks were therefore processed in around 6 seconds by the 4 threads that were started in addition to the main thread. See the JVisualVM screenshots for details:
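The "slot" reasoning above can be written down as a small calculation. The sketch below is only an estimate under simplifying assumptions: every task blocks for the same time and the stream spreads tasks evenly over the calling thread plus the common pool workers, which the stream implementation does not guarantee. The class and method names (SlotEstimate, expectedSeconds) are hypothetical and not part of the original example.

```java
import java.util.concurrent.ForkJoinPool;

public class SlotEstimate {

    // Hypothetical helper: estimated wall-clock time when equally long blocking
    // tasks are spread over a fixed number of threads.
    // The ceiling division gives the number of "slots" (rounds) needed.
    static int expectedSeconds(int tasks, int threads, int secondsPerTask) {
        int slots = (tasks + threads - 1) / threads;
        return slots * secondsPerTask;
    }

    public static void main(String[] args) {
        // Common pool workers plus the thread that calls parallelStream().
        int threads = ForkJoinPool.getCommonPoolParallelism() + 1;
        System.out.println("Threads available to a parallel stream: " + threads);

        // Scenario 1: 5 tasks, 4 threads, 3 seconds per task.
        System.out.println("Scenario 1 estimate: " + expectedSeconds(5, 4, 3) + " s");
    }
}
```

With the numbers from scenario 1 (5 tasks, 4 threads, 3 seconds each) the helper returns 6, which matches the roughly 6 seconds between the first "about to sleep" entry and "Thread t0 finished" in the output.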
Scenario 2. Reusing ForkJoin pool worker threads for next processing
This time, after the first three threads (t0-t2) finished parallelStream() processing, we started three new threads (t3-t5) to process new tasks. Because the ForkJoin pool workers had not yet been stopped, the same workers were reused for the subsequent processing:
public static void main(String[] args) throws InterruptedException {
    // n = 4 CPU cores on my notebook.
    // In general, CPU core count = ForkJoin pool size, which consists of
    // 1 current (calling) thread and n-1 worker threads.
    LOGGER.debug("Available cores: {}", Runtime.getRuntime().availableProcessors());

    // Assuming all 3 workers are reused, start 6 tasks for 3 manually started
    // threads + 3 workers, i.e. 2 tasks (list size = 2) per started thread.
    List<Integer> sleepSecondsList = Arrays.asList(5, 5);

    sleepSeconds("main", 15);
    Thread t0 = new Thread(createTask("t0", sleepSecondsList));
    Thread t1 = new Thread(createTask("t1", sleepSecondsList));
    Thread t2 = new Thread(createTask("t2", sleepSecondsList));
    t0.start(); t1.start(); t2.start();
    t0.join(); t1.join(); t2.join();
    LOGGER.debug("Threads t0, t1, t2 finished");

    // The fork-join pool keeps all its worker threads for a couple of seconds
    // so that they can be reused.
    sleepSeconds("main", 1);

    Thread t3 = new Thread(createTask("t3", sleepSecondsList));
    Thread t4 = new Thread(createTask("t4", sleepSecondsList));
    Thread t5 = new Thread(createTask("t5", sleepSecondsList));
    t3.start(); t4.start(); t5.start();
    t3.join(); t4.join(); t5.join();
    LOGGER.debug("Threads t3, t4, t5 finished");
    sleepSeconds("main", 100);
}
Output:
2015-11-02 21:09:55,822|main |Thread id=1 |Available cores: 4
2015-11-02 21:09:55,823|main |Thread id=1 |main about to sleep 15
2015-11-02 21:10:10,899|Thread-0 |Thread id=10|t0 about to sleep 5
2015-11-02 21:10:10,899|Thread-2 |Thread id=12|t2 about to sleep 5
2015-11-02 21:10:10,899|ForkJoinPool.commonPool-worker-1|Thread id=13|t2 about to sleep 5
2015-11-02 21:10:10,900|ForkJoinPool.commonPool-worker-3|Thread id=15|t0 about to sleep 5
2015-11-02 21:10:10,900|Thread-1 |Thread id=11|t1 about to sleep 5
2015-11-02 21:10:10,900|ForkJoinPool.commonPool-worker-2|Thread id=14|t1 about to sleep 5
2015-11-02 21:10:15,901|main |Thread id=1 |Threads t0, t1, t2 finished
2015-11-02 21:10:15,902|main |Thread id=1 |main about to sleep 1
2015-11-02 21:10:16,903|Thread-3 |Thread id=16|t3 about to sleep 5
2015-11-02 21:10:16,903|ForkJoinPool.commonPool-worker-3|Thread id=15|t3 about to sleep 5
2015-11-02 21:10:16,904|Thread-4 |Thread id=17|t4 about to sleep 5
2015-11-02 21:10:16,904|ForkJoinPool.commonPool-worker-2|Thread id=14|t4 about to sleep 5
2015-11-02 21:10:16,905|Thread-5 |Thread id=18|t5 about to sleep 5
2015-11-02 21:10:16,905|ForkJoinPool.commonPool-worker-1|Thread id=13|t5 about to sleep 5
Note that the ForkJoinPool worker threads were reused: thread ids 13, 14 and 15 appear in both rounds.
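The same check can be made programmatically instead of reading thread ids from the log. The following is a minimal sketch, not the code used for the output above: it runs two parallel streams back to back and compares the ids of the common pool workers that took part in each run. The class and method names (WorkerReuseCheck, workerIds) are hypothetical, and which workers pick up tasks depends on the machine and JDK.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class WorkerReuseCheck {

    // Runs `tasks` short blocking tasks on a parallel stream and returns the ids
    // of the common pool worker threads that executed at least one of them.
    // The calling thread also runs tasks but is filtered out by its name.
    static Set<Long> workerIds(int tasks, long sleepMillis) {
        Set<Long> ids = ConcurrentHashMap.newKeySet();
        IntStream.range(0, tasks).parallel().forEach(i -> {
            Thread current = Thread.currentThread();
            if (current.getName().startsWith("ForkJoinPool.commonPool-worker")) {
                ids.add(current.getId());
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return ids;
    }

    public static void main(String[] args) {
        Set<Long> first = workerIds(8, 200);
        Set<Long> second = workerIds(8, 200); // no pause, so workers should still be alive
        System.out.println("first run:  " + first);
        System.out.println("second run: " + second);

        // Ids present in both runs belong to workers that were reused.
        first.retainAll(second);
        System.out.println("reused:     " + first);
    }
}
```

If the workers are reused as described here, the "reused" set should contain the same ids as the first run.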
Notice on the JVisualVM screenshot below that the fork-join pool keeps all its worker threads alive for a couple of seconds so that they can be reused:
Java initially started with 9 daemon threads and one main thread from the application. Then 6 threads were explicitly started to process 2 tasks each. Each of them called parallelStream(), which internally uses the shared (common) fork-join pool; the pool created 3 worker threads that were shared by all 6 threads. That makes 19 threads started in total (10 + 6 + 3). See screenshot:
Scenario 3. Creating new ForkJoin pool workers after a longer delay between subsequent parallelStream() processing
This time, after the first three threads (t0-t2) finished, the main thread waited much longer (here 13 seconds) before starting three new threads (t3-t5). 13 seconds was apparently long enough for all 3 fork-join pool workers to be stopped, so new fork-join workers had to be created to handle the parallel processing.
...
LOGGER.debug("Threads t0, t1, t2 finished");
sleepSeconds("main", 13);
// 13 seconds was apparently enough to stop all 3 fork-join workers,
// so new fork-join workers had to be created to handle parallel processing
Thread t3 = new Thread(createTask("t3", sleepSecondsList));
...
Output:
2015-11-02 21:15:36,527|main |Thread id=1 |Available cores: 4
2015-11-02 21:15:36,528|main |Thread id=1 |main about to sleep 15
2015-11-02 21:15:51,598|Thread-1 |Thread id=11|t1 about to sleep 5
2015-11-02 21:15:51,598|ForkJoinPool.commonPool-worker-2|Thread id=15|t0 about to sleep 5
2015-11-02 21:15:51,598|ForkJoinPool.commonPool-worker-3|Thread id=13|t2 about to sleep 5
2015-11-02 21:15:51,598|Thread-0 |Thread id=10|t0 about to sleep 5
2015-11-02 21:15:51,598|Thread-2 |Thread id=12|t2 about to sleep 5
2015-11-02 21:15:51,598|ForkJoinPool.commonPool-worker-1|Thread id=14|t1 about to sleep 5
2015-11-02 21:15:56,600|main |Thread id=1 |Threads t0, t1, t2 finished
2015-11-02 21:15:56,602|main |Thread id=1 |main about to sleep 13
2015-11-02 21:16:09,605|Thread-4 |Thread id=17|t4 about to sleep 5
2015-11-02 21:16:09,605|Thread-3 |Thread id=16|t3 about to sleep 5
2015-11-02 21:16:09,606|Thread-5 |Thread id=18|t5 about to sleep 5
2015-11-02 21:16:09,606|ForkJoinPool.commonPool-worker-1|Thread id=19|t4 about to sleep 5
2015-11-02 21:16:09,606|ForkJoinPool.commonPool-worker-3|Thread id=21|t3 about to sleep 5
2015-11-02 21:16:09,607|ForkJoinPool.commonPool-worker-2|Thread id=20|t5 about to sleep 5
2015-11-02 21:16:14,611|main |Thread id=1 |Threads t3, t4, t5 finished
2015-11-02 21:16:14,612|main |Thread id=1 |main about to sleep 100
Note: the ForkJoinPool threads were not reused. For example, ForkJoinPool.commonPool-worker-1 had thread id 14 in the first round and thread id 19 in the second.
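One way to watch the workers disappear without a profiler is to ask the common pool how many worker threads it currently holds. The sketch below is an illustration under assumptions, not a measurement from the runs above: it uses ForkJoinPool.getPoolSize(), the class and method names (WorkerLifetimeProbe, poolSizeAfterIdle) are hypothetical, and the idle timeout after which workers are stopped is an implementation detail that may differ between JDK releases, so the 13-second figure from this scenario is not guaranteed elsewhere.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class WorkerLifetimeProbe {

    // Triggers the common pool with a small parallel stream, stays idle for
    // idleMillis, then reports how many worker threads the pool still holds.
    static int poolSizeAfterIdle(long idleMillis) {
        IntStream.range(0, 8).parallel().forEach(i -> sleepQuietly(100));
        sleepQuietly(idleMillis);
        return ForkJoinPool.commonPool().getPoolSize();
    }

    // Sleeps without a checked exception; restores the interrupt flag if interrupted.
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("workers right after use: " + poolSizeAfterIdle(0));
        System.out.println("workers after 1 s idle:  " + poolSizeAfterIdle(1_000));
        System.out.println("workers after 13 s idle: " + poolSizeAfterIdle(13_000));
    }
}
```

If the pool behaves as in scenarios 2 and 3, the count after 1 second should still be non-zero, while the count after 13 seconds should have dropped.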
Again, Java initially started with 9 daemon threads and one main thread from the application. Then 6 threads were explicitly started to call parallelStream(). In this case the fork-join pool workers (3 threads) were started and stopped twice. That makes 22 threads started in total (10 + 6 + 2 × 3). See screenshot:
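As a cross-check of the totals quoted in scenarios 2 and 3, the bookkeeping can be written as a one-line formula: JVM baseline threads, plus explicitly started threads, plus one set of pool workers for every time the workers had to be created. The helper below is a hypothetical illustration using only the counts stated in the text.

```java
public class ThreadTally {

    // Total threads started = JVM baseline + explicitly started threads
    // + pool workers created in each worker "generation".
    static int totalThreadsStarted(int jvmBaseline, int explicitThreads,
                                   int workersPerGeneration, int generations) {
        return jvmBaseline + explicitThreads + workersPerGeneration * generations;
    }

    public static void main(String[] args) {
        // Scenario 2: workers created once and reused.
        System.out.println("Scenario 2: " + totalThreadsStarted(10, 6, 3, 1)); // prints "Scenario 2: 19"
        // Scenario 3: workers stopped and created a second time.
        System.out.println("Scenario 3: " + totalThreadsStarted(10, 6, 3, 2)); // prints "Scenario 3: 22"
    }
}
```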