Tag: java

Unions and default values in Apache Avro serialization and deserialization

The initial Avro schema (schema/user.avsc) defines a User record with only a name field:

{
  "namespace": "com.bawi.avro.model",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string"
    }
  ]
}

The Maven pom.xml declares the avro dependency:

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.1</version>
        </dependency>

so we can serialize the User data in Java to disk, into the target/user.avro file:

        Schema schema = new Schema.Parser().parse(new File("schema/user.avsc"));
        File avroFile = new File("target/user.avro");
        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "Alyssa");
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
        dataFileWriter.create(schema, avroFile);
        dataFileWriter.append(user);
        dataFileWriter.close();

We can read (deserialize) the User back from disk either in Java:

        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(avroFile, datumReader);
        GenericRecord user = null;
        while (dataFileReader.hasNext()) {
            user = dataFileReader.next(user);
            System.out.println(user);
        }

or with the avro-tools jar, which Maven downloads when it is declared as a test dependency:

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-tools</artifactId>
            <version>1.8.1</version>
            <scope>test</scope>
        </dependency>

and running it with the 'tojson' argument:

me@MacBook:~/dev/my-projects/my-avro$ java -jar /Users/me/.m2/repository/org/apache/avro/avro-tools/1.8.1/avro-tools-1.8.1.jar tojson users.avro 
{"name":"Alyssa"}

Next, we add a new favorite_number field to the schema:

{
  "namespace": "com.bawi.avro.model",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": "int"
    }
  ]
}

If we now run the deserialization code on the existing data in user.avro, but with the new schema, we get:

Exception in thread "main" org.apache.avro.AvroTypeException: Found com.bawi.avro.model.User, expecting com.bawi.avro.model.User, missing required field favorite_number

because favorite_number does not exist in the data written to the Avro file and the new schema provides no default for it.

Changing the type to a union of int and null alone does not get rid of the error above.

The solution is to give favorite_number a default value, e.g. a union with null as the default:

    {
      "name": "favorite_number",
      "type": [
        "null",
        "int"
      ],
      "default": null
    }

to get: {"name": "Alyssa", "favorite_number": null}
or to use a plain int with a numeric default:

    {
      "name": "favorite_number",
      "type": "int",
      "default": 0
    }

to get: {"name": "Alyssa", "favorite_number": 0}
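What happens here is Avro's schema resolution: each field of the reader (new) schema is taken from the written data if the writer (old) schema had it, otherwise from the reader's default, and reading fails if there is neither; fields that only the writer has are skipped. A simplified model of just that rule in plain Java, without Avro (ReaderField and resolve() are made-up names for illustration; real Avro also handles type promotion, nested records and more):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of how a record written with an old (writer) schema is read with a
// new (reader) schema. ReaderField and resolve() are hypothetical, not Avro classes.
public class FieldResolutionSketch {

    static final class ReaderField {
        final String name;
        final boolean hasDefault;
        final Object defaultValue;

        ReaderField(String name, boolean hasDefault, Object defaultValue) {
            this.name = name;
            this.hasDefault = hasDefault;
            this.defaultValue = defaultValue;
        }
    }

    // For every reader field: use the written value if present, else the reader's
    // default, else fail. Fields present only in the written data are ignored.
    static Map<String, Object> resolve(Map<String, Object> written, List<ReaderField> readerFields) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (ReaderField field : readerFields) {
            if (written.containsKey(field.name)) {
                result.put(field.name, written.get(field.name));
            } else if (field.hasDefault) {
                result.put(field.name, field.defaultValue);
            } else {
                throw new IllegalStateException("missing required field " + field.name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Data written with the initial schema: only "name"
        Map<String, Object> written = new HashMap<>();
        written.put("name", "Alyssa");
        ReaderField name = new ReaderField("name", false, null);

        // ["null", "int"] with "default": null -> prints {name=Alyssa, favorite_number=null}
        System.out.println(resolve(written, Arrays.asList(name, new ReaderField("favorite_number", true, null))));
        // "int" with "default": 0 -> prints {name=Alyssa, favorite_number=0}
        System.out.println(resolve(written, Arrays.asList(name, new ReaderField("favorite_number", true, 0))));
        // no default at all -> prints missing required field favorite_number
        try {
            resolve(written, Arrays.asList(name, new ReaderField("favorite_number", false, null)));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A ["null", "int"] union without a default falls into the third case, which is why the union alone did not help.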

Note that the default value of a union field must match the first type listed in the union. Placing int as the first branch of the union and null as the default, such as:

    {
      "name": "favorite_number",
      "type": [
        "int",
        "null"
      ],
      "default": null
    }

gives an error:

Exception in thread "main" org.apache.avro.AvroTypeException: Non-numeric default value for int: null

Conversely, we get

Exception in thread "main" org.apache.avro.AvroTypeException: Non-null default value for null type: 0

when null is the first branch of the union but the default is 0:

    {
      "name": "favorite_number",
      "type": [
        "null",
        "int"
      ],
      "default": 0
    }

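This behaviour is described in the Avro specification, which requires the default value of a union field to match the first schema in the union (hence null is usually listed first): https://avro.apache.org/docs/1.7.7/spec.html#Unions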
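The first-branch rule can be expressed as a small check. This is a hypothetical helper in plain Java, not Avro's own validation code; it covers only a few primitive types and takes the default as the value parsed from JSON:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper illustrating the spec rule that a union field's default value
// must match the FIRST branch of the union. Not part of the Avro API.
public class UnionDefaultCheck {

    // Is defaultValue (as parsed from JSON) valid for the given primitive Avro type?
    static boolean matchesPrimitive(String avroType, Object defaultValue) {
        switch (avroType) {
            case "null":
                return defaultValue == null;
            case "int":
                return defaultValue instanceof Integer;
            case "long":
                return defaultValue instanceof Integer || defaultValue instanceof Long;
            case "string":
                return defaultValue instanceof String;
            case "boolean":
                return defaultValue instanceof Boolean;
            default:
                throw new IllegalArgumentException("type not covered by this sketch: " + avroType);
        }
    }

    // Later branches are deliberately ignored: null is NOT a valid default for ["int", "null"].
    static boolean isValidUnionDefault(List<String> branches, Object defaultValue) {
        return !branches.isEmpty() && matchesPrimitive(branches.get(0), defaultValue);
    }

    public static void main(String[] args) {
        System.out.println(isValidUnionDefault(Arrays.asList("null", "int"), null)); // true
        System.out.println(isValidUnionDefault(Arrays.asList("int", "null"), null)); // false
        System.out.println(isValidUnionDefault(Arrays.asList("null", "int"), 0));    // false
        System.out.println(isValidUnionDefault(Arrays.asList("int", "null"), 0));    // true
    }
}
```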


Multiple approaches to concurrent processing in Java

Goal: let's assume we want to execute two long-running operations, downloading content and then parsing it, for each resource in a list.

This post compares multiple approaches:
1) Thread(s)
2) ExecutorService to submit a Callable and get back a Future whose get() blocks
3) ExecutorCompletionService (an implementation of CompletionService) to submit Callables returning Futures. Its take() and poll() methods return the Future of the next completed task (take() waits for one; poll() returns null if none is ready, or waits up to a given timeout), so get() on that Future does not block
4) Parallel streams
5) CompletableFuture

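import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class TestConcurrent {

    private static final Logger LOGGER = LoggerFactory.getLogger(TestConcurrent.class);

    public static void main(String[] args) {
        LOGGER.info("Started");
        long startMillis = System.currentTimeMillis();

        // for each resource: download it, parse it and aggregate the results into a list
        // (swap in any of the approach methods below, e.g. threads() or completableFuture())
        List<Integer> results = mapSequentiallyWithOnlyMainThread();

        long stopMillis = System.currentTimeMillis();
        LOGGER.info("Finished in {} seconds, results={}", (stopMillis - startMillis) / 1000, results);
    }

    // download(), parse() and the approach methods below are members of this class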

Let's define the download and parse methods, simulating long-running execution with a configurable TimeUnit.SECONDS.sleep(seconds):

static int download(int id, int seconds) {
    LOGGER.info("Downloading {} and sleeping {} second(s)", id, seconds);
    try {
        TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    LOGGER.info("Downloaded {}", id);
    return id;
}

static int parse(int id, int seconds) {
    LOGGER.info("Parsing {} and sleeping {} second(s)", id, seconds);
    try {
        TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    LOGGER.info("Parsed {}", id);
    return id;
}

To make the comparison between approaches more representative, let's use 3 resources, each of which must be downloaded before it can be parsed. Downloading takes 3 seconds for the 1st resource, 2 seconds for the 2nd and 1 second for the 3rd (numbers.size() - n + 1 seconds for resource n). Conversely, parsing takes 1 second for the 1st resource, 2 seconds for the 2nd and 3 seconds for the 3rd.
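With these durations, the totals to expect can be worked out before running anything. A small calculation, as a sketch assuming enough threads that no task waits for a free thread, all downloads starting at time 0 and no scheduling overhead (ScenarioTimings and its method names are made up for illustration):

```java
import java.util.Arrays;
import java.util.List;

// Expected totals for the download/parse scenario (durations in seconds,
// index i = resource i + 1). Assumes enough threads and no scheduling overhead.
public class ScenarioTimings {

    static final List<Integer> DOWNLOAD = Arrays.asList(3, 2, 1);
    static final List<Integer> PARSE = Arrays.asList(1, 2, 3);

    // A single thread: every download and parse runs one after another.
    static int sequential() {
        int total = 0;
        for (int i = 0; i < DOWNLOAD.size(); i++) {
            total += DOWNLOAD.get(i) + PARSE.get(i);
        }
        return total;
    }

    // Parse i is submitted only after downloads 1..i have been waited for, in order.
    static int parseInSubmissionOrder() {
        int total = 0;
        int downloadsDone = 0; // time at which downloads 1..i have all finished
        for (int i = 0; i < DOWNLOAD.size(); i++) {
            downloadsDone = Math.max(downloadsDone, DOWNLOAD.get(i));
            total = Math.max(total, downloadsDone + PARSE.get(i));
        }
        return total;
    }

    // Each resource is parsed as soon as its own download finishes:
    // the slowest download + parse chain determines the total.
    static int parseAsSoonAsDownloaded() {
        int total = 0;
        for (int i = 0; i < DOWNLOAD.size(); i++) {
            total = Math.max(total, DOWNLOAD.get(i) + PARSE.get(i));
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("sequential: " + sequential() + " s");                                // 12 s
        System.out.println("parse in submission order: " + parseInSubmissionOrder() + " s");     // 6 s
        System.out.println("parse as soon as downloaded: " + parseAsSoonAsDownloaded() + " s");  // 4 s
    }
}
```

The three values correspond to a single thread (12 s), waiting for each download in submission order before submitting its parse, as the ExecutorService approach does (6 s), and parsing each resource as soon as its own download finishes (4 s).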

Obviously we do not want to execute the tasks sequentially on a single thread, as it would take too long (3 + 1 + 2 + 2 + 1 + 3 = 12 seconds):

private static List<Integer> mapSequentiallyWithOnlyMainThread() {
    List<Integer> numbers = Arrays.asList(1, 2, 3);

    return numbers
        .stream()
        .map(n -> download(n, numbers.size() - n + 1))
        .map(n -> parse(n, n))
        .collect(Collectors.toList());
}
17:54:06.281 [main] Started
17:54:06.391 [main] Downloading 1 and sleeping 3 second(s)
17:54:09.394 [main] Downloaded 1
17:54:09.394 [main] Parsing 1 and sleeping 1 second(s)
17:54:10.394 [main] Parsed 1
17:54:10.394 [main] Downloading 2 and sleeping 2 second(s)
17:54:12.394 [main] Downloaded 2
17:54:12.394 [main] Parsing 2 and sleeping 2 second(s)
17:54:14.394 [main] Parsed 2
17:54:14.394 [main] Downloading 3 and sleeping 1 second(s)
17:54:15.394 [main] Downloaded 3
17:54:15.395 [main] Parsing 3 and sleeping 3 second(s)
17:54:18.395 [main] Parsed 3
17:54:18.395 [main] Finished in 12 seconds, results=[1, 2, 3]

So let's use multiple threads.

  1. Threads
    private static List<Integer> threads() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
    
        return numbers
            .stream()
            .map(n -> startThreadAndJoin(() -> download(n, numbers.size() - n + 1)))
            .map(n -> startThreadAndJoin(() -> parse(n, n)))
            .collect(Collectors.toList());
    }
    
    private static Integer startThreadAndJoin(Supplier<Integer> supplier) {
        AtomicInteger value = new AtomicInteger();
        Thread thread = new Thread(() -> {
            value.set(supplier.get());
        });
        thread.start();
        try {
            thread.join();
            return value.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
    18:02:35.132 [main] Started
    18:02:35.244 [Thread-0] Downloading 1 and sleeping 3 second(s)
    18:02:38.246 [Thread-0] Downloaded 1
    18:02:38.248 [Thread-1] Parsing 1 and sleeping 1 second(s)
    18:02:39.248 [Thread-1] Parsed 1
    18:02:39.249 [Thread-2] Downloading 2 and sleeping 2 second(s)
    18:02:41.250 [Thread-2] Downloaded 2
    18:02:41.251 [Thread-3] Parsing 2 and sleeping 2 second(s)
    18:02:43.251 [Thread-3] Parsed 2
    18:02:43.252 [Thread-4] Downloading 3 and sleeping 1 second(s)
    18:02:44.252 [Thread-4] Downloaded 3
    18:02:44.253 [Thread-5] Parsing 3 and sleeping 3 second(s)
    18:02:47.253 [Thread-5] Parsed 3
    18:02:47.253 [main] Finished in 12 seconds, results=[1, 2, 3]
  2. ExecutorService
    private static List<Integer> executorServiceGet() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
    
        ExecutorService executorService = Executors.newFixedThreadPool(numbers.size());
        List<Future<Integer>> downloadedFutures = numbers
            .stream()
            .map(n -> {
                Callable<Integer> callable = () -> download(n, numbers.size() - n + 1);
                return executorService.submit(callable);
            })
            .collect(Collectors.toList());
    
        List<Future<Integer>> parsedFutures = numbers
            .stream()
            .map(n -> {
                Future<Integer> future = downloadedFutures.get(n - 1);
                try {
                    Integer value = future.get();
                    return executorService.submit(() -> parse(value, n));
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
                return null;
            })
            .collect(Collectors.toList());
    
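        List<Integer> results = parsedFutures
            .stream()
            .map(future -> {
                try {
                    return future.get();
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
                return null;
            })
            .collect(Collectors.toList());
        executorService.shutdown(); // otherwise the pool's non-daemon threads keep the JVM running
        return results;
    }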
    
    18:10:11.988 [main] Started
    18:10:12.106 [pool-1-thread-1] Downloading 1 and sleeping 3 second(s)
    18:10:12.110 [pool-1-thread-2] Downloading 2 and sleeping 2 second(s)
    18:10:12.111 [pool-1-thread-3] Downloading 3 and sleeping 1 second(s)
    18:10:13.111 [pool-1-thread-3] Downloaded 3
    18:10:14.111 [pool-1-thread-2] Downloaded 2
    18:10:15.109 [pool-1-thread-1] Downloaded 1
    18:10:15.111 [pool-1-thread-3] Parsing 1 and sleeping 1 second(s)
    18:10:15.112 [pool-1-thread-2] Parsing 2 and sleeping 2 second(s)
    18:10:15.112 [pool-1-thread-1] Parsing 3 and sleeping 3 second(s)
    18:10:16.112 [pool-1-thread-3] Parsed 1
    18:10:17.113 [pool-1-thread-2] Parsed 2
    18:10:18.112 [pool-1-thread-1] Parsed 3
    18:10:18.112 [main] Finished in 6 seconds, results=[1, 2, 3]
  3. ExecutorCompletionService
    private static List<Integer> executionCompletionServiceTakeAndGet() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
    
        ExecutorService executorService = Executors.newFixedThreadPool(numbers.size());
        ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(executorService);
    
        numbers
            .stream()
            .forEach(n -> executorCompletionService.submit(() -> download(n, numbers.size() - n + 1)));
    
        numbers
            .stream()
            .forEach(ignored -> {
                try {
                    Future<Integer> future = executorCompletionService.take();
                    Integer value = future.get();
                    executorCompletionService.submit(() -> parse(value, value));
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            });
    
        List<Integer> results = numbers
            .stream()
            .map(ignore -> {
                try {
                    Future<Integer> future = executorCompletionService.take();
                    return future.get();
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
                return null;
            })
            .collect(Collectors.toList());
        executorService.shutdown(); // otherwise the pool's non-daemon threads keep the JVM running
        return results;
    }
    
    18:18:16.548 [main] Started
    18:18:16.655 [pool-1-thread-2] Downloading 2 and sleeping 2 second(s)
    18:18:16.655 [pool-1-thread-3] Downloading 3 and sleeping 1 second(s)
    18:18:16.655 [pool-1-thread-1] Downloading 1 and sleeping 3 second(s)
    18:18:17.658 [pool-1-thread-3] Downloaded 3
    18:18:17.660 [pool-1-thread-3] Parsing 3 and sleeping 3 second(s)
    18:18:18.658 [pool-1-thread-2] Downloaded 2
    18:18:18.658 [pool-1-thread-2] Parsing 2 and sleeping 2 second(s)
    18:18:19.658 [pool-1-thread-1] Downloaded 1
    18:18:19.658 [pool-1-thread-1] Parsing 1 and sleeping 1 second(s)
    18:18:20.658 [pool-1-thread-2] Parsed 2
    18:18:20.658 [pool-1-thread-1] Parsed 1
    18:18:20.660 [pool-1-thread-3] Parsed 3
    18:18:20.660 [main] Finished in 4 seconds, results=[2, 1, 3]
  4. Parallel streams
    private static List<Integer> mapParallel() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
    
        return numbers
            .parallelStream()
            .map(n -> download(n, numbers.size() - n + 1))
            .map(n -> parse(n, n))
            .collect(Collectors.toList());
    }
    18:23:51.300 [main] Started
    18:23:51.413 [main] Downloading 2 and sleeping 2 second(s)
    18:23:51.413 [ForkJoinPool.commonPool-worker-1] Downloading 1 and sleeping 3 second(s)
    18:23:51.413 [ForkJoinPool.commonPool-worker-2] Downloading 3 and sleeping 1 second(s)
    18:23:52.416 [ForkJoinPool.commonPool-worker-2] Downloaded 3
    18:23:52.416 [ForkJoinPool.commonPool-worker-2] Parsing 3 and sleeping 3 second(s)
    18:23:53.416 [main] Downloaded 2
    18:23:53.416 [main] Parsing 2 and sleeping 2 second(s)
    18:23:54.416 [ForkJoinPool.commonPool-worker-1] Downloaded 1
    18:23:54.416 [ForkJoinPool.commonPool-worker-1] Parsing 1 and sleeping 1 second(s)
    18:23:55.416 [main] Parsed 2
    18:23:55.416 [ForkJoinPool.commonPool-worker-1] Parsed 1
    18:23:55.416 [ForkJoinPool.commonPool-worker-2] Parsed 3
    18:23:55.416 [main] Finished in 4 seconds, results=[1, 2, 3]
  5. CompletableFuture
    private static List<Integer> completableFuture() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
    
        List<CompletableFuture<Integer>> cfs = numbers
            .stream()
            .map(n ->
                CompletableFuture.supplyAsync(() -> download(n, numbers.size() - n + 1))
                    .thenApply(id -> parse(id, n)))
            .collect(Collectors.toList());
    
        CompletableFuture<List<Integer>> allResultsCF = CompletableFuture.allOf(cfs.toArray(new CompletableFuture[numbers.size()]))
            .thenApply(v -> cfs
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
            );
    
        return allResultsCF.join();
    }
18:25:25.722 [main] Started
18:25:25.867 [ForkJoinPool.commonPool-worker-1] Downloading 1 and sleeping 3 second(s)
18:25:25.869 [ForkJoinPool.commonPool-worker-3] Downloading 3 and sleeping 1 second(s)
18:25:25.868 [ForkJoinPool.commonPool-worker-2] Downloading 2 and sleeping 2 second(s)
18:25:26.871 [ForkJoinPool.commonPool-worker-3] Downloaded 3
18:25:26.872 [ForkJoinPool.commonPool-worker-3] Parsing 3 and sleeping 3 second(s)
18:25:27.871 [ForkJoinPool.commonPool-worker-2] Downloaded 2
18:25:27.871 [ForkJoinPool.commonPool-worker-2] Parsing 2 and sleeping 2 second(s)
18:25:28.871 [ForkJoinPool.commonPool-worker-1] Downloaded 1
18:25:28.871 [ForkJoinPool.commonPool-worker-1] Parsing 1 and sleeping 1 second(s)
18:25:29.871 [ForkJoinPool.commonPool-worker-1] Parsed 1
18:25:29.871 [ForkJoinPool.commonPool-worker-2] Parsed 2
18:25:29.872 [ForkJoinPool.commonPool-worker-3] Parsed 3
18:25:29.874 [main] Finished in 4 seconds, results=[1, 2, 3]

If we pass our own pool (an ExecutorService) to both stages, we get:

CompletableFuture.supplyAsync(() -> download(n, numbers.size() - n + 1), executorService)
    .thenApplyAsync(id -> parse(id, n), executorService)
18:31:47.902 [main] Started
18:31:47.995 [pool-1-thread-1] Downloading 1 and sleeping 3 second(s)
18:31:47.997 [pool-1-thread-3] Downloading 3 and sleeping 1 second(s)
18:31:47.997 [pool-1-thread-2] Downloading 2 and sleeping 2 second(s)
18:31:48.999 [pool-1-thread-3] Downloaded 3
18:31:48.999 [pool-1-thread-3] Parsing 3 and sleeping 3 second(s)
18:31:49.999 [pool-1-thread-2] Downloaded 2
18:31:49.999 [pool-1-thread-2] Parsing 2 and sleeping 2 second(s)
18:31:50.999 [pool-1-thread-1] Downloaded 1
18:31:50.999 [pool-1-thread-1] Parsing 1 and sleeping 1 second(s)
18:31:51.999 [pool-1-thread-1] Parsed 1
18:31:51.999 [pool-1-thread-3] Parsed 3
18:31:51.999 [pool-1-thread-2] Parsed 2
18:31:52.001 [main] Finished in 4 seconds, results=[1, 2, 3]
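Two points from the runs above can be made concrete. The Threads approach takes as long as the sequential one because startThreadAndJoin() joins each thread right after starting it, so only one worker thread runs at a time; and the own-pool CompletableFuture snippet needs a declared pool that is shut down afterwards. A sketch of both (the ConcurrencyVariants class and its method names are made up, and durations are scaled down from seconds to 100 ms units to keep the demo short), each of which should take roughly 400 ms, the analogue of the 4 seconds above:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

// Two variants of the approaches above, with durations scaled to 100 ms units (assumption).
public class ConcurrencyVariants {

    static int download(int id, long millis) { sleepMillis(millis); return id; }

    static int parse(int id, long millis) { sleepMillis(millis); return id; }

    static void sleepMillis(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag instead of swallowing it
        }
    }

    // Threads: one thread per resource (download, then parse); every thread is started
    // BEFORE any join, so the three download+parse chains overlap.
    static List<Integer> threadsStartAllThenJoin() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
        int[] results = new int[numbers.size()];
        List<Thread> threads = new ArrayList<>();
        for (int n : numbers) {
            Thread thread = new Thread(() ->
                results[n - 1] = parse(download(n, 100L * (numbers.size() - n + 1)), 100L * n));
            thread.start();
            threads.add(thread);
        }
        for (Thread thread : threads) {
            try {
                thread.join(); // join() also makes the thread's write to results visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining", e);
            }
        }
        List<Integer> list = new ArrayList<>();
        for (int result : results) {
            list.add(result);
        }
        return list;
    }

    // CompletableFuture with an explicit pool for both stages; the pool is shut down
    // afterwards so that its non-daemon threads do not keep the JVM alive.
    static List<Integer> completableFutureWithOwnPool() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
        ExecutorService executorService = Executors.newFixedThreadPool(numbers.size());
        try {
            List<CompletableFuture<Integer>> cfs = numbers
                .stream()
                .map(n -> CompletableFuture
                    .supplyAsync(() -> download(n, 100L * (numbers.size() - n + 1)), executorService)
                    .thenApplyAsync(id -> parse(id, 100L * n), executorService))
                .collect(Collectors.toList());
            // all futures already exist, so joining them in list order only waits
            return cfs.stream().map(CompletableFuture::join).collect(Collectors.toList());
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        System.out.println(threadsStartAllThenJoin() + " in ~" + (System.currentTimeMillis() - start) + " ms");
        start = System.currentTimeMillis();
        System.out.println(completableFutureWithOwnPool() + " in ~" + (System.currentTimeMillis() - start) + " ms");
    }
}
```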

 

CompletableFuture: async and non-blocking callbacks

 

package com.bawi.completable.future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

public class MyCompletableFuture {
    private static final Logger LOGGER = LoggerFactory.getLogger(MyCompletableFuture.class);

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Thread.currentThread().getName() + ": started");
        Arrays.asList("a", "ab", "abc", "abcd")
            .stream()
            .forEach(text -> {
                CompletableFuture<String> stringCF = CompletableFuture.supplyAsync(() -> produceText(text));
                CompletableFuture<Integer> integerCF = stringCF.thenApply(MyCompletableFuture::calcStringLength);
                CompletableFuture<Double> doubleCF = integerCF.thenApply(MyCompletableFuture::calcCirclePerimeter);
                CompletableFuture<Void> voidCF = doubleCF.thenAccept(MyCompletableFuture::print);
                CompletableFuture<Void> voidCF2 = voidCF.thenRun(() -> System.out.println("DONE"));
                }
            );
        System.out.println(Thread.currentThread().getName() + ": created completable future, about to sleep");
        Thread.sleep(5000);
        System.out.println(Thread.currentThread().getName() + ": finished");
   }

    static String produceText(String text)  {
        LOGGER.info("[daemon={}] 1: Producing text: {}", Thread.currentThread().isDaemon(), text);
        sleepMillis(1000);
        return text;
    }

    static int calcStringLength(String text) {
        int length = text.length();
        LOGGER.info("[daemon={}] 2: Calculating string length: {}", Thread.currentThread().isDaemon(), length);
        sleepMillis(1000);
        return length;
    }

    static double calcCirclePerimeter(int r) {
        double perimeter = 2 * Math.PI * r;
        LOGGER.info("[daemon={}] 3: Calculating circle perimeter: {}", Thread.currentThread().isDaemon(), perimeter);
        sleepMillis(1000);
        return perimeter;
    }

    static void print(Double d) {
        LOGGER.info("[daemon={}] 4: Printing: {}", Thread.currentThread().isDaemon(), d);
        System.out.println("d=" + d);
    }

    private static void sleepMillis(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Output (filtered); note four things:
1) the first CompletableFuture is created via supplyAsync, which runs the supplied task on a thread pool: either the implicit ForkJoinPool.commonPool() (when no executor is passed, as above) or an explicitly passed Executor
2) creating and executing the CompletableFuture is async and non-blocking (the main thread immediately moves on)
3) the supplied task starts executing as soon as the CompletableFuture is created (worker-1 started producing text before the main thread started to sleep)
4) if the subsequent methods (e.g. thenApply) are not the *Async variants, each callback runs on the thread that completes the previous stage, so here the same thread (worker-1) executed all the steps; a callback registered on an already completed stage runs in the registering thread instead
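Point 4 can be checked directly. A minimal sketch (the CallbackThreads class is hypothetical) that records which thread runs a non-async thenApply in both cases; it waits with join() rather than Thread.sleep(5000):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Which thread runs a non-async callback such as thenApply?
public class CallbackThreads {

    static String currentThreadName() {
        return Thread.currentThread().getName();
    }

    static void sleepMillis(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // The future is already complete when thenApply is called,
    // so the callback runs immediately in the calling thread.
    static String callbackThreadForCompletedFuture() {
        return CompletableFuture.completedFuture("done")
            .thenApply(v -> currentThreadName())
            .join();
    }

    // The supplier is still sleeping when thenApply is called, so the callback runs
    // later on the thread that completes the future (typically a common-pool worker).
    static String callbackThreadForPendingFuture() {
        return CompletableFuture.supplyAsync(() -> {
                sleepMillis(200);
                return "done";
            })
            .thenApply(v -> currentThreadName())
            .join(); // join() waits for the result instead of sleeping a fixed time
    }

    public static void main(String[] args) {
        System.out.println("caller:                    " + currentThreadName());
        System.out.println("completed future callback: " + callbackThreadForCompletedFuture());
        System.out.println("pending future callback:   " + callbackThreadForPendingFuture());
    }
}
```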

main: started
19:13:56.653 [ForkJoinPool.commonPool-worker-1] INFO [daemon=true] 1: Producing text: a
main: created completable future, about to sleep
19:13:57.664 [ForkJoinPool.commonPool-worker-1] INFO [daemon=true] 2: Calculating string length: 1
19:13:58.664 [ForkJoinPool.commonPool-worker-1] INFO [daemon=true] 3: Calculating circle perimeter: 6.283185307179586
19:13:59.667 [ForkJoinPool.commonPool-worker-1] INFO [daemon=true] 4: Printing: 6.283185307179586
main: finished

If we switch all the subsequent methods to their async variants (thenApplyAsync, thenAcceptAsync and thenRunAsync), there is no guarantee that the same thread will execute all subsequent processing for that future. Note below that worker-1 executed the first two steps and worker-2 the other two:

main: started
19:04:37.022 [ForkJoinPool.commonPool-worker-1] [daemon=true] 1: Producing text: a
19:04:38.031 [ForkJoinPool.commonPool-worker-1] [daemon=true] 2: Calculating string length: 1
19:04:39.045 [ForkJoinPool.commonPool-worker-2] [daemon=true] 3: Calculating circle perimeter: 6.283185307179586
19:04:40.049 [ForkJoinPool.commonPool-worker-2] [daemon=true] 4: Printing: 6.283185307179586
DONE
main: finished

 

Default memory settings for Java

$ jinfo <pid>
Attaching to process ID 8170, please wait…
Debugger attached successfully.

sun.boot.library.path = /apps/java/jdk1.8.0_71/jre/lib/amd64
java.vm.name = Java HotSpot(TM) 64-Bit Server VM
java.runtime.version = 1.8.0_71-b15
os.arch = amd64
os.name = Linux
java.vm.specification.version = 1.8
sun.arch.data.model = 64
java.version = 1.8.0_71
VM Flags:
Non-default VM flags: -XX:CICompilerCount=3 -XX:InitialHeapSize=526385152 -XX:MaxHeapSize=8417968128 -XX:MaxNewSize=2805989376 -XX:MinHeapDeltaBytes=524288 -XX:NewSize=175112192 -XX:OldSize=351272960 -XX:+UseCompressedClassPointers -XX:+UseCompressedOops -XX:+UseFastUnorderedTimeStamps -XX:+UseParallelGC
Command line: -Dspring.profiles.active=dev

so InitialHeapSize ≈ 0.49 GB, MaxHeapSize ≈ 7.84 GB, MaxNewSize ≈ 2.61 GB, OldSize ≈ 0.33 GB (with 1 GB = 1024^3 bytes).
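The byte values can be converted and cross-checked with a few lines of Java. This is a sketch (HeapDefaults is a made-up class name); the relations it checks are read off the numbers above, and the physical memory estimate assumes the JDK 8 ergonomics defaults of initial heap = 1/64 and maximum heap = 1/4 of physical memory:

```java
import java.util.Locale;

// Converts the jinfo values above from bytes to GiB and checks a few relations between them.
public class HeapDefaults {

    static final double BYTES_PER_GIB = 1024.0 * 1024 * 1024;

    static double toGiB(long bytes) {
        return bytes / BYTES_PER_GIB;
    }

    public static void main(String[] args) {
        long initialHeapSize = 526_385_152L;
        long maxHeapSize = 8_417_968_128L;
        long maxNewSize = 2_805_989_376L;
        long newSize = 175_112_192L;
        long oldSize = 351_272_960L;

        System.out.printf(Locale.ROOT, "InitialHeapSize = %.2f GiB%n", toGiB(initialHeapSize)); // 0.49
        System.out.printf(Locale.ROOT, "MaxHeapSize     = %.2f GiB%n", toGiB(maxHeapSize));     // 7.84
        System.out.printf(Locale.ROOT, "MaxNewSize      = %.2f GiB%n", toGiB(maxNewSize));      // 2.61
        System.out.printf(Locale.ROOT, "OldSize         = %.2f GiB%n", toGiB(oldSize));         // 0.33

        // Initial heap = initial young generation (NewSize) + initial old generation (OldSize)
        System.out.println(newSize + oldSize == initialHeapSize); // true
        // Young generation may grow to one third of the heap (default NewRatio=2: old = 2 * young)
        System.out.println(maxHeapSize / 3 == maxNewSize);         // true
        // Physical memory implied by the 1/4 (max) and 1/64 (initial) ergonomics fractions
        System.out.printf(Locale.ROOT, "RAM ~ %.1f GiB (from max), %.1f GiB (from initial)%n",
            toGiB(maxHeapSize * 4), toGiB(initialHeapSize * 64)); // 31.4, 31.4
    }
}
```

Both fractions point to roughly 31 GiB of RAM, which suggests the heap sizes here are the JVM's ergonomic defaults rather than explicit -Xms/-Xmx settings (the command line above indeed sets no heap options).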

 

Understanding Unix load for Java processes

Given: the Unix machine has 16 cores.

1. State before the load:
top - 15:52:37 up 128 days, 11:58, 2 users, load average: 0.20, 0.97, 0.83
Tasks: 343 total, 1 running, 340 sleeping, 0 stopped, 2 zombie
Cpu(s): 0.0%us, 0.0%sy, 0.0%ni, 99.8%id, 0.1%wa, 0.0%hi, 0.0%si, 0.0%st
Mem: 49453936k total, 36820724k used, 12633212k free, 1120572k buffers
Swap: 8388600k total, 0k used, 8388600k free, 33165052k cached

PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
18198 tktdev 15 0 11016 1264 784 R 0.3 0.0 0:00.07 top

2. State during load:
$ top
top - 16:00:53 up 128 days, 12:06, 2 users, load average: 18.05, 12.28, 6.18
Tasks: 359 total, 7 running, 349 sleeping, 0 stopped, 3 zombie
Cpu(s): 99.6%us, 0.3%sy, 0.0%ni, 0.1%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Mem: 49453936k total, 36900812k used, 12553124k free, 1120572k buffers
Swap: 8388600k total, 0k used, 8388600k free, 33165072k cached

A 1-minute load average of 18.05 (up from 12.28 over 5 minutes and 6.18 over 15 minutes) means the machine has more runnable work than its 16 cores can consume (system administrators usually aim to keep the load below 0.75 * number of cores, i.e. 12 here).

PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
31111 tktdev 16 0 14.0g 33m 11m S 1576.2 0.1 30:19.38 java

Cpu0 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu1 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu2 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu3 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu4 : 99.7%us, 0.3%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu5 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu6 : 99.7%us, 0.3%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu7 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu8 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu9 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu10 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu11 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu12 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu13 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu14 : 99.0%us, 0.3%sy, 0.0%ni, 0.7%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu15 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
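The rule of thumb above can also be applied from inside a JVM through the standard OperatingSystemMXBean. A sketch; the 0.75 factor is the heuristic quoted above, not a JVM or OS setting, and LoadCheck is a made-up class name:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

// Compares the system load average with the 0.75 * cores rule of thumb.
public class LoadCheck {

    static double threshold(int cores) {
        return 0.75 * cores;
    }

    static boolean isOverloaded(double loadAverage, int cores) {
        return loadAverage > threshold(cores);
    }

    public static void main(String[] args) {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        int cores = os.getAvailableProcessors();
        // 1-minute load average; a negative value means it is not available on this platform
        double load = os.getSystemLoadAverage();
        if (load < 0) {
            System.out.println("load average not available");
        } else {
            System.out.printf("load=%.2f cores=%d threshold=%.2f overloaded=%b%n",
                load, cores, threshold(cores), isOverloaded(load, cores));
        }
        // The numbers from the top output above: 18.05 on 16 cores (threshold 12.0)
        System.out.println(isOverloaded(18.05, 16)); // true
    }
}
```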

Thread dump analysis

A thread name may not include a unique counter (one that is incremented whenever a new thread is created in the pool). In that case, identify the thread by its nid (the native thread id, in hex), e.g.

"Thread-1" #19 prio=5 os_prio=0 tid=0x00000000051db800 nid=0xdd5 runnable [0x00002ae6063ac000]

which can be extracted with a script, e.g.:

grep -e '".*"' jstack.log | sed 's/"\ .*nid=/",/' | egrep -oh '.*,[0-9A-Za-z]+' | sort

to get:

"Thread-1",0xdd5

Other examples after scripting:

"JMX Monitor ThreadGroup<main> Executor Pool [Thread-71]",0x52ed
"JMX Monitor ThreadGroup<main> Executor Pool [Thread-72]",0x7a8f
"RMI TCP Connection(idle)",0x580a
"RMI TCP Connection(idle)",0x5d7d

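The same extraction can be done in Java, e.g. inside a tool that post-processes thread dumps. A sketch of a counterpart to the grep/sed pipeline (ThreadDumpNids is a made-up class name; it assumes header lines in the HotSpot format shown above, with the thread name in double quotes followed by nid=0x...):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Turns jstack thread header lines into sorted "thread name",nid pairs; other lines are skipped.
public class ThreadDumpNids {

    // group 1: the quoted thread name (may contain spaces), group 2: the hex nid
    private static final Pattern HEADER = Pattern.compile("^(\"[^\"]*\") .* nid=(0x[0-9a-fA-F]+)");

    static List<String> nameNidPairs(List<String> jstackLines) {
        List<String> pairs = new ArrayList<>();
        for (String line : jstackLines) {
            Matcher matcher = HEADER.matcher(line);
            if (matcher.find()) {
                pairs.add(matcher.group(1) + "," + matcher.group(2));
            }
        }
        Collections.sort(pairs); // like the final sort in the shell pipeline
        return pairs;
    }

    public static void main(String[] args) {
        // Header lines taken from the thread dumps in this post, plus a non-header line
        List<String> lines = Arrays.asList(
            "\"Thread-2\" #20 prio=5 os_prio=0 tid=0x00002ac1f4013800 nid=0xe3b runnable [0x00002ac1eec0f000]",
            "   java.lang.Thread.State: RUNNABLE",
            "\"Thread-1\" #19 prio=5 os_prio=0 tid=0x00000000051db800 nid=0xdd5 runnable [0x00002ae6063ac000]");
        nameNidPairs(lines).forEach(System.out::println);
        // "Thread-1",0xdd5
        // "Thread-2",0xe3b
    }
}
```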
Example:

One JVM process id: 3103 (PID, process ID)
One JVM parent process id: 2746 (PPID, parent process ID)
Many thread ids: 3103, 3104, …, 3602, 3605, 3643 (LWP, light-weight process id), which appear in the thread dump as hex nid values, as shown below:

 

$ ps -eLf | grep tktdev | grep 3103 | grep java
UID PID PPID LWP C NLWP STIME TTY TIME CMD
tktdev 3103 2746 3103 0 37 03:25 pts/0 00:00:00 /opt/jdk1.8.0_45/bin/java -cp . com.bawi.cyclicbarrier.MyCyclicBarrier
tktdev 3103 2746 3104 0 37 03:25 pts/0 00:00:00 /opt/jdk1.8.0_45/bin/java -cp . com.bawi.cyclicbarrier.MyCyclicBarrier
...
tktdev 3103 2746 3602 99 37 03:25 pts/0 00:09:17 /opt/jdk1.8.0_45/bin/java -cp . com.bawi.cyclicbarrier.MyCyclicBarrier
tktdev 3103 2746 3605 99 37 03:25 pts/0 00:09:17 /opt/jdk1.8.0_45/bin/java -cp . com.bawi.cyclicbarrier.MyCyclicBarrier
tktdev 3103 2746 3643 99 37 03:25 pts/0 00:09:17 /opt/jdk1.8.0_45/bin/java -cp . com.bawi.cyclicbarrier.MyCyclicBarrier
tktdev 3103 2746 11475 0 37 03:29 pts/0 00:00:00 /opt/jdk1.8.0_45/bin/java -cp . com.bawi.cyclicbarrier.MyCyclicBarrier

Showing CPU usage per thread with top in threads mode, -H (result filtered):
top -H 

 PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
 3602 tktdev 25 0 13.8g 28m 11m R 99.9 0.1 10:38.74 java
 3605 tktdev 25 0 13.8g 28m 11m R 99.9 0.1 10:38.74 java
 3643 tktdev 25 0 13.8g 28m 11m R 99.9 0.1 10:38.74 java

Getting the Java PID:

$ /opt/jdk1.8.0_45/bin/jcmd
3103 com.bawi.cyclicbarrier.MyCyclicBarrier

Creating a thread dump via jstack:

$ /opt/jdk1.8.0_45/bin/jstack -l 3103 > jstack.log

Filtered content of the thread dump:

"Thread-2" #20 prio=5 os_prio=0 tid=0x00002ac1f4013800 nid=0xe3b runnable [0x00002ac1eec0f000] (note: nid=0xe3b (in hex)== 3643 (in dec))
 java.lang.Thread.State: RUNNABLE
 at com.bawi.cyclicbarrier.MyCyclicBarrier.lambda$createAndStartNewThread$0(MyCyclicBarrier.java:23)
 at com.bawi.cyclicbarrier.MyCyclicBarrier$$Lambda$1/791452441.run(Unknown Source)
 at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
 - None
"Thread-1" #19 prio=5 os_prio=0 tid=0x00002ac1f4011800 nid=0xe15 runnable [0x00002ac1eeb0e000] (note: nid=0xe15 (in hex)== 3605 (in dec))
 java.lang.Thread.State: RUNNABLE
 at com.bawi.cyclicbarrier.MyCyclicBarrier.lambda$createAndStartNewThread$0(MyCyclicBarrier.java:23)
 at com.bawi.cyclicbarrier.MyCyclicBarrier$$Lambda$1/791452441.run(Unknown Source)
 at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
 - None
"Thread-0" #18 prio=5 os_prio=0 tid=0x00002ac1f4010000 nid=0xe12 runnable [0x00002ac1eea0d000] (note: nid=0xe12 (in hex)== 3602 (in dec))
 java.lang.Thread.State: RUNNABLE
 at com.bawi.cyclicbarrier.MyCyclicBarrier.lambda$createAndStartNewThread$0(MyCyclicBarrier.java:23)
 at com.bawi.cyclicbarrier.MyCyclicBarrier$$Lambda$1/791452441.run(Unknown Source)
 at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
 - None
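The conversions noted in the dump (e.g. nid=0xe3b == 3643) are plain hexadecimal/decimal conversions; in a shell, printf '%x\n' 3643 prints e3b. The same in Java, as a small sketch (LwpNid is a made-up class name) for mapping the hottest LWP from top -H to its jstack entry:

```java
// Converts between decimal LWP ids (ps -eLf, top -H) and hex nid values (jstack).
public class LwpNid {

    static String lwpToNid(long lwp) {
        return "0x" + Long.toHexString(lwp);
    }

    static long nidToLwp(String nid) {
        String hex = nid.startsWith("0x") || nid.startsWith("0X") ? nid.substring(2) : nid;
        return Long.parseLong(hex, 16);
    }

    public static void main(String[] args) {
        for (long lwp : new long[] {3602, 3605, 3643}) {
            System.out.println(lwp + " -> " + lwpToNid(lwp)); // 0xe12, 0xe15, 0xe3b
        }
        System.out.println(nidToLwp("0xe3b")); // 3643
    }
}
```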


			

Java serial vs parallel streams

Use a parallel stream when:
1) the operation applied to each data element is independent
and
2) either
a) the operation is CPU-intensive (e.g. isPrime(number))
or
b) there are many data elements and the source is efficiently splittable (e.g. ArrayList and other random-access collections, HashMap and plain arrays, as opposed to poorly splittable LinkedList, BlockingQueue and most IO-based sources)

- It does NOT make sense to parallelize a relatively small number (e.g. fewer than 10000) of CPU-cheap (not time-consuming) operations; use a serial stream instead: numbers.stream().filter(this::isEven).forEach(System.out::print)
* it may take some startup time to spin up the worker threads (of the ForkJoinPool common pool) used for parallel processing,
* subdividing tasks also takes time, so it makes little sense to further divide already very small tasks
- It does make sense to parallelize CPU-intensive operations: numbers.parallelStream().filter(this::isPrime).forEach(System.out::print) (note that forEach on a parallel stream does not preserve encounter order; use forEachOrdered if the order matters)

Note that most IO sources (e.g. lines read from a file) are not efficiently splittable and should usually be processed sequentially.
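The CPU-intensive case can be tried out with a sketch like the following (SerialVsParallel is a made-up class name; the timing is naive, without JIT warm-up, so treat the printed durations as indicative only). IntStream.range is an efficiently splittable source, and collecting to a List keeps the encounter order even in parallel:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Serial vs parallel stream over an independent, CPU-intensive operation (isPrime).
public class SerialVsParallel {

    static boolean isPrime(int n) {
        if (n < 2) {
            return false;
        }
        for (int d = 2; (long) d * d <= n; d++) { // long avoids overflow of d * d
            if (n % d == 0) {
                return false;
            }
        }
        return true;
    }

    static long countPrimesSerial(int upTo) {
        return IntStream.range(0, upTo).filter(SerialVsParallel::isPrime).count();
    }

    static long countPrimesParallel(int upTo) {
        return IntStream.range(0, upTo).parallel().filter(SerialVsParallel::isPrime).count();
    }

    // Unlike forEach, collect keeps the encounter order of the (ordered) source
    static List<Integer> primesParallel(int upTo) {
        return IntStream.range(0, upTo).parallel().filter(SerialVsParallel::isPrime)
            .boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        int upTo = 2_000_000;
        long t0 = System.nanoTime();
        long serial = countPrimesSerial(upTo);
        long t1 = System.nanoTime();
        long parallel = countPrimesParallel(upTo);
        long t2 = System.nanoTime();
        System.out.printf("serial:   %d primes in %d ms%n", serial, (t1 - t0) / 1_000_000);
        System.out.printf("parallel: %d primes in %d ms%n", parallel, (t2 - t1) / 1_000_000);
        System.out.println(primesParallel(30)); // [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
    }
}
```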