Tag: java

Scala for Java developers – decompiling Scala classes to Java bytecode

Classes in scala (decompiled to Java byte code to see generated methods)

me@MacBook:~$ scala
Welcome to Scala 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_192).
Type in expressions for evaluation. Or try :help.

scala> class Person(name: String, age: Int) {
     | println("In default constructor") 
     | override def toString = "Person(name=" + name + ",age=" + age + ")"
     | }
defined class Person

scala> new Person("Bob",18)
In default constructor
res0: Person = Person(name=Bob,age=18)

scala> :javap -p Person
Compiled from ""
public class Person {
  private final java.lang.String name;
  private final int age;
  public java.lang.String toString();
  public Person(java.lang.String, int);
}


scala> :paste
// Entering paste mode (ctrl-D to finish)

class Person(name: String, age: Int) {
  override def toString = "Person(name=" + name + ",age=" + age + ")"
}

object Person {
  def apply(name: String, age: Int) = {   // create factory method(s) using apply method(s)
    new Person(name,age)
  }
}

// Exiting paste mode, now interpreting.

defined class Person
defined object Person   // companion object

scala> new Person("Alice",19)
res0: Person = Person(name=Alice,age=19)

scala> Person.apply("Alice", 19)         // access companion object methods by object-name.method-name
res1: Person = Person(name=Alice,age=19)

scala> Person("Alice", 19)               // can even skip typing apply method
res2: Person = Person(name=Alice,age=19)

scala> :javap -p Person$
Compiled from ""
public class Person$ {
  public static final Person$ MODULE$;
  public static {};
  public Person apply(java.lang.String, int);
  public Person$();
}

scala> class Person(val name: String, val age: Int) {
     | println("In default constructor") 
     | override def toString = "Person(name=" + name + ",age=" + age + ")"
     | }
defined class Person

scala> :javap -p Person
Compiled from ""
public class Person {
  private final java.lang.String name;
  private final int age;
  public java.lang.String name();
  public int age();
  public java.lang.String toString();
  public Person(java.lang.String, int);
}

scala> new Person("Bob",18)
In default constructor
res1: Person = Person(name=Bob,age=18)

scala> res1.name
res2: String = Bob

scala> res1.name()
:13: error: not enough arguments for method apply: (index: Int)Char in class StringOps.
Unspecified value parameter index.
       res1.name()
                ^

scala> class Person(var name: String, var age: Int) {
     | override def toString = "Person(name=" + name + ",age=" + age + ")"
     | }
defined class Person

scala> :javap -p Person
Compiled from ""
public class Person {
  private java.lang.String name;
  private int age;
  public java.lang.String name();
  public void name_$eq(java.lang.String);
  public int age();
  public void age_$eq(int);
  public java.lang.String toString();
  public Person(java.lang.String, int);
}

scala> new Person("Bob",18)
res4: Person = Person(name=Bob,age=18)

scala> res4.name_=("Bobby")

scala> res4
res6: Person = Person(name=Bobby,age=18)

scala> res4.name = "Bobby2"
res4.name: String = Bobby2

scala> res4
res7: Person = Person(name=Bobby2,age=18)

scala> case class Person(var name: String, var age: Int) // var to add setter methods
defined class Person

scala> new Person("Alice",17)
res0: Person = Person(Alice,17) // a case class has a default toString that prints its fields (instead of the class name and hash code)

scala> Person("Bobby",18)  // automatically added companion object with apply method
res1: Person = Person(Bobby,18)

scala> :javap -p Person
Compiled from ""
public class Person implements scala.Product,scala.Serializable {
  private java.lang.String name;
  private int age;
  public java.lang.String name();
  public void name_$eq(java.lang.String);  // var to add setter methods
  public int age();
  public void age_$eq(int);
  public Person copy(java.lang.String, int);
  public java.lang.String copy$default$1();
  public int copy$default$2();
  public java.lang.String productPrefix();
  public int productArity();
  public java.lang.Object productElement(int);
  public scala.collection.Iterator productIterator();
  public boolean canEqual(java.lang.Object);
  public int hashCode();
  public java.lang.String toString();
  public boolean equals(java.lang.Object);
  public Person(java.lang.String, int);
}

scala> :javap -p scala.Serializable
Compiled from "Serializable.scala"
public interface scala.Serializable extends java.io.Serializable {
}

scala> :javap -p java.io.Serializable
Compiled from "Serializable.java"
public interface java.io.Serializable {
}

scala> :javap -p scala.Product
Compiled from "Product.scala"
public interface scala.Product extends scala.Equals {
  public abstract java.lang.Object productElement(int);
  public abstract int productArity();
  public abstract scala.collection.Iterator productIterator();
  public abstract java.lang.String productPrefix();
}

scala> :javap -p Person$  // automatically added companion object with apply method
Compiled from ""
public class Person$ extends scala.runtime.AbstractFunction2 implements scala.Serializable {
  public static final Person$ MODULE$;
  public static {};
  public final java.lang.String toString();
  public Person apply(java.lang.String, int);
  public scala.Option<scala.Tuple2> unapply(Person);
  private java.lang.Object readResolve();
  public java.lang.Object apply(java.lang.Object, java.lang.Object);
  public Person$();
}

scala> case class Person(name: String, age: Int)  // no var before args - no setters generated
defined class Person

scala> new Person("Alice",17)
res0: Person = Person(Alice,17)

scala> Person("Bobby",18)
res1: Person = Person(Bobby,18)

scala> :javap -p Person
Compiled from ""
public class Person implements scala.Product,scala.Serializable {
  private final java.lang.String name;
  private final int age;
  public java.lang.String name();
  public int age();
  public Person copy(java.lang.String, int);
  public java.lang.String copy$default$1();
  public int copy$default$2();
  public java.lang.String productPrefix();
  public int productArity();
  public java.lang.Object productElement(int);
  public scala.collection.Iterator productIterator();
  public boolean canEqual(java.lang.Object);
  public int hashCode();
  public java.lang.String toString();
  public boolean equals(java.lang.Object);
  public Person(java.lang.String, int);
}

scala> :javap -p Person$
Compiled from ""
public class Person$ extends scala.runtime.AbstractFunction2 implements scala.Serializable {
  public static final Person$ MODULE$;
  public static {};
  public final java.lang.String toString();
  public Person apply(java.lang.String, int);
  public scala.Option<scala.Tuple2> unapply(Person);
  private java.lang.Object readResolve();
  public java.lang.Object apply(java.lang.Object, java.lang.Object);
  public Person$();
}

scala> case class Person(name: String, age: Int) { // override constructor and toString method in case class
     | println("In my constructor")
     | override def toString = name + age
     | }
defined class Person

scala> new Person("a",1)
In my constructor
res4: Person = a1

 

Advertisements

Scala for Java developers


/** Small mutable holder used by the `MyClass.main` demo. */
class MyClass {
  // `= _` initializes the field to its type's default value (null for String);
  // `var` (not `val`) so that the field can be reassigned later.
  var myname: String = _
  var myseq: Seq[Any] = Nil // empty seq

  /** `myname` without leading/trailing whitespace; throws NullPointerException if `myname` was never set. */
  def mytrim: String = myname.trim
}
  
/** Immutable person record; the case class supplies apply, equals, hashCode, toString and copy. */
case class Person(
  age: Int,
  name: String
)
  
/** Extension methods for [[Person]]; bring them into scope with `import PersonImplicits._`. */
object PersonImplicits {
  /**
   * Adds `describe` to [[Person]].
   * Declared as a value class (`extends AnyVal`) so calling `describe` does not allocate a wrapper object;
   * the parameter is private so `person` does not leak as an extra member on every Person.
   */
  implicit class PersonHelper(private val person: Person) extends AnyVal {
    /** Renders the person as `Person[age,name]`. */
    def describe: String = s"Person[${person.age},${person.name}]"
  }
}
  
  
/** Companion object of [[MyClass]]: a guided tour of basic Scala features for Java developers. */
object MyClass {
  /** Runs every demo in sequence, printing results to stdout. */
  def main(args: Array[String]): Unit = {

    println(sum(1, 10, i => i)) // 55: pass the function body directly as a lambda

    println(sum(1, 10, square)) // 385: pass a method computing a square (eta-expanded to a function)

    // A local def may be referenced before its declaration as long as no val/var is defined in between.
    def square(i: Int): Int = i * i

    import java.util

    val javaList: util.List[String] = new util.ArrayList[String] // JDK List and ArrayList via `import java.util`
    javaList.add("a")

    import scala.collection.mutable
    import scala.collection.JavaConverters._
    val scalaSeq: mutable.Seq[String] = javaList.asScala // wrap the Java list as a Scala mutable seq (no copy)

    // alternative ways to println elements
    scalaSeq.foreach(s => println(s))
    scalaSeq.foreach(println(_))
    scalaSeq.foreach(println _)
    scalaSeq.foreach(println)

    val list: List[String] = scalaSeq.toList // copy the seq into an immutable List
    list.foreach(println)

    val seq: Seq[String] = Seq("a", "b", "c") // Seq is a trait, roughly the counterpart of Java's List interface
    seq.foreach(println)

    // List is immutable; it is a sealed abstract class whose subclasses are :: (non-empty) and Nil (empty)
    val immutableList = List(1, 2, 3)
    immutableList.map(10 * _).foreach(println)

    val ints: mutable.ListBuffer[Int] = mutable.ListBuffer(11, 22, 33)
    ints += 44 // a mutable ListBuffer can be appended to in place via +=, an immutable Seq cannot

    val immutableMap: Map[String, Int] = Map("a" -> 1, "b" -> 2)
    println("map:" + immutableMap("a")) // get value by key "a"

    // `+` builds a new map from the existing entries plus the new one; immutableMap itself is unchanged
    val newImmutableMap: Map[String, Int] = immutableMap + ("c" -> 3)
    newImmutableMap.foreach { case (k, v) => println(k + ":" + v) }

    val mutableMap = mutable.Map("aa" -> 10)
    mutableMap += "bb" -> 20
    mutableMap.foreach { case (k, v) => println(k + ":" + v) }

    printOptional(Some("a")) // value=a
    printOptional(None)      // value=empty

    println(divide(12, 4, 3))             // 1
    println(divide(b = 4, a = 12, c = 3)) // 1 as well: named params may be given in any order

    println(divide(12, 4)) // 3: the third param has a default value, so it can be omitted

    val clazz = new MyClass
    clazz.myname = " aaa bbb "
    println(s"'${clazz.mytrim}'")
    clazz.myseq = Seq(12)

    matchcase(0)
    matchcase(1)

    import PersonImplicits._
    println(Person(18, "me").describe) // `describe` is added by the implicit class PersonHelper
  }

  /** Prints `value=<v>` for `Some(v)` and `value=empty` for `None`. */
  def printOptional(optionalValue: Option[String]): Unit = {
    val value = optionalValue.getOrElse("empty")
    println("value=" + value)
  }

  /**
   * Truncating integer division `a / b / c`; `c` defaults to 1.
   * The value of the last expression is the method's return value (no `return` needed).
   * Throws ArithmeticException when `b` or `c` is 0.
   */
  def divide(a: Int, b: Int, c: Int = 1): Int = a / b / c

  /** Prints "non positive" for `n <= 0` and "positive n" otherwise. */
  def matchcase(n: Int): Unit = n match {
    case nonPositive if nonPositive <= 0 => println("non positive")
    case positive                        => println("positive " + positive)
  }

  /**
   * Sums `fun(i)` for every `i` in the inclusive range `lb..ub` (0 when `lb > ub`).
   * Object members may be defined in any order relative to their callers.
   */
  def sum(lb: Int, ub: Int, fun: Int => Int): Int =
    (lb to ub).foldLeft(0)((acc, el) => acc + fun(el))
}

Unions and default value in apache avro serialization and deserialization

Initial avro schema (schema/user.avsc) defines a User record with a name field only.

{
  "namespace": "com.bawi.avro.model",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string"
    }
  ]
}

The Maven pom.xml declares the Avro dependency:

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.1</version>
        </dependency>

so we can serialize the User data in Java to the target/user.avro file on disk:

        // Parse the schema and write a single User record to target/user.avro.
        Schema schema = new Schema.Parser().parse(new File("schema/user.avsc"));
        File avroFile = new File("target/user.avro");
        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "Alyssa");
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        // try-with-resources closes (and flushes) the writer even when create/append throws
        try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
            dataFileWriter.create(schema, avroFile);
            dataFileWriter.append(user);
        }

We can read (deserialize) the User back from disk either in Java:

        // Read every record from avroFile, interpreting it with `schema` as the reader schema.
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
        GenericRecord user = null;
        // try-with-resources guarantees the underlying file handle is released
        try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(avroFile, datumReader)) {
            while (dataFileReader.hasNext()) {
                // pass the previous record back in so the reader can reuse it instead of allocating a new one
                user = dataFileReader.next(user);
                System.out.println(user);
            }
        }

or with the avro-tools jar, which Maven downloads when it is declared as a test dependency:

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-tools</artifactId>
            <version>1.8.1</version>
            <scope>test</scope>
        </dependency>

and running it with the `tojson` argument:

me@MacBook:~/dev/my-projects/my-avro$ java -jar /Users/me/.m2/repository/org/apache/avro/avro-tools/1.8.1/avro-tools-1.8.1.jar tojson target/user.avro
{"name":"Alyssa"}

Then we add a new favorite_number field to the schema:

{
  "namespace": "com.bawi.avro.model",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": "int"
    }
  ]
}

If we now run the Java deserialization code against the existing data in user.avro, but with the new schema, we get:

Exception in thread "main" org.apache.avro.AvroTypeException: Found com.bawi.avro.model.User, expecting com.bawi.avro.model.User, missing required field favorite_number

because favorite_number is not present in the data stored in the Avro file, and the new schema gives the reader no value to fill in.

Changing the type to a union of null and int alone does not get rid of the error above – the reader still needs a default value.

The solution is to add a default value with a union for favorite_number e.g.:

    {
      "name": "favorite_number",
      "type": [
        "null",
        "int"
      ],
      "default": null
    }

to get: {"name": "Alyssa", "favorite_number": null}
or add

    {
      "name": "favorite_number",
      "type": "int",
      "default": 0
    }

to get: {"name": "Alyssa", "favorite_number": 0}

Note that the default value of a union field must match the first type listed in the union. Placing int first in the union while using null as the default, such as:

    {
      "name": "favorite_number",
      "type": [
        "int",
        "null"
      ],
      "default": null
    }

gives an error:

Exception in thread "main" org.apache.avro.AvroTypeException: Non-numeric default value for int: null

Similarly, the error

Exception in thread "main" org.apache.avro.AvroTypeException: Non-null default value for null type: 0

is thrown when null comes first in the union but the default is 0:

    {
      "name": "favorite_number",
      "type": [
        "null",
        "int"
      ],
      "default": 0
    }

as described in https://avro.apache.org/docs/1.7.7/spec.html#Unions (the default value of a union field corresponds to the first schema in the union).

Multiple approaches to concurrent processing in Java

Goal: let's assume we want to execute two long-running operations – downloading content and parsing it – for each resource in a list.

This post compares several approaches:
1) Thread(s)
2) ExecutorService to submit Callable and return blocking Future get()
3) ExecutorCompletionService (an implementation of CompletionService) to submit Callables; its take() or poll() methods wait for and return the Future of the first completed task, so calling get() on that Future does not block
4) Parallel streams
5) CompletableFuture

public class TestConcurrent {

    private static final Logger LOGGER = LoggerFactory.getLogger(TestConcurrent.class);

    /**
     * Template driver shared by all approaches below: logs start, computes the results with one of the
     * approaches, then logs the elapsed time (whole seconds, truncated by integer division) and the results.
     * NOTE: {@code ...} is a placeholder for a call such as {@code executorServiceGet()}; this snippet does not compile as-is.
     */
    public static void main(String[] args) {
        LOGGER.info("Started");
        long startMillis = System.currentTimeMillis();

        // for each resource: download it, parse it, and collect the parse results into a list
        List<Integer> results = ...

        long stopMillis = System.currentTimeMillis();
        LOGGER.info("Finished in {} seconds, results={}", (stopMillis - startMillis) / 1000, results);
    }

Let's define the download and parse methods and simulate long-running work with a configurable TimeUnit.SECONDS.sleep(seconds).

/**
 * Simulates downloading resource {@code id} by sleeping for {@code seconds}.
 *
 * @return {@code id} unchanged, so it can be fed into {@link #parse}
 */
static int download(int id, int seconds) {
    LOGGER.info("Downloading {} and sleeping {} second(s)", id, seconds);
    try {
        TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the caller / executor can still observe the interruption.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    LOGGER.info("Downloaded {}", id);
    return id;
}

/**
 * Simulates parsing resource {@code id} by sleeping for {@code seconds}.
 *
 * @return {@code id} unchanged
 */
static int parse(int id, int seconds) {
    LOGGER.info("Parsing {} and sleeping {} second(s)", id, seconds);
    try {
        TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the caller / executor can still observe the interruption.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    LOGGER.info("Parsed {}", id);
    return id;
}

To make the comparison between approaches more representative, let's use 3 resources, each of which must be downloaded before it can be parsed. The 1st resource takes 3 seconds to download, the 2nd 2 seconds and the 3rd 1 second. Conversely, parsing takes 1 second for the 1st resource, 2 seconds for the 2nd and 3 seconds for the 3rd.

Obviously we do not want to execute the tasks sequentially using the same thread as it would take too long (12 seconds)

/**
 * Baseline: downloads and then parses each resource in turn, all on the calling thread.
 * Resource n is downloaded for (size - n + 1) seconds and parsed for n seconds.
 */
private static List<Integer> mapSequentiallyWithOnlyMainThread() {
    List<Integer> ids = Arrays.asList(1, 2, 3);
    int total = ids.size();

    return ids.stream()
        .map(id -> {
            int downloaded = download(id, total - id + 1);
            return parse(downloaded, downloaded);
        })
        .collect(Collectors.toList());
}
17:54:06.281 [main] Started
17:54:06.391 [main] Downloading 1 and sleeping 3 second(s)
17:54:09.394 [main] Downloaded 1
17:54:09.394 [main] Parsing 1 and sleeping 1 second(s)
17:54:10.394 [main] Parsed 1
17:54:10.394 [main] Downloading 2 and sleeping 2 second(s)
17:54:12.394 [main] Downloaded 2
17:54:12.394 [main] Parsing 2 and sleeping 2 second(s)
17:54:14.394 [main] Parsed 2
17:54:14.394 [main] Downloading 3 and sleeping 1 second(s)
17:54:15.394 [main] Downloaded 3
17:54:15.395 [main] Parsing 3 and sleeping 3 second(s)
17:54:18.395 [main] Parsed 3
17:54:18.395 [main] Finished in 12 seconds, results=[1, 2, 3]

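So let's use multiple threads. (Note that the first approach joins each thread right after starting it, so the steps still run one at a time and it also takes 12 seconds.)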

  1. Threads
    /**
     * Approach 1: runs each download and each parse on its own freshly started thread, but joins every
     * thread immediately, so the overall execution is still sequential.
     */
    private static List<Integer> threads() {
        List<Integer> ids = Arrays.asList(1, 2, 3);
        int total = ids.size();

        return ids.stream()
            .map(id -> {
                Integer downloaded = startThreadAndJoin(() -> download(id, total - id + 1));
                return startThreadAndJoin(() -> parse(downloaded, downloaded));
            })
            .collect(Collectors.toList());
    }
    
    /**
     * Runs {@code supplier} on a newly started thread and waits for that thread to finish.
     * Because the caller joins immediately, this only moves the work to another thread; it adds no parallelism.
     *
     * @return the supplied value, or {@code null} if the calling thread is interrupted while waiting
     * @throws RuntimeException any runtime exception thrown by {@code supplier}, rethrown on the calling thread
     */
    private static Integer startThreadAndJoin(Supplier<Integer> supplier) {
        AtomicInteger value = new AtomicInteger();
        // Single-slot holder for a failure in the worker; Thread.join() makes the write visible to this thread.
        RuntimeException[] failure = new RuntimeException[1];
        Thread thread = new Thread(() -> {
            try {
                value.set(supplier.get());
            } catch (RuntimeException e) {
                failure[0] = e;
            }
        });
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for our caller
            e.printStackTrace();
            return null;
        }
        if (failure[0] != null) {
            throw failure[0];
        }
        return value.get();
    }
    18:02:35.132 [main] Started
    18:02:35.244 [Thread-0] Downloading 1 and sleeping 3 second(s)
    18:02:38.246 [Thread-0] Downloaded 1
    18:02:38.248 [Thread-1] Parsing 1 and sleeping 1 second(s)
    18:02:39.248 [Thread-1] Parsed 1
    18:02:39.249 [Thread-2] Downloading 2 and sleeping 2 second(s)
    18:02:41.250 [Thread-2] Downloaded 2
    18:02:41.251 [Thread-3] Parsing 2 and sleeping 2 second(s)
    18:02:43.251 [Thread-3] Parsed 2
    18:02:43.252 [Thread-4] Downloading 3 and sleeping 1 second(s)
    18:02:44.252 [Thread-4] Downloaded 3
    18:02:44.253 [Thread-5] Parsing 3 and sleeping 3 second(s)
    18:02:47.253 [Thread-5] Parsed 3
    18:02:47.253 [main] Finished in 12 seconds, results=[1, 2, 3]
  2. ExecutorService
    /**
     * Approach 2: submits all downloads to a fixed pool. Then, in resource order, it blocks on each
     * download with Future.get() and submits the matching parse. Finally it blocks on every parse result.
     * Because downloads are awaited in list order, the parse of resource 2 and 3 is not submitted until
     * resource 1 has finished downloading.
     *
     * @return parsed ids in resource order; an element is null if its download or parse failed or was interrupted
     */
    private static List<Integer> executorServiceGet() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);

        ExecutorService executorService = Executors.newFixedThreadPool(numbers.size());
        try {
            List<Future<Integer>> downloadedFutures = numbers
                .stream()
                .map(n -> {
                    Callable<Integer> callable = () -> download(n, numbers.size() - n + 1);
                    return executorService.submit(callable);
                })
                .collect(Collectors.toList());

            List<Future<Integer>> parsedFutures = numbers
                .stream()
                .map(n -> {
                    Future<Integer> future = downloadedFutures.get(n - 1);
                    try {
                        Integer value = future.get(); // blocks until this particular download has finished
                        return executorService.submit(() -> parse(value, n));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // preserve the interrupt status
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                    return null;
                })
                .collect(Collectors.toList());

            return parsedFutures
                .stream()
                .map(future -> {
                    if (future == null) {
                        return null; // the parse was never submitted because its download failed
                    }
                    try {
                        return future.get();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                    return null;
                })
                .collect(Collectors.toList());
        } finally {
            // Fixed-pool threads are non-daemon: without shutdown the JVM would not exit after main returns.
            executorService.shutdown();
        }
    }
    
    18:10:11.988 [main] Started
    18:10:12.106 [pool-1-thread-1] Downloading 1 and sleeping 3 second(s)
    18:10:12.110 [pool-1-thread-2] Downloading 2 and sleeping 2 second(s)
    18:10:12.111 [pool-1-thread-3] Downloading 3 and sleeping 1 second(s)
    18:10:13.111 [pool-1-thread-3] Downloaded 3
    18:10:14.111 [pool-1-thread-2] Downloaded 2
    18:10:15.109 [pool-1-thread-1] Downloaded 1
    18:10:15.111 [pool-1-thread-3] Parsing 1 and sleeping 1 second(s)
    18:10:15.112 [pool-1-thread-2] Parsing 2 and sleeping 2 second(s)
    18:10:15.112 [pool-1-thread-1] Parsing 3 and sleeping 3 second(s)
    18:10:16.112 [pool-1-thread-3] Parsed 1
    18:10:17.113 [pool-1-thread-2] Parsed 2
    18:10:18.112 [pool-1-thread-1] Parsed 3
    18:10:18.112 [main] Finished in 6 seconds, results=[1, 2, 3]
  3. ExecutorCompletionService
    /**
     * Approach 3: submits all downloads through an ExecutorCompletionService. take() hands back
     * downloads in completion order, so the fastest download is parsed first. Parse results are also
     * collected in completion order.
     *
     * @return parsed ids in completion order; a failed download is omitted and a failed parse yields null
     */
    private static List<Integer> executionCompletionServiceTakeAndGet() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);

        ExecutorService executorService = Executors.newFixedThreadPool(numbers.size());
        ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(executorService);
        try {
            numbers.forEach(n -> executorCompletionService.submit(() -> download(n, numbers.size() - n + 1)));

            // Count the parses actually submitted: if a download fails, taking numbers.size() results
            // in the last phase would block forever.
            int submittedParses = 0;
            for (int i = 0; i < numbers.size(); i++) {
                try {
                    Integer value = executorCompletionService.take().get();
                    executorCompletionService.submit(() -> parse(value, value));
                    submittedParses++;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }

            return numbers
                .stream()
                .limit(submittedParses)
                .map(ignored -> {
                    try {
                        return executorCompletionService.take().get();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                    return null;
                })
                .collect(Collectors.toList());
        } finally {
            // Fixed-pool threads are non-daemon: without shutdown the JVM would not exit after main returns.
            executorService.shutdown();
        }
    }
    
    18:18:16.548 [main] Started
    18:18:16.655 [pool-1-thread-2] Downloading 2 and sleeping 2 second(s)
    18:18:16.655 [pool-1-thread-3] Downloading 3 and sleeping 1 second(s)
    18:18:16.655 [pool-1-thread-1] Downloading 1 and sleeping 3 second(s)
    18:18:17.658 [pool-1-thread-3] Downloaded 3
    18:18:17.660 [pool-1-thread-3] Parsing 3 and sleeping 3 second(s)
    18:18:18.658 [pool-1-thread-2] Downloaded 2
    18:18:18.658 [pool-1-thread-2] Parsing 2 and sleeping 2 second(s)
    18:18:19.658 [pool-1-thread-1] Downloaded 1
    18:18:19.658 [pool-1-thread-1] Parsing 1 and sleeping 1 second(s)
    18:18:20.658 [pool-1-thread-2] Parsed 2
    18:18:20.658 [pool-1-thread-1] Parsed 1
    18:18:20.660 [pool-1-thread-3] Parsed 3
    18:18:20.660 [main] Finished in 4 seconds, results=[2, 1, 3]
  4. Parallel streams
    /**
     * Approach 4: a parallel stream, where each element's download then parse runs on a ForkJoinPool
     * worker (or the calling thread); collect() still returns the results in encounter order.
     */
    private static List<Integer> mapParallel() {
        List<Integer> ids = Arrays.asList(1, 2, 3);

        return ids.parallelStream()
            .map(id -> {
                int downloaded = download(id, ids.size() - id + 1);
                return parse(downloaded, downloaded);
            })
            .collect(Collectors.toList());
    }
    18:23:51.300 [main] Started
    18:23:51.413 [main] Downloading 2 and sleeping 2 second(s)
    18:23:51.413 [ForkJoinPool.commonPool-worker-1] Downloading 1 and sleeping 3 second(s)
    18:23:51.413 [ForkJoinPool.commonPool-worker-2] Downloading 3 and sleeping 1 second(s)
    18:23:52.416 [ForkJoinPool.commonPool-worker-2] Downloaded 3
    18:23:52.416 [ForkJoinPool.commonPool-worker-2] Parsing 3 and sleeping 3 second(s)
    18:23:53.416 [main] Downloaded 2
    18:23:53.416 [main] Parsing 2 and sleeping 2 second(s)
    18:23:54.416 [ForkJoinPool.commonPool-worker-1] Downloaded 1
    18:23:54.416 [ForkJoinPool.commonPool-worker-1] Parsing 1 and sleeping 1 second(s)
    18:23:55.416 [main] Parsed 2
    18:23:55.416 [ForkJoinPool.commonPool-worker-1] Parsed 1
    18:23:55.416 [ForkJoinPool.commonPool-worker-2] Parsed 3
    18:23:55.416 [main] Finished in 4 seconds, results=[1, 2, 3]
  5. CompletableFuture
    private static List<Integer> completableFuture() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
    
        List<CompletableFuture<Integer>> cfs = numbers
            .stream()
            .map(n ->
                CompletableFuture.supplyAsync(() -> download(n, numbers.size() - n + 1))
                    .thenApply(id -> parse(id, n)))
            .collect(Collectors.toList());
    
        CompletableFuture<List<Integer>> allResultsCF = CompletableFuture.allOf(cfs.toArray(new CompletableFuture[numbers.size()]))
            .thenApply(v -> cfs
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
            );
    
        return allResultsCF.join();
    
18:25:25.722 [main] Started
18:25:25.867 [ForkJoinPool.commonPool-worker-1] Downloading 1 and sleeping 3 second(s)
18:25:25.869 [ForkJoinPool.commonPool-worker-3] Downloading 3 and sleeping 1 second(s)
18:25:25.868 [ForkJoinPool.commonPool-worker-2] Downloading 2 and sleeping 2 second(s)
18:25:26.871 [ForkJoinPool.commonPool-worker-3] Downloaded 3
18:25:26.872 [ForkJoinPool.commonPool-worker-3] Parsing 3 and sleeping 3 second(s)
18:25:27.871 [ForkJoinPool.commonPool-worker-2] Downloaded 2
18:25:27.871 [ForkJoinPool.commonPool-worker-2] Parsing 2 and sleeping 2 second(s)
18:25:28.871 [ForkJoinPool.commonPool-worker-1] Downloaded 1
18:25:28.871 [ForkJoinPool.commonPool-worker-1] Parsing 1 and sleeping 1 second(s)
18:25:29.871 [ForkJoinPool.commonPool-worker-1] Parsed 1
18:25:29.871 [ForkJoinPool.commonPool-worker-2] Parsed 2
18:25:29.872 [ForkJoinPool.commonPool-worker-3] Parsed 3
18:25:29.874 [main] Finished in 4 seconds, results=[1, 2, 3]

If we provide our own pool we get:

CompletableFuture.supplyAsync(() -> download(n, numbers.size() - n + 1), executorService)
    .thenApplyAsync(id -> parse(id, n), executorService)
18:31:47.902 [main] Started
18:31:47.995 [pool-1-thread-1] Downloading 1 and sleeping 3 second(s)
18:31:47.997 [pool-1-thread-3] Downloading 3 and sleeping 1 second(s)
18:31:47.997 [pool-1-thread-2] Downloading 2 and sleeping 2 second(s)
18:31:48.999 [pool-1-thread-3] Downloaded 3
18:31:48.999 [pool-1-thread-3] Parsing 3 and sleeping 3 second(s)
18:31:49.999 [pool-1-thread-2] Downloaded 2
18:31:49.999 [pool-1-thread-2] Parsing 2 and sleeping 2 second(s)
18:31:50.999 [pool-1-thread-1] Downloaded 1
18:31:50.999 [pool-1-thread-1] Parsing 1 and sleeping 1 second(s)
18:31:51.999 [pool-1-thread-1] Parsed 1
18:31:51.999 [pool-1-thread-3] Parsed 3
18:31:51.999 [pool-1-thread-2] Parsed 2
18:31:52.001 [main] Finished in 4 seconds, results=[1, 2, 3]

 

CompletableFuture – async and non-blocking callbacks

 

package com.bawi.completable.future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

/**
 * Demo of CompletableFuture pipelines built from non-async callbacks:
 * produce text -> compute its length -> compute a circle perimeter from it -> print it -> print "DONE".
 * Each step logs whether the executing thread is a daemon thread.
 */
public class MyCompletableFuture {
    private static final Logger LOGGER = LoggerFactory.getLogger(MyCompletableFuture.class);

    /**
     * Starts one pipeline per input text on the common ForkJoinPool, lets the main thread sleep while
     * they run, and then waits for any pipeline that is still unfinished.
     */
    public static void main(String[] args) throws InterruptedException {
        System.out.println(Thread.currentThread().getName() + ": started");
        List<CompletableFuture<Void>> pipelines = Arrays.asList("a", "ab", "abc", "abcd")
            .stream()
            .map(text -> {
                // supplyAsync schedules produceText on ForkJoinPool.commonPool() and returns immediately
                CompletableFuture<String> stringCF = CompletableFuture.supplyAsync(() -> produceText(text));
                // Non-async callbacks run on the thread that completes the previous stage
                // (or on the registering thread if that stage has already completed).
                CompletableFuture<Integer> integerCF = stringCF.thenApply(MyCompletableFuture::calcStringLength);
                CompletableFuture<Double> doubleCF = integerCF.thenApply(MyCompletableFuture::calcCirclePerimeter);
                CompletableFuture<Void> voidCF = doubleCF.thenAccept(MyCompletableFuture::print);
                return voidCF.thenRun(() -> System.out.println("DONE"));
            })
            .collect(Collectors.toList());
        System.out.println(Thread.currentThread().getName() + ": created completable future, about to sleep");
        Thread.sleep(5000);
        // The commonPool workers are daemon threads, so the JVM could exit mid-pipeline if 5 seconds were not
        // enough (e.g. with fewer workers than texts); wait for every pipeline before finishing.
        CompletableFuture.allOf(pipelines.toArray(new CompletableFuture<?>[0])).join();
        System.out.println(Thread.currentThread().getName() + ": finished");
    }

    /** Step 1: returns {@code text} unchanged after a simulated 1-second delay. */
    static String produceText(String text) {
        LOGGER.info("[daemon={}] 1: Producing text: {}", Thread.currentThread().isDaemon(), text);
        sleepMillis(1000);
        return text;
    }

    /** Step 2: returns the length of {@code text} after a simulated 1-second delay. */
    static int calcStringLength(String text) {
        int length = text.length();
        LOGGER.info("[daemon={}] 2: Calculating string length: {}", Thread.currentThread().isDaemon(), length);
        sleepMillis(1000);
        return length;
    }

    /** Step 3: returns the perimeter {@code 2 * PI * r} after a simulated 1-second delay. */
    static double calcCirclePerimeter(int r) {
        double perimeter = 2 * Math.PI * r;
        LOGGER.info("[daemon={}] 3: Calculating circle perimeter: {}", Thread.currentThread().isDaemon(), perimeter);
        sleepMillis(1000);
        return perimeter;
    }

    /** Step 4: logs and prints {@code d}. */
    static void print(Double d) {
        LOGGER.info("[daemon={}] 4: Printing: {}", Thread.currentThread().isDaemon(), d);
        System.out.println("d=" + d);
    }

    /** Sleeps for {@code millis}; on interruption it restores the interrupt flag and returns early. */
    private static void sleepMillis(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

Output (filtered) – note 4 things:
1) the first CompletableFuture is created (only) via the supplyAsync method, which internally uses a thread pool: either the implicit ForkJoinPool.commonPool() (if none is specified, as above) or an explicitly supplied executor
2) the CompletableFuture creation and execution are async and non-blocking (the main thread immediately moves on)
3) as soon as the CompletableFuture is created, the supplied task starts executing (worker-1 started producing text before the main thread started to sleep)
4) if the subsequent methods (e.g. thenApply) are not async, the thread that completes a stage typically also runs the next callback (here worker-1 executed all the steps)

main: started
19:13:56.653 [ForkJoinPool.commonPool-worker-1] INFO [daemon=true] 1: Producing text: a
main: created completable future, about to sleep
19:13:57.664 [ForkJoinPool.commonPool-worker-1] INFO [daemon=true] 2: Calculating string length: 1
19:13:58.664 [ForkJoinPool.commonPool-worker-1] INFO [daemon=true] 3: Calculating circle perimeter: 6.283185307179586
19:13:59.667 [ForkJoinPool.commonPool-worker-1] INFO [daemon=true] 4: Printing: 6.283185307179586
main: finished

If we switch to the async variants thenApplyAsync, thenAcceptAsync and thenRunAsync, there is no guarantee that the same thread will execute all subsequent processing for that future – note below that worker-1 executed the first 2 tasks and worker-2 the other 2:

main: started
19:04:37.022 [ForkJoinPool.commonPool-worker-1] [daemon=true] 1: Producing text: a
19:04:38.031 [ForkJoinPool.commonPool-worker-1] [daemon=true] 2: Calculating string length: 1
19:04:39.045 [ForkJoinPool.commonPool-worker-2] [daemon=true] 3: Calculating circle perimeter: 6.283185307179586
19:04:40.049 [ForkJoinPool.commonPool-worker-2] [daemon=true] 4: Printing: 6.283185307179586
DONE
main: finished

 

Default memory settings for Java

$ jinfo <pid>
Attaching to process ID 8170, please wait…
Debugger attached successfully.

sun.boot.library.path = /apps/java/jdk1.8.0_71/jre/lib/amd64
java.vm.name = Java HotSpot(TM) 64-Bit Server VM
java.runtime.version = 1.8.0_71-b15
os.arch = amd64
os.name = Linux
java.vm.specification.version = 1.8
sun.arch.data.model = 64
java.version = 1.8.0_71
VM Flags:
Non-default VM flags: -XX:CICompilerCount=3 -XX:InitialHeapSize=526385152 -XX:MaxHeapSize=8417968128 -XX:MaxNewSize=2805989376 -XX:MinHeapDeltaBytes=524288 -XX:NewSize=175112192 -XX:OldSize=351272960 -XX:+UseCompressedClassPointers -XX:+UseCompressedOops -XX:+UseFastUnorderedTimeStamps -XX:+UseParallelGC
Command line: -Dspring.profiles.active=dev

so InitialHeapSize≈0.49GB, MaxHeapSize≈7.8GB, MaxNewSize≈2.6GB, OldSize≈0.3GB (where 1GB = 1024^3 bytes)

 

Understanding unix load for java processes

Given: a Unix machine with 16 cores.

1. State before the load:
top - 15:52:37 up 128 days, 11:58, 2 users, load average: 0.20, 0.97, 0.83
Tasks: 343 total, 1 running, 340 sleeping, 0 stopped, 2 zombie
Cpu(s): 0.0%us, 0.0%sy, 0.0%ni, 99.8%id, 0.1%wa, 0.0%hi, 0.0%si, 0.0%st
Mem: 49453936k total, 36820724k used, 12633212k free, 1120572k buffers
Swap: 8388600k total, 0k used, 8388600k free, 33165052k cached

PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
18198 tktdev 15 0 11016 1264 784 R 0.3 0.0 0:00.07 top

2. State during load:
$ top
top - 16:00:53 up 128 days, 12:06, 2 users, load average: 18.05, 12.28, 6.18
Tasks: 359 total, 7 running, 349 sleeping, 0 stopped, 3 zombie
Cpu(s): 99.6%us, 0.3%sy, 0.0%ni, 0.1%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Mem: 49453936k total, 36900812k used, 12553124k free, 1120572k buffers
Swap: 8388600k total, 0k used, 8388600k free, 33165072k cached

A 1-minute load average of 18.05 on 16 cores means there is more runnable work than the CPUs can process (system administrators often aim to keep the load below about 0.75 × the number of cores, i.e. 12 here).

PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
31111 tktdev 16 0 14.0g 33m 11m S 1576.2 0.1 30:19.38 java

Cpu0 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu1 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu2 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu3 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu4 : 99.7%us, 0.3%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu5 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu6 : 99.7%us, 0.3%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu7 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu8 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu9 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu10 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu11 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu12 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu13 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu14 : 99.0%us, 0.3%sy, 0.0%ni, 0.7%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu15 :100.0%us, 0.0%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st