Java Serialization – static nested, inner and anonymous classes

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class SerializationUtils {
    public static ByteArrayOutputStream serialize(Object object) throws IOException {
        System.out.print("Serializing: " + object);
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        // try-with-resources closes the stream even if writeObject throws
        try (ObjectOutputStream out = new ObjectOutputStream(bout)) {
            out.writeObject(object);
        }
        return bout;
    }

    public static void deserialize(ByteArrayOutputStream bout) throws IOException,
                                                           ClassNotFoundException {
        ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
        try (ObjectInputStream in = new ObjectInputStream(bin)) {
            Object readObject = in.readObject();
            System.out.println(", deserialized: " + readObject);
        }
    }
}

JUnit test:

public class MySerializableClass implements Serializable {
}

public class MySerializationTest {
    private String myJunitStringField = "hello";

    static class MyStaticNestedSerializableClass implements Serializable {
        public int getLength(String s) {
            // Error: java: non-static variable this cannot be referenced from a static context
            //System.out.println(MySerializationTest.this);// static nested class does not have reference to outer class
            return s.length();
        }
    }

    interface MySerializableFunction<T, R> extends Function<T, R>, Serializable {}

    @Test
    public void testSerialization() throws IOException, ClassNotFoundException {
       deserialize(serialize(1));     // Serializing: 1, deserialized: 1
       deserialize(serialize("aaa")); // Serializing: aaa, deserialized: aaa
       deserialize(serialize(new MySerializableClass())); // Serializing: com.bawi.serialization.MySerializableClass@4520ebad, deserialized: com.bawi.serialization.MySerializableClass@4f933fd1
       deserialize(serialize(new MyStaticNestedSerializableClass())); // Serializing: com.bawi.serialization.MySerializationTest$MyStaticNestedSerializableClass@4520ebad, deserialized: com.bawi.serialization.MySerializationTest$MyStaticNestedSerializableClass@4f933fd1
       deserialize(serialize((MySerializableFunction<String, Integer>) s -> s.length()));       
    }

    static class MyStaticNestedNonSerializableClass {} // missing implements Serializable

    class MyInnerSerializableClass implements Serializable {
        public int getLength(String s) {
            System.out.println("myJunitStringField=" + myJunitStringField); // direct access to private outer class fields
            System.out.println(MySerializationTest.this); // reference to enclosing instance of the outer class
            return s.length();
        }
    }

    @Test//(expected = java.io.NotSerializableException.class)
    public void failWithNotSerializableException() throws IOException, ClassNotFoundException {
        // NotSerializableException: com.bawi.serialization.MySerializationTest$MyStaticNestedNonSerializableClass
//        deserialize(serialize(new MyStaticNestedNonSerializableClass()));

        // inner (non static nested) class has reference to non-serializable outer class (junit com.bawi.serialization.MySerializationTest)

        MyInnerSerializableClass myInnerSerializableClass = new MyInnerSerializableClass();
        myInnerSerializableClass.getLength("a"); // myJunitStringField=hello
                                                 // com.bawi.serialization.MySerializationTest@1324409e
        // java.io.NotSerializableException: com.bawi.serialization.MySerializationTest
//        deserialize(serialize(myInnerSerializableClass));

        class MyLocalSerializableClass implements Serializable {
            @Override
            public String toString() {
                return MySerializationTest.this + "";
            }
        }
        // java.io.NotSerializableException: com.bawi.serialization.MySerializationTest
//        deserialize(serialize(new MyLocalSerializableClass()));


        // an anonymous class is an inner class too: java.io.NotSerializableException: com.bawi.serialization.MySerializationTest
//        deserialize(serialize(new Serializable(){
//            @Override
//            public String toString() {
//                return MySerializationTest.this + "";
//            }
//        }));

        // like local classes, anonymous classes have access to local variables of enclosing scope that are effectively final
        // anonymous class definition (within {}) is an expression and must be part of the statement and end with semicolon
        MyStaticNestedSerializableClass myAnonymousInnerSerializableClass = new MyStaticNestedSerializableClass() {
            @Override
            public int getLength(String s) {
                System.out.println("myJunitStringField=" + myJunitStringField); // direct access to private outer class fields
                System.out.println(MySerializationTest.this);
                return s.length() + 2;
            }
        };
        myAnonymousInnerSerializableClass.getLength("aa");
        // java.io.NotSerializableException: com.bawi.serialization.MySerializationTest
//        deserialize(serialize(myAnonymousInnerSerializableClass));

        //java.io.NotSerializableException: com.bawi.serialization.MySerializationTest$$Lambda$1/1576861390
//        deserialize(serialize((Function<String, Integer>) s -> s.length()));
    }
}

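If MySerializationTest itself implements Serializable, the inner, local and anonymous class cases no longer throw java.io.NotSerializableException: com.bawi.serialization.MySerializationTest, because the enclosing instance they reference can now be serialized. The static nested class without Serializable and the plain Function lambda still fail.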
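
The rules above can be condensed into a small standalone sketch. The names OuterDemo, StaticNested, Inner and canSerialize are made up for illustration and are not part of the test class above; the outer class is deliberately not Serializable. The intersection cast in main is an alternative to declaring an interface like MySerializableFunction.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

// Hypothetical demo class, deliberately NOT Serializable (like the JUnit class above).
class OuterDemo {
    private final String outerField = "hello";

    // Static nested class: no hidden reference to an OuterDemo instance.
    static class StaticNested implements Serializable {}

    // Inner class: uses the enclosing instance, so that instance is part of its object graph.
    class Inner implements Serializable {
        @Override
        public String toString() {
            return "Inner of outer with field " + outerField;
        }
    }

    // Returns true if Java serialization accepts the object, false on NotSerializableException.
    static boolean canSerialize(Object object) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(object);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Function<String, Integer> plainLambda = s -> s.length();
        // Intersection cast makes the lambda's class implement Serializable as well.
        Function<String, Integer> serializableLambda =
                (Function<String, Integer> & Serializable) s -> s.length();

        System.out.println("static nested:       " + canSerialize(new StaticNested()));          // true
        System.out.println("inner:               " + canSerialize(new OuterDemo().new Inner())); // false
        System.out.println("plain lambda:        " + canSerialize(plainLambda));                 // false
        System.out.println("serializable lambda: " + canSerialize(serializableLambda));          // true
    }
}
```

Making OuterDemo implement Serializable would turn the inner class case into true as well, which matches the note above.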

Generics with java collection

import java.util.Arrays;
import java.util.List;

public class MyGenerics {
    static class SuperClass {}
    static class SubClass extends SuperClass {}

    public static void main(String[] args) {
// get1:
        List<SuperClass> superClasses = get1(Arrays.asList(new SuperClass()));

        // Error: java: incompatible types: List<MyGenerics.SuperClass> cannot be converted to List<MyGenerics.SubClass>
        //List<SubClass> subClasses = get1(Arrays.asList(new SubClass()));

// get2 (neither compiles):
        // Error: java: incompatible types: List<capture#1 of ? extends MyGenerics.SuperClass> cannot be converted to List<MyGenerics.SuperClass>
        //List<SuperClass> superClasses2 = get2(Arrays.asList(new SuperClass()));

        // Error: java: incompatible types: List<capture#1 of ? extends MyGenerics.SuperClass> cannot be converted to List<MyGenerics.SubClass>
        //List<SubClass> subClasses2 = get2(Arrays.asList(new SubClass()));

// get3 (all compile):
        List<SuperClass> superClasses3 = get3(Arrays.asList(new SuperClass()));
        List<SubClass> subClasses3 = get3(Arrays.asList(new SubClass()));
        List<SuperClass> subClasses3_1 = get3(Arrays.asList(new SubClass()));
        List<SuperClass> subClasses3_2 = get3(Arrays.asList(new SubClass(), new SuperClass()));
    }

    static List<SuperClass> get1(List<SuperClass> input) { return null; }

    static List<? extends SuperClass> get2(List<? extends SuperClass> input) { return null; }

    static <T extends SuperClass> List<T> get3(List<T> input) { return null; }
}
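
A short sketch of what the caller can still do with the wildcard version. WildcardDemo is a made-up name, and unlike the methods above these variants return a copy of the input instead of null, purely so the example has something to read back. The result of get2 can only be assigned to a wildcard-typed list and read as SuperClass, while get3 keeps the exact element type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class mirroring get2 and get3 from MyGenerics.
class WildcardDemo {
    static class SuperClass {}
    static class SubClass extends SuperClass {}

    // Same signature as get2; returns a copy (assumption for illustration).
    static List<? extends SuperClass> get2(List<? extends SuperClass> input) {
        return new ArrayList<SuperClass>(input);
    }

    // Same signature as get3; the element type T flows from argument to result.
    static <T extends SuperClass> List<T> get3(List<T> input) {
        return new ArrayList<T>(input);
    }

    public static void main(String[] args) {
        // get2: the result must be held in a wildcard-typed variable.
        List<? extends SuperClass> fromGet2 = get2(Arrays.asList(new SubClass()));
        SuperClass first = fromGet2.get(0);   // reading as SuperClass is always safe
        // fromGet2.add(new SubClass());      // does not compile: the element type is unknown

        // get3: the caller gets back exactly the type it passed in.
        List<SubClass> fromGet3 = get3(Arrays.asList(new SubClass()));
        SubClass sub = fromGet3.get(0);       // no cast needed

        System.out.println(first.getClass().getSimpleName()); // SubClass
        System.out.println(sub.getClass().getSimpleName());   // SubClass
    }
}
```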

HTTP forward and redirect in a servlet, with IntelliJ dynamic class reloading on Jetty

1. Create the simplest project with an HttpServlet and JSPs.

me@MacBook:~/dev/my-servlet$ find . -type f | grep -v .idea | grep -v target | grep -v *.iml
./pom.xml
./src/main/webapp/index.html
./src/main/webapp/redirected.jsp
./src/main/webapp/forwarded.jsp
./src/main/java/com/bawi/servlet/MyServlet.java

./pom.xml:

<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.bawi</groupId>
    <artifactId>my-servlet</artifactId>
    <packaging>war</packaging>
    <version>0.1-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-war-plugin</artifactId>
                <version>3.3.1</version>
            </plugin>

            <plugin>
                <groupId>org.eclipse.jetty</groupId>
                <artifactId>jetty-maven-plugin</artifactId>
                <version>9.4.35.v20201120</version>
            </plugin>

        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>4.0.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

</project>

./src/main/java/com/bawi/servlet/MyServlet.java:

package com.bawi.servlet;

import javax.servlet.RequestDispatcher;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.Date;

@WebServlet(urlPatterns = {"/do"})
public class MyServlet extends HttpServlet {
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        String action = req.getParameter("action");
        if (action == null) {
            resp.setContentType("text/plain;charset=UTF-8");
            ServletOutputStream out = resp.getOutputStream();
            out.print("Hello from MyServlet!" +  new Date());
            return;
        }

        switch (action) {
            case "forward": {
                ServletContext servletContext = getServletContext();
                RequestDispatcher dispatcher = servletContext.getRequestDispatcher("/forwarded.jsp");
                dispatcher.forward(req, resp);
                break;
            }
            case "redirect": {
                resp.sendRedirect(req.getContextPath() + "/redirected.jsp");
                break;
            }
        }
    }
}

./src/main/webapp/index.html:

<!DOCTYPE html>
<html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Home page</title>
    </head>
    <body>
        <p>This is home page.</p>
        <p>Call <a href="/do?action=forward">forward</a></p>
        <p>Call <a href="/do?action=redirect">redirect</a></p>
    </body>
</html>

./src/main/webapp/forwarded.jsp:

<!DOCTYPE html>
<html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Forwarded page</title>
    </head>
    <body>
        <p>Action parameter: <%= request.getParameter("action") %>. This is forwarded page. Go to <a href="/">home</a></p>
    </body>
</html>

./src/main/webapp/redirected.jsp:

<!DOCTYPE html>
<html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Redirected page</title>
    </head>
    <body>
        <p>Action parameter: <%= request.getParameter("action") %>. This is redirected page. Go to <a href="/">home</a></p>
    </body>
</html>

me@MacBook:~/dev/my-servlet$ mvn jetty:run
[INFO] webAppSourceDirectory not set. Trying src/main/webapp
[INFO] Reload Mechanic: automatic
[INFO] nonBlocking:false
[INFO] Classes = /Users/me/dev/my-servlet/target/classes
[INFO] Configuring Jetty for project: my-servlet
[INFO] Logging initialized @3943ms to org.eclipse.jetty.util.log.Slf4jLog
[INFO] Context path = /
[INFO] Tmp directory = /Users/me/dev/my-servlet/target/tmp
[INFO] Web defaults = org/eclipse/jetty/webapp/webdefault.xml
[INFO] Web overrides =  none
[INFO] web.xml file = null
[INFO] Webapp directory = /Users/me/dev/my-servlet/src/main/webapp
[INFO] jetty-9.4.35.v20201120; built: 2020-11-20T21:17:03.964Z; git: bdc54f03a5e0a7e280fab27f55c3c75ee8da89fb; jvm 1.8.0_241-b07
[INFO] Scanning elapsed time=26ms
[INFO] DefaultSessionIdManager workerName=node0
[INFO] No SessionScavenger set, using defaults
[INFO] node0 Scavenging every 660000ms
[INFO] Started o.e.j.m.p.JettyWebAppContext@760cf594{/,file:///Users/me/dev/my-servlet/src/main/webapp/,AVAILABLE}{file:///Users/me/dev/my-servlet/src/main/webapp/}
[INFO] Started ServerConnector@254f906e{HTTP/1.1, (http/1.1)}{0.0.0.0:8080}
[INFO] Started @4422ms
[INFO] Started Jetty Server

2. Testing

curl http://localhost:8080/do
Hello from MyServlet!Wed Dec 02 15:09:17 CET 2020

forward
Note that the action parameter is passed on automatically: the forward happens inside the server and reuses the same request.

me@MacBook:~$ curl -vv http://localhost:8080/do?action=forward
*   Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8080 (#0)
> GET /do?action=forward HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: */*
> 
< HTTP/1.1 200 OK
< Date: Wed, 02 Dec 2020 14:11:26 GMT
< Content-Type: text/html;charset=utf-8
< Set-Cookie: JSESSIONID=node0vruwimbjjsy5uwywzhy51gy81.node0; Path=/
< Expires: Thu, 01 Jan 1970 00:00:00 GMT
< Content-Length: 248
< Server: Jetty(9.4.35.v20201120)
< 
<!DOCTYPE html>
<html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Forwarded page</title>
    </head>
    <body>
        <p>Action parameter: forward. This is forwarded page. Go to <a href="/">home</a></p>
    </body>
* Connection #0 to host localhost left intact
</html>* Closing connection 0

redirect
Note that the response code is 302 and the redirect location is returned in the Location header.

me@MacBook:~$ curl -vv http://localhost:8080/do?action=redirect
*   Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8080 (#0)
> GET /do?action=redirect HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: */*
> 
< HTTP/1.1 302 Found
< Date: Wed, 02 Dec 2020 14:11:39 GMT
< Location: http://localhost:8080/redirected.jsp
< Content-Length: 0
< Server: Jetty(9.4.35.v20201120)
< 
* Connection #0 to host localhost left intact
* Closing connection 0

With the -L option curl follows the redirect location.
Note that the action parameter is NOT passed to the second request, because the client issues a new GET for the Location URL.

me@MacBook:~$ curl -v -L http://localhost:8080/do?action=redirect
*   Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8080 (#0)
> GET /do?action=redirect HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: */*
> 
< HTTP/1.1 302 Found
< Date: Wed, 02 Dec 2020 17:14:59 GMT
< Location: http://localhost:8080/redirected.jsp
< Content-Length: 0
< Server: Jetty(9.4.35.v20201120)
< 
* Connection #0 to host localhost left intact
* Issue another request to this URL: 'http://localhost:8080/redirected.jsp'
* Found bundle for host localhost: 0x7fe33a818310 [can pipeline]
* Could pipeline, but not asked to!
* Re-using existing connection! (#0) with host localhost
* Connected to localhost (::1) port 8080 (#0)
> GET /redirected.jsp HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: */*
> 
< HTTP/1.1 200 OK
< Date: Wed, 02 Dec 2020 17:14:59 GMT
< Content-Type: text/html;charset=utf-8
< Set-Cookie: JSESSIONID=node0gd5vxmsu6xgkh3ltwabj2jh810.node0; Path=/
< Expires: Thu, 01 Jan 1970 00:00:00 GMT
< Content-Length: 247
< Server: Jetty(9.4.35.v20201120)
< 
<!DOCTYPE html>
<html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Redirected page</title>
    </head>
    <body>
        <p>Action parameter: null. This is redirected page. Go to <a href="/">home</a></p>
    </body>
* Connection #0 to host localhost left intact
</html>* Closing connection 0

A browser automatically makes the second request to the redirected page.
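
If the redirected page needs the parameter, one option is to append it to the redirect URL. The helper below is a minimal sketch with made-up names (RedirectUrls, withAction); it only builds the URL string and does not depend on the servlet API.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper, not part of MyServlet above.
class RedirectUrls {
    // Builds contextPath + page + "?action=<url-encoded action>".
    static String withAction(String contextPath, String page, String action) {
        try {
            return contextPath + page + "?action=" + URLEncoder.encode(action, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is a charset every Java platform must support.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // For the root context, getContextPath() returns an empty string.
        System.out.println(withAction("", "/redirected.jsp", "redirect")); // /redirected.jsp?action=redirect
    }
}
```

In the servlet this would be used as resp.sendRedirect(RedirectUrls.withAction(req.getContextPath(), "/redirected.jsp", action)), so that redirected.jsp sees the action parameter in the second request.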

3. Automatically reload classes and resources when debugging (change is seen after first request)

Use a Jetty server run configuration with "update classes and resources".
Add the exploded war as the deployment artifact.
Override the root context to / instead of the default artifact name and version.

me@MacBook:~$ curl  http://localhost:8080/do
Hello from MyServlet!Wed Dec 02 18:47:14 CET 2020

Now modify servlet java code

first request to trigger reloading

me@MacBook:~$ curl  http://localhost:8080/do
Hello from MyServlet!Wed Dec 02 18:48:03 CET 2020

second request returns modified response

me@MacBook:~$ curl  http://localhost:8080/do
Hello from MyServlet! - modifiedWed Dec 02 18:48:07 CET 2020

IntelliJ: debugging a local Apache Flume agent with netcat source and logger sink

1. Download apache-flume-1.6.0-bin.tar.gz, uncompress it and run Flume normally using the bash start script

me@MacBook:~/Downloads/apache-flume-1.6.0-bin$ bin/flume-ng agent \
  --conf conf --conf-file conf/example.conf --name a1 \
  -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true \
  -Dorg.apache.flume.log.rawdata=true

with netcat source and logger sink

me@MacBook:~/Downloads/apache-flume-1.6.0-bin$ cat conf/example.conf
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2. Examine process command via ps -ef or watch the logs:

+ exec /Library/Java/JavaVirtualMachines/jdk1.8.0_241.jdk/Contents/Home/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true -cp '/Users/me/Downloads/apache-flume-1.6.0-bin/conf:/Users/me/Downloads/apache-flume-1.6.0-bin/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file conf/example.conf --name a1

3. Add debugging options -Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=5005,suspend=y:

me@MacBook:~/Downloads/apache-flume-1.6.0-bin$ /Library/Java/JavaVirtualMachines/jdk1.8.0_241.jdk/Contents/Home/bin/java \
  -Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=5005,suspend=y \
  -Xmx20m -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true \
  -Dorg.apache.flume.log.rawdata=true \
  -cp '/Users/me/Downloads/apache-flume-1.6.0-bin/conf:/Users/me/Downloads/apache-flume-1.6.0-bin/lib/*' \
  -Djava.library.path= org.apache.flume.node.Application \
  --conf-file conf/example.conf --name a1
Listening for transport dt_socket at address: 5005

or when using kerberized kafka:

/Library/Java/JavaVirtualMachines/jdk1.8.0_241.jdk/Contents/Home/bin/java \
-Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=5005,suspend=y \
-Xmx20m -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true \
-Dorg.apache.flume.log.rawdata=true \
-Dflume.log.dir=/tmp/flume -Dagent.id=localhost \
-Djava.security.auth.login.config=/Users/me/jaas.conf \
-Djava.security.krb5.conf=/Users/me/krb5.conf \
-cp '/Users/me/apache-flume-1.6.0-bin/conf:/Users/me/apache-flume-1.6.0-bin/lib/*' \
org.apache.flume.node.Application --conf-file conf/kafka-dev.conf --name a1
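
To double-check from inside a JVM that it was really started with the debug agent, the input arguments can be inspected. This is only a sketch: DebugAgentCheck and hasJdwpAgent are made-up names, and the check looks for the -Xrunjdwp: form used above as well as the newer -agentlib:jdwp= form.

```java
import java.lang.management.ManagementFactory;
import java.util.List;

// Hypothetical helper, not part of Flume.
class DebugAgentCheck {
    // Returns true if any JVM argument enables the JDWP debug agent.
    static boolean hasJdwpAgent(List<String> jvmArgs) {
        for (String arg : jvmArgs) {
            if (arg.startsWith("-Xrunjdwp:") || arg.startsWith("-agentlib:jdwp=")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // JVM options (not program arguments) passed to the current process.
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        System.out.println("JVM arguments: " + jvmArgs);
        System.out.println("debug agent enabled: " + hasJdwpAgent(jvmArgs));
    }
}
```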

4. Create or use maven project with flume-ng-core dependency:

<dependency>
  <groupId>org.apache.flume</groupId>
  <artifactId>flume-ng-core</artifactId>
  <version>1.6.0</version>
</dependency>

5. In IntelliJ create a remote debug configuration pointing at localhost:5005 (the address passed to -Xrunjdwp above) and click Debug to connect to the process.

6. Set a breakpoint in the process method of org.apache.flume.sink.LoggerSink.

7. Connect to the netcat source (here with telnet) and send sample text:

me@MacBook:~$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
aaaaa
OK

Intellij caller, method and type hierarchy + UML classes diagram

public interface MyIface {
    Number getNumericValue() throws RuntimeException;
}

public class MyIfaceImpl implements MyIface {
    @Override
    public Number getNumericValue() throws RuntimeException {
        System.out.println("MyIfaceImpl: in getNumericValue");
        return null;
    }
}

public abstract class MyAbstractClass implements MyIface {
    public abstract Number doInGet();

    @Override
    public Integer getNumericValue() throws IllegalArgumentException { // OK
        System.out.println("MyAbstractClass: in getNumericValue");
        doInGet();
        return null;
    }

    /*@Override
    public Long getNumericValue() { // OK: covariant return type (Long is a Number); an overriding method may omit the throws clause of the overridden method
        return null;
    }*/

    /*@Override
    public Number getNumericValue() throws Exception {  //Error:(7, 20) java: getNumericValue() in myhierarchy.MyAbstractClass cannot implement getNumericValue() in myhierarchy.MyIface overridden method does not throw java.lang.Exception
        return null;
    }*/

    /*@Override
    public Object getNumericValue() throws RuntimeException {  //Error:(17, 19) java: getNumericValue() in myhierarchy.MyAbstractClass cannot implement getNumericValue() in myhierarchy.MyIface         return type java.lang.Object is not compatible with java.lang.Number
        return null;
    }*/
}

public class MyAbstractClassImpl extends MyAbstractClass {
    @Override
    public Number doInGet() {
        System.out.println("MyImpl: in doInGet");
        return null;
    }
}

public class AllCaller {
    public static void main(String[] args) {
        MyIface myIface = new MyIfaceImpl();
        myIface.getNumericValue();
        MyAbstractClass myAbstractClass = new MyAbstractClassImpl();
        myAbstractClass.getNumericValue();
    }
}
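
The practical effect of the narrowed return type in MyAbstractClass can be shown in isolation. The sketch below uses made-up names (CovariantDemo, Source, IntSource) rather than the classes above: a caller holding the concrete type gets an Integer without a cast, while a caller holding the interface type still sees a Number.

```java
// Hypothetical demo of a covariant return type and a dropped throws clause.
class CovariantDemo {
    interface Source {
        Number value() throws RuntimeException;
    }

    static class IntSource implements Source {
        // Narrower return type (Integer is a Number) and no throws clause: both are allowed.
        @Override
        public Integer value() {
            return 42;
        }
    }

    public static void main(String[] args) {
        IntSource intSource = new IntSource();
        Integer asInteger = intSource.value(); // no cast needed through the concrete type

        Source source = intSource;
        Number asNumber = source.value();      // same method, seen through the interface

        System.out.println(asInteger + " " + asNumber); // 42 42
    }
}
```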

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;

public class RandomCallerApp {
    public static void main(String[] args) {
        Supplier<MyIface> myIfaceSupplier = get();
        System.out.println("supplier created, value lazy");
        myIfaceSupplier.get().getNumericValue();
    }

    private static Supplier<MyIface> get() {
        List<MyIface> myIfaces = Arrays.asList(new MyIfaceImpl(), new MyAbstractClassImpl());
        int i = new Random().nextInt(myIfaces.size());
        return () -> myIfaces.get(i);
    }
}

 

Ctrl+Alt+Shift+U – generate UML

My shortcut:
F3 – go to declaration or view usages
Ctrl+Shift+C – caller hierarchy

My shortcut:
F4 or Ctrl+Shift+I – navigate/choose method implementations
Ctrl+Shift+M – method hierarchy

My shortcut: Ctrl+Shift+G – find usages:


Scala for Java developers – decompiling Scala classes to Java bytecode

Classes in scala (decompiled to Java byte code to see generated methods)

me@MacBook:~$ scala
Welcome to Scala 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_192).
Type in expressions for evaluation. Or try :help.

scala> class Person(name: String, age: Int) {
     | println("In default constructor") 
     | override def toString = "Person(name=" + name + ",age=" + age + ")"
     | }
defined class Person

scala> new Person("Bob",18)
In default constructor
res0: Person = Person(name=Bob,age=18)

scala> :javap -p Person
Compiled from "<console>"
public class Person {
  private final java.lang.String name;
  private final int age;
  public java.lang.String toString();
  public Person(java.lang.String, int);
}


scala> :paste
// Entering paste mode (ctrl-D to finish)

class Person(name: String, age: Int) {
  override def toString = "Person(name=" + name + ",age=" + age + ")"
}

object Person {
  def apply(name: String, age: Int) = {   // create factory method(s) using apply method(s)
    new Person(name,age)
  }
}

// Exiting paste mode, now interpreting.

defined class Person
defined object Person   // companion object

scala> new Person("Alice",19)
res0: Person = Person(name=Alice,age=19)

scala> Person.apply("Alice", 19)         // access companion object methods by object-name.method-name
res1: Person = Person(name=Alice,age=19)

scala> Person("Alice", 19)               // can even skip typing apply method
res2: Person = Person(name=Alice,age=19)

scala> :javap -p Person$
Compiled from "<console>"
public class Person$ {
  public static final Person$ MODULE$;
  public static {};
  public Person apply(java.lang.String, int);
  public Person$();
}

scala> class Person(val name: String, val age: Int) {
     | println("In default constructor") 
     | override def toString = "Person(name=" + name + ",age=" + age + ")"
     | }
defined class Person

scala> :javap -p Person
Compiled from "<console>"
public class Person {
  private final java.lang.String name;
  private final int age;
  public java.lang.String name();
  public int age();
  public java.lang.String toString();
  public Person(java.lang.String, int);
}

scala> new Person("Bob",18)
In default constructor
res1: Person = Person(name=Bob,age=18)

scala> res1.name
res2: String = Bob

scala> res1.name()
<console>:13: error: not enough arguments for method apply: (index: Int)Char in class StringOps.
Unspecified value parameter index.
       res1.name()
                ^

scala> class Person(var name: String, var age: Int) {
     | override def toString = "Person(name=" + name + ",age=" + age + ")"
     | }
defined class Person

scala> :javap -p Person
Compiled from "<console>"
public class Person {
  private java.lang.String name;
  private int age;
  public java.lang.String name();
  public void name_$eq(java.lang.String);
  public int age();
  public void age_$eq(int);
  public java.lang.String toString();
  public Person(java.lang.String, int);
}

scala> new Person("Bob",18)
res4: Person = Person(name=Bob,age=18)

scala> res4.name_=("Bobby")

scala> res4
res6: Person = Person(name=Bobby,age=18)

scala> res4.name = "Bobby2"
res4.name: String = Bobby2

scala> res4
res7: Person = Person(name=Bobby2,age=18)

scala> case class Person(var name: String, var age: Int) // var to add setter methods
defined class Person

scala> new Person("Alice",17)
res0: Person = Person(Alice,17) // a case class has by default a toString with its fields (instead of ClassName@hashCode)

scala> Person("Bobby",18)  // automatically added companion object with apply method
res1: Person = Person(Bobby,18)

scala> :javap -p Person
Compiled from "<console>"
public class Person implements scala.Product,scala.Serializable {
  private java.lang.String name;
  private int age;
  public java.lang.String name();
  public void name_$eq(java.lang.String);  // var to add setter methods
  public int age();
  public void age_$eq(int);
  public Person copy(java.lang.String, int);
  public java.lang.String copy$default$1();
  public int copy$default$2();
  public java.lang.String productPrefix();
  public int productArity();
  public java.lang.Object productElement(int);
  public scala.collection.Iterator productIterator();
  public boolean canEqual(java.lang.Object);
  public int hashCode();
  public java.lang.String toString();
  public boolean equals(java.lang.Object);
  public Person(java.lang.String, int);
}

scala> :javap -p scala.Serializable
Compiled from "Serializable.scala"
public interface scala.Serializable extends java.io.Serializable {
}

scala> :javap -p java.io.Serializable
Compiled from "Serializable.java"
public interface java.io.Serializable {
}

scala> :javap -p scala.Product
Compiled from "Product.scala"
public interface scala.Product extends scala.Equals {
  public abstract java.lang.Object productElement(int);
  public abstract int productArity();
  public abstract scala.collection.Iterator productIterator();
  public abstract java.lang.String productPrefix();
}

scala> :javap -p Person$  // automatically added companion object with apply method
Compiled from "<console>"
public class Person$ extends scala.runtime.AbstractFunction2 implements scala.Serializable {
  public static final Person$ MODULE$;
  public static {};
  public final java.lang.String toString();
  public Person apply(java.lang.String, int);
  public scala.Option<scala.Tuple2> unapply(Person);
  private java.lang.Object readResolve();
  public java.lang.Object apply(java.lang.Object, java.lang.Object);
  public Person$();
}

scala> case class Person(name: String, age: Int)  // no var before args - no setters generated
defined class Person

scala> new Person("Alice",17)
res0: Person = Person(Alice,17)

scala> Person("Bobby",18)
res1: Person = Person(Bobby,18)

scala> :javap -p Person
Compiled from "<console>"
public class Person implements scala.Product,scala.Serializable {
  private final java.lang.String name;
  private final int age;
  public java.lang.String name();
  public int age();
  public Person copy(java.lang.String, int);
  public java.lang.String copy$default$1();
  public int copy$default$2();
  public java.lang.String productPrefix();
  public int productArity();
  public java.lang.Object productElement(int);
  public scala.collection.Iterator productIterator();
  public boolean canEqual(java.lang.Object);
  public int hashCode();
  public java.lang.String toString();
  public boolean equals(java.lang.Object);
  public Person(java.lang.String, int);
}

scala> :javap -p Person$
Compiled from "<console>"
public class Person$ extends scala.runtime.AbstractFunction2 implements scala.Serializable {
  public static final Person$ MODULE$;
  public static {};
  public final java.lang.String toString();
  public Person apply(java.lang.String, int);
  public scala.Option<scala.Tuple2> unapply(Person);
  private java.lang.Object readResolve();
  public java.lang.Object apply(java.lang.Object, java.lang.Object);
  public Person$();
}

scala> case class Person(name: String, age: Int) { // add code to the primary constructor and override toString in a case class
     | println("In my constructor")
     | override def toString = name + age
     | }
defined class Person

scala> new Person("a",1)
In my constructor
res4: Person = a1
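
For comparison, here is roughly what a Java developer would have to write by hand to get what the plain case class Person(name: String, age: Int) generates. It is a sketch, not the decompiled output: it leaves out the scala.Product methods, unapply and readResolve, and the hash code value differs from the one Scala computes.

```java
import java.util.Objects;

// Hand-written Java counterpart of `case class Person(name: String, age: Int)` (a sketch).
final class Person {
    private final String name;
    private final int age;

    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    // Plays the role of the companion object's apply method.
    static Person apply(String name, int age) {
        return new Person(name, age);
    }

    String name() { return name; }
    int age() { return age; }

    // Scala's copy has default arguments; in Java both values must be spelled out.
    Person copy(String name, int age) {
        return new Person(name, age);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof Person)) return false;
        Person that = (Person) other;
        return age == that.age && Objects.equals(name, that.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, age); // value differs from Scala's generated hashCode
    }

    @Override
    public String toString() {
        return "Person(" + name + "," + age + ")";
    }

    public static void main(String[] args) {
        Person alice = Person.apply("Alice", 17);
        System.out.println(alice);                                  // Person(Alice,17)
        System.out.println(alice.equals(new Person("Alice", 17)));  // true
        System.out.println(alice.copy(alice.name(), 18));           // Person(Alice,18)
    }
}
```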

 

Scala for Java developers


class MyClass {
  var myname: String = _ // initialize to the default value (null for String); requires var, which is mutable (val is not)
  var myseq: Seq[Any] = Nil // empty seq
  def mytrim: String = myname.trim
  
}
  
case class Person(age: Int, name: String)
  
object PersonImplicits {
  implicit class PersonHelper(person: Person) {
    def describe: String = s"Person[${person.age},${person.name}]"
  }
}
  
  
object MyClass {
  def main(args: Array[String]): Unit = {
 
    println(sum(1, 10, i => i)) // 55 pass directly function body
 
    println(sum(1, 10, square)) // 385 pass method calculating a square
 
    def square(i: Int): Int = i * i // declare method after referencing it
  
    import java.util
  
    val javaList: util.List[String] = new util.ArrayList[String] // import java jdk List and ArrayList
    javaList.add("a")
  
    import scala.collection.mutable
    import scala.collection.JavaConverters._
    val scalaSeq: mutable.Seq[String] = javaList.asScala // convert java javaList to scala mutable seq
  
    // alternative ways to println elements
    scalaSeq.foreach(s => println(s))
    scalaSeq.foreach(println(_))
    scalaSeq.foreach(println _)
    scalaSeq.foreach(println)
  
    val list: List[String] = scalaSeq.toList // convert from seq to list
    list.foreach(println)
  
    val seq: Seq[String] = Seq("a", "b", "c") // scala Seq is a trait so equivalent to Java List interface
    seq.foreach(println)
  
    val immutableList = List(1, 2 ,3) // scala list is immutable and is an abstract class (that is extended by Nil == empty list)
    immutableList.map(10 * _).foreach(println)
  
    val ints: mutable.ListBuffer[Int] = mutable.ListBuffer(11, 22, 33)
    ints += 44 // mutable ListBuffer allows adding elements via +=, Seq does not
  
    val immutableMap: Map[String, Int] = Map("a" -> 1, "b" -> 2)
    println("map:" + immutableMap("a")) // get value by key = "a"
  
    val newImmutableMap: Map[String, Int] = immutableMap + ("c" -> 3) // create a new map based on existing and new entry
    newImmutableMap.foreach(e => println(e._1 + ":" + e._2))
  
    val mutableMap = mutable.Map("aa" -> 10)
    mutableMap += "bb" -> 20
    mutableMap.foreach{ case(k, v) => println(k + ":" + v) }
  
    printOptional(Some("a")) // value=a
    printOptional(None)      // value=empty
  
    println(divide(12,4,3))  // result 1
    println(divide(b=4,a=12,c=3)) // same result 1: alternatively with named params and different order
  
    println(divide(12,4)) // result 3: since third param has default value then no need to specify it unless overwrite
  
    val clazz = new MyClass
    clazz.myname = " aaa bbb "
    println(s"'${clazz.mytrim}'")
    clazz.myseq = Seq(12)
  
    matchcase(0)
    matchcase(1)
  
    import PersonImplicits._
    println(Person(18, "me").describe) // implicitly add describe method
  }
  
  def printOptional (optionalValue: Option[String]): Unit = {
    val value = if (optionalValue.isDefined) optionalValue.get else "empty"
    println("value=" + value)
  }
  
  def divide (a: Int, b: Int, c: Int = 1): Int = a / b / c    // value of last statement is the method return value
  
  def matchcase (n: Int): Unit = {
    n match {
      case -1 | 0 => println("non positive")
      case other => println("positive " + other)
    }
  }
 
  def sum(lb: Int, ub: Int, fun: Int => Int) = { // define a method (before referencing it) accepting function as 3rd arg
    var sum = 0 // declare mutable variable (val is immutable)
    for (el <- lb to ub) { // iterate by 1 from lb to ub
      sum += fun(el)
    }
    sum
  }
}
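
For readers coming from Java, the printOptional method above has a close counterpart in java.util.Optional. The sketch below is only an illustration; the class name OptionalDemo and the method describe are hypothetical and not part of the original code.

```java
import java.util.Optional;

final class OptionalDemo {

    // Java counterpart of the Scala printOptional: fall back to "empty" when no value is present
    static String describe(Optional<String> optionalValue) {
        return "value=" + optionalValue.orElse("empty");
    }

    public static void main(String[] args) {
        System.out.println(describe(Optional.of("a")));  // value=a
        System.out.println(describe(Optional.empty()));  // value=empty
    }
}
```

In Scala the same fallback can be written more compactly as optionalValue.getOrElse("empty").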

Unions and default value in apache avro serialization and deserialization

Initial avro schema (user.avsc) defines a User record with a name field only.

{
  "namespace": "com.bawi.avro.model",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string"
    }
  ]
}

Maven pom.xml defines avro dependency

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.1</version>
        </dependency>

so we can serialize the User data in Java to disk, into the user.avro file:

        Schema schema = new Schema.Parser().parse(new File("user.avsc"));
        File avroFile = new File("target/user.avro");
        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "Alyssa");
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
        dataFileWriter.create(schema, avroFile);
        dataFileWriter.append(user);
        dataFileWriter.close();

We can read (deserialize) the User from disk using the same schema, either in Java:

        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(avroFile, datumReader);
        GenericRecord user2 = null;
        while (dataFileReader.hasNext()) {
            user2 = dataFileReader.next(user2);
            System.out.println(user2);
        }

or by using the avro-tools jar, which Maven downloads when it is declared as a test dependency:

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-tools</artifactId>
            <version>1.8.1</version>
            <scope>test</scope>
        </dependency>

and running it with the ‘tojson’ argument:

me@MacBook:~/dev/my-projects/my-avro$ java -jar /Users/me/.m2/repository/org/apache/avro/avro-tools/1.8.1/avro-tools-1.8.1.jar tojson users.avro 
{"name":"Alyssa"}

Then we will add a new favorite_number element to the schema:

{
  "namespace": "com.bawi.avro.model",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": "int"
    }
  ]
}

but do not yet set favorite_number in the Java code.

When trying to write, we get:

org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: null of int in field favorite_number of com.bawi.avro.model.User

since the favorite_number field is required by the avro schema but was not written by the writer.

Making the field a union of null and int fixes the writing problem (a union of int and null also works):

    {
      "name": "favorite_number",
      "type": [
        "null",
        "int"
      ]
    }

or

    {
      "name": "favorite_number",
      "type": [
        "int",
        "null"
      ]
    }

If the schema of the written Avro file contains favorite_number and the value was written as null, it will always be read as null, regardless of what the read schema looks like. A default value only affects reading fields that were not defined in the schema used for writing (so no value, not even null, was written for them): the schema used for reading should define the field (including its default), and the schema used for writing should not define the field at all.
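
The rule above can be modelled without the Avro library. The sketch below is a simplified illustration, not Avro's actual resolution code: written records are plain maps, the class ReadResolutionSketch and its resolve method are hypothetical, and only the "field was written" versus "field is unknown to the writer" decision is modelled.

```java
import java.util.HashMap;
import java.util.Map;

final class ReadResolutionSketch {

    // Simplified model: the reader default is used only when the written record has no such field at all
    static Object resolve(Map<String, Object> written, String field, Object readerDefault) {
        if (written.containsKey(field)) {
            return written.get(field); // may be null: an explicitly written null stays null
        }
        return readerDefault; // the field was not part of the writer schema
    }

    public static void main(String[] args) {
        Map<String, Object> withNull = new HashMap<>();
        withNull.put("name", "Alyssa");
        withNull.put("favorite_number", null); // writer schema had the field, value written as null

        Map<String, Object> withoutField = new HashMap<>();
        withoutField.put("name", "Alyssa");    // writer schema had only the name field

        System.out.println(resolve(withNull, "favorite_number", -1));     // null
        System.out.println(resolve(withoutField, "favorite_number", -1)); // -1
    }
}
```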

Let's assume a different scenario where the write schema has only the name field (without favorite_number):

{
  "namespace": "com.bawi.avro.model",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string"
    }
  ]
}

and we write only the name field into the Avro file.

Let's assume we want favorite_number to be read as -1 when it is missing (say, because of a new requirement that the Java code always populates favorite_number, so that we do not have to check it for null when reading the Avro file or a Hive table on top of it). Then let's modify the read schema to include the default -1:
user_with_default_favourite_number.avsc:

{
  "namespace": "com.bawi.avro.model",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": [
        "int",
        "null"
      ],
      "default": -1
    }
  ]
}

with

File file2 = new File("user_with_default_favourite_number.avsc");
Schema schema2 = new Schema.Parser().parse(file2);
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema2);

and the output is:

{"name": "Alyssa", "favorite_number": -1}

If we change the read schema for favorite_number to the invalid:

    {
      "name": "favorite_number",
      "type": [
        "null",
        "int"
      ],
      "default": -1
    }

then we get:

org.apache.avro.AvroTypeException: Non-null default value for null type: -1

so if a non-null default value is given, its type (here int) must come first in the union and null must come second.

If we want to have "default": null, then null needs to come first in the union:

    {
      "name": "favorite_number",
      "type": [
        "null",
        "int"
      ],
      "default": null
    }

since for the invalid:

    {
      "name": "favorite_number",
      "type": [
        "int",
        "null"
      ],
      "default": null
    }

we will get:

org.apache.avro.AvroTypeException: Non-numeric default value for int: null

as described in https://avro.apache.org/docs/1.7.7/spec.html#Unions
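
Both error cases follow from one rule in the version of the Avro specification linked above: the default value of a union field must match the first type listed in the union. A minimal sketch of that check, covering only the null and int branches used here (the class and method names are hypothetical, and real Avro validation handles many more types):

```java
import java.util.Arrays;
import java.util.List;

final class UnionDefaultRule {

    // Returns true when the default value matches the FIRST branch of the union
    static boolean isValidDefault(List<String> unionBranches, Object defaultValue) {
        String first = unionBranches.get(0);
        switch (first) {
            case "null":
                return defaultValue == null;
            case "int":
                return defaultValue instanceof Integer;
            default:
                throw new IllegalArgumentException("branch not modelled in this sketch: " + first);
        }
    }

    public static void main(String[] args) {
        System.out.println(isValidDefault(Arrays.asList("int", "null"), -1));   // true
        System.out.println(isValidDefault(Arrays.asList("null", "int"), -1));   // false
        System.out.println(isValidDefault(Arrays.asList("null", "int"), null)); // true
        System.out.println(isValidDefault(Arrays.asList("int", "null"), null)); // false
    }
}
```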

Multiple approaches to concurrent processing in Java

Goal: Let's assume we want to execute 2 long-running operations, downloading content and parsing it, and we want to do this for a list of resources.

This post walks through multiple approaches:
1) Thread(s)
2) ExecutorService to submit Callable and return blocking Future get()
3) ExecutorCompletionService implementing CompletionService to submit Callable and return Future. CompletionService has take and poll methods: take() waits for and returns the next completed task (poll() returns it if one is already available), so get() on the returned Future does not block
4) Parallel streams
5) CompletableFuture

public class TestConcurrent {

    private static final Logger LOGGER = LoggerFactory.getLogger(TestConcurrent.class);

    public static void main(String[] args) {
        LOGGER.info("Started");
        long startMillis = System.currentTimeMillis();

        // for each of the resource: download it and parse it and aggregate results to list:
        List<Integer> results = ...

        long stopMillis = System.currentTimeMillis();
        LOGGER.info("Finished in {} seconds, results={}", (stopMillis - startMillis) / 1000, results);
    }

Let's define the download and parse methods and simulate long-running execution using a configurable TimeUnit.SECONDS.sleep(seconds).

static int download(int id, int seconds) {
    LOGGER.info("Downloading {} and sleeping {} second(s)", id, seconds);
    try {
        TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    LOGGER.info("Downloaded {}", id);
    return id;
}

static int parse(int id, int seconds) {
    LOGGER.info("Parsing {} and sleeping {} second(s)", id, seconds);
    try {
        TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    LOGGER.info("Parsed {}", id);
    return id;
}

To make the scenario more representative across the approaches, let's define 3 resources, each of which must be downloaded before it can be parsed. The 1st resource downloads in 3 seconds, the 2nd in 2 seconds and the 3rd in 1 second. Conversely, it takes 1 second to parse the 1st resource, 2 seconds to parse the 2nd and 3 seconds to parse the 3rd.

Obviously we do not want to execute the tasks sequentially on a single thread, as it would take too long (12 seconds):

private static List<Integer> mapSequentiallyWithOnlyMainThread() {
    List<Integer> numbers = Arrays.asList(1, 2, 3);

    return numbers
        .stream()
        .map(n -> download(n, numbers.size() - n + 1))
        .map(n -> parse(n, n))
        .collect(Collectors.toList());
}
17:54:06.281 [main] Started
17:54:06.391 [main] Downloading 1 and sleeping 3 second(s)
17:54:09.394 [main] Downloaded 1
17:54:09.394 [main] Parsing 1 and sleeping 1 second(s)
17:54:10.394 [main] Parsed 1
17:54:10.394 [main] Downloading 2 and sleeping 2 second(s)
17:54:12.394 [main] Downloaded 2
17:54:12.394 [main] Parsing 2 and sleeping 2 second(s)
17:54:14.394 [main] Parsed 2
17:54:14.394 [main] Downloading 3 and sleeping 1 second(s)
17:54:15.394 [main] Downloaded 3
17:54:15.395 [main] Parsing 3 and sleeping 3 second(s)
17:54:18.395 [main] Parsed 3
17:54:18.395 [main] Finished in 12 seconds, results=[1, 2, 3]
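
The 12 seconds above is simply the sum of all download and parse durations. As a rough guide to what concurrency can achieve with these durations, the sketch below computes three idealized totals. It assumes one thread per resource and ignores scheduling overhead; the class and method names are hypothetical.

```java
final class TimingEstimate {

    // Everything on one thread: sum of all downloads and all parses
    static int sequential(int[] download, int[] parse) {
        int total = 0;
        for (int i = 0; i < download.length; i++) {
            total += download[i] + parse[i];
        }
        return total;
    }

    // All downloads in parallel, then (after the slowest download) all parses in parallel
    static int twoPhases(int[] download, int[] parse) {
        int maxDownload = 0;
        int maxParse = 0;
        for (int i = 0; i < download.length; i++) {
            maxDownload = Math.max(maxDownload, download[i]);
            maxParse = Math.max(maxParse, parse[i]);
        }
        return maxDownload + maxParse;
    }

    // Each resource is parsed as soon as its own download finishes
    static int independentPipelines(int[] download, int[] parse) {
        int longest = 0;
        for (int i = 0; i < download.length; i++) {
            longest = Math.max(longest, download[i] + parse[i]);
        }
        return longest;
    }

    public static void main(String[] args) {
        int[] download = {3, 2, 1}; // seconds for resources 1, 2, 3
        int[] parse = {1, 2, 3};
        System.out.println(sequential(download, parse));           // 12
        System.out.println(twoPhases(download, parse));            // 6
        System.out.println(independentPipelines(download, parse)); // 4
    }
}
```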

So let's use multiple threads.

  1. Threads
    private static List<Integer> threads() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
    
        return numbers
            .stream()
            .map(n -> startThreadAndJoin(() -> download(n, numbers.size() - n + 1)))
            .map(n -> startThreadAndJoin(() -> parse(n, n)))
            .collect(Collectors.toList());
    }
    
    private static Integer startThreadAndJoin(Supplier<Integer> supplier) {
        AtomicInteger value = new AtomicInteger();
        Thread thread = new Thread(() -> {
            value.set(supplier.get());
        });
        thread.start();
        try {
            thread.join();
            return value.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
    18:02:35.132 [main] Started
    18:02:35.244 [Thread-0] Downloading 1 and sleeping 3 second(s)
    18:02:38.246 [Thread-0] Downloaded 1
    18:02:38.248 [Thread-1] Parsing 1 and sleeping 1 second(s)
    18:02:39.248 [Thread-1] Parsed 1
    18:02:39.249 [Thread-2] Downloading 2 and sleeping 2 second(s)
    18:02:41.250 [Thread-2] Downloaded 2
    18:02:41.251 [Thread-3] Parsing 2 and sleeping 2 second(s)
    18:02:43.251 [Thread-3] Parsed 2
    18:02:43.252 [Thread-4] Downloading 3 and sleeping 1 second(s)
    18:02:44.252 [Thread-4] Downloaded 3
    18:02:44.253 [Thread-5] Parsing 3 and sleeping 3 second(s)
    18:02:47.253 [Thread-5] Parsed 3
    18:02:47.253 [main] Finished in 12 seconds, results=[1, 2, 3]
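
The run above still takes 12 seconds because startThreadAndJoin joins each thread right after starting it, so only one worker thread is active at any time. A sketch of the alternative, starting one thread per resource and joining only afterwards, is shown below. The class name, the unitMillis parameter (a scaled-down stand-in for seconds) and the merging of download and parse into one task are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class StartAllThenJoin {

    static List<Integer> process(List<Integer> ids, long unitMillis) {
        Integer[] results = new Integer[ids.size()];
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < ids.size(); i++) {
            final int index = i;
            final int id = ids.get(i);
            Thread thread = new Thread(() -> {
                sleep(unitMillis * (ids.size() - id + 1)); // simulated download
                sleep(unitMillis * id);                    // simulated parse
                results[index] = id;                       // each thread writes only its own slot
            });
            threads.add(thread);
            thread.start(); // start every thread before joining any of them
        }
        for (Thread thread : threads) {
            try {
                thread.join(); // after join() the thread's write to results is visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return Arrays.asList(results);
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList(1, 2, 3), 100)); // [1, 2, 3]
    }
}
```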
  2. ExecutorService
    private static List<Integer> executorServiceGet() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
    
        ExecutorService executorService = Executors.newFixedThreadPool(numbers.size());
        List<Future<Integer>> downloadedFutures = numbers
            .stream()
            .map(n -> {
                Callable<Integer> callable = () -> download(n, numbers.size() - n + 1);
                return executorService.submit(callable);
            })
            .collect(Collectors.toList());
    
        List<Future<Integer>> parsedFutures = numbers
            .stream()
            .map(n -> {
                Future<Integer> future = downloadedFutures.get(n - 1);
                try {
                    Integer value = future.get();
                    return executorService.submit(() -> parse(value, n));
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
                return null;
            })
            .collect(Collectors.toList());
    
        return parsedFutures
            .stream()
            .map(future -> {
                try {
                    return future.get();
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
                return null;
            })
            .collect(Collectors.toList());
    }
    
    18:10:11.988 [main] Started
    18:10:12.106 [pool-1-thread-1] Downloading 1 and sleeping 3 second(s)
    18:10:12.110 [pool-1-thread-2] Downloading 2 and sleeping 2 second(s)
    18:10:12.111 [pool-1-thread-3] Downloading 3 and sleeping 1 second(s)
    18:10:13.111 [pool-1-thread-3] Downloaded 3
    18:10:14.111 [pool-1-thread-2] Downloaded 2
    18:10:15.109 [pool-1-thread-1] Downloaded 1
    18:10:15.111 [pool-1-thread-3] Parsing 1 and sleeping 1 second(s)
    18:10:15.112 [pool-1-thread-2] Parsing 2 and sleeping 2 second(s)
    18:10:15.112 [pool-1-thread-1] Parsing 3 and sleeping 3 second(s)
    18:10:16.112 [pool-1-thread-3] Parsed 1
    18:10:17.113 [pool-1-thread-2] Parsed 2
    18:10:18.112 [pool-1-thread-1] Parsed 3
    18:10:18.112 [main] Finished in 6 seconds, results=[1, 2, 3]
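
One thing the snippet above leaves out is shutting the pool down. Executors.newFixedThreadPool creates non-daemon threads by default, so the JVM keeps running after main returns until shutdown() is called. A sketch of one way to handle this with try/finally follows; the class and method names are hypothetical, and the squaring task merely stands in for download and parse.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ExecutorShutdownSketch {

    static List<Integer> squareAll(List<Integer> numbers) {
        ExecutorService executorService = Executors.newFixedThreadPool(numbers.size());
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (Integer n : numbers) {
                tasks.add(() -> n * n);
            }
            List<Integer> results = new ArrayList<>();
            // invokeAll blocks until every task is done and returns the futures in task order
            for (Future<Integer> future : executorService.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executorService.shutdown(); // lets the pool threads exit so the JVM can terminate
        }
    }

    public static void main(String[] args) {
        System.out.println(squareAll(Arrays.asList(1, 2, 3))); // [1, 4, 9]
    }
}
```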
  3. ExecutorCompletionService
    private static List<Integer> executionCompletionServiceTakeAndGet() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
    
        ExecutorService executorService = Executors.newFixedThreadPool(numbers.size());
        ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(executorService);
    
        numbers
            .stream()
            .forEach(n -> executorCompletionService.submit(() -> download(n, numbers.size() - n + 1)));
    
        numbers
            .stream()
            .forEach(ignored -> {
                try {
                    Future<Integer> future = executorCompletionService.take();
                    Integer value = future.get();
                    executorCompletionService.submit(() -> parse(value, value));
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            });
    
        return numbers
            .stream()
            .map(ignore -> {
                try {
                    Future<Integer> future = executorCompletionService.take();
                    return future.get();
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
                return null;
            })
            .collect(Collectors.toList());
    }
    
    18:18:16.548 [main] Started
    18:18:16.655 [pool-1-thread-2] Downloading 2 and sleeping 2 second(s)
    18:18:16.655 [pool-1-thread-3] Downloading 3 and sleeping 1 second(s)
    18:18:16.655 [pool-1-thread-1] Downloading 1 and sleeping 3 second(s)
    18:18:17.658 [pool-1-thread-3] Downloaded 3
    18:18:17.660 [pool-1-thread-3] Parsing 3 and sleeping 3 second(s)
    18:18:18.658 [pool-1-thread-2] Downloaded 2
    18:18:18.658 [pool-1-thread-2] Parsing 2 and sleeping 2 second(s)
    18:18:19.658 [pool-1-thread-1] Downloaded 1
    18:18:19.658 [pool-1-thread-1] Parsing 1 and sleeping 1 second(s)
    18:18:20.658 [pool-1-thread-2] Parsed 2
    18:18:20.658 [pool-1-thread-1] Parsed 1
    18:18:20.660 [pool-1-thread-3] Parsed 3
    18:18:20.660 [main] Finished in 4 seconds, results=[2, 1, 3]
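
Note that results=[2, 1, 3] reflects completion order rather than input order, because take() hands back whichever task finishes next. The sketch below makes this visible in a deterministic way: a latch (an assumption added for the demo) keeps the first submitted task from finishing until the second one has been taken. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CompletionOrderSketch {

    static List<String> completionOrder() {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        ExecutorCompletionService<String> completionService =
            new ExecutorCompletionService<>(executorService);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // submitted first, but blocked until the latch is released
            completionService.submit(() -> { release.await(); return "slow"; });
            // submitted second, completes immediately
            completionService.submit(() -> "fast");

            List<String> order = new ArrayList<>();
            order.add(completionService.take().get()); // the task that completed first
            release.countDown();                       // now let the blocked task finish
            order.add(completionService.take().get());
            return order;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            release.countDown(); // never leave the worker blocked
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(completionOrder()); // [fast, slow]
    }
}
```

If input order matters, one option is to have each task return its id together with its value and sort afterwards.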
  4. Parallel streams
    private static List<Integer> mapParallel() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
    
        return numbers
            .parallelStream()
            .map(n -> download(n, numbers.size() - n + 1))
            .map(n -> parse(n, n))
            .collect(Collectors.toList());
    }
    18:23:51.300 [main] Started
    18:23:51.413 [main] Downloading 2 and sleeping 2 second(s)
    18:23:51.413 [ForkJoinPool.commonPool-worker-1] Downloading 1 and sleeping 3 second(s)
    18:23:51.413 [ForkJoinPool.commonPool-worker-2] Downloading 3 and sleeping 1 second(s)
    18:23:52.416 [ForkJoinPool.commonPool-worker-2] Downloaded 3
    18:23:52.416 [ForkJoinPool.commonPool-worker-2] Parsing 3 and sleeping 3 second(s)
    18:23:53.416 [main] Downloaded 2
    18:23:53.416 [main] Parsing 2 and sleeping 2 second(s)
    18:23:54.416 [ForkJoinPool.commonPool-worker-1] Downloaded 1
    18:23:54.416 [ForkJoinPool.commonPool-worker-1] Parsing 1 and sleeping 1 second(s)
    18:23:55.416 [main] Parsed 2
    18:23:55.416 [ForkJoinPool.commonPool-worker-1] Parsed 1
    18:23:55.416 [ForkJoinPool.commonPool-worker-2] Parsed 3
    18:23:55.416 [main] Finished in 4 seconds, results=[1, 2, 3]
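
Although the three pipelines run on different threads and resource 1 finishes downloading last, the collected list is still [1, 2, 3]: for an ordered source such as a List, collect(Collectors.toList()) keeps the encounter order even on a parallel stream. A small sketch of this, with a hypothetical class name and a unitMillis parameter standing in for seconds:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class ParallelOrderSketch {

    static List<Integer> slowIdentity(List<Integer> numbers, long unitMillis) {
        return numbers
            .parallelStream()
            .map(n -> {
                sleep(unitMillis * (numbers.size() - n + 1)); // element 1 takes the longest
                return n;
            })
            .collect(Collectors.toList()); // encounter order of the source list is preserved
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(slowIdentity(Arrays.asList(1, 2, 3), 100)); // [1, 2, 3]
    }
}
```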
  5. CompletableFuture
    private static List<Integer> completableFuture() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
    
        List<CompletableFuture<Integer>> cfs = numbers
            .stream()
            .map(n ->
                CompletableFuture.supplyAsync(() -> download(n, numbers.size() - n + 1))
                    .thenApply(id -> parse(id, n)))
            .collect(Collectors.toList());
    
        CompletableFuture<List<Integer>> allResultsCF = CompletableFuture.allOf(cfs.toArray(new CompletableFuture[numbers.size()]))
            .thenApply(v -> cfs
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
            );
    
        return allResultsCF.join();
    }
18:25:25.722 [main] Started
18:25:25.867 [ForkJoinPool.commonPool-worker-1] Downloading 1 and sleeping 3 second(s)
18:25:25.869 [ForkJoinPool.commonPool-worker-3] Downloading 3 and sleeping 1 second(s)
18:25:25.868 [ForkJoinPool.commonPool-worker-2] Downloading 2 and sleeping 2 second(s)
18:25:26.871 [ForkJoinPool.commonPool-worker-3] Downloaded 3
18:25:26.872 [ForkJoinPool.commonPool-worker-3] Parsing 3 and sleeping 3 second(s)
18:25:27.871 [ForkJoinPool.commonPool-worker-2] Downloaded 2
18:25:27.871 [ForkJoinPool.commonPool-worker-2] Parsing 2 and sleeping 2 second(s)
18:25:28.871 [ForkJoinPool.commonPool-worker-1] Downloaded 1
18:25:28.871 [ForkJoinPool.commonPool-worker-1] Parsing 1 and sleeping 1 second(s)
18:25:29.871 [ForkJoinPool.commonPool-worker-1] Parsed 1
18:25:29.871 [ForkJoinPool.commonPool-worker-2] Parsed 2
18:25:29.872 [ForkJoinPool.commonPool-worker-3] Parsed 3
18:25:29.874 [main] Finished in 4 seconds, results=[1, 2, 3]

If we provide our own pool (an ExecutorService passed as the second argument), we get:

CompletableFuture.supplyAsync(() -> download(n, numbers.size() - n + 1), executorService)
    .thenApplyAsync(id -> parse(id, n), executorService)
18:31:47.902 [main] Started
18:31:47.995 [pool-1-thread-1] Downloading 1 and sleeping 3 second(s)
18:31:47.997 [pool-1-thread-3] Downloading 3 and sleeping 1 second(s)
18:31:47.997 [pool-1-thread-2] Downloading 2 and sleeping 2 second(s)
18:31:48.999 [pool-1-thread-3] Downloaded 3
18:31:48.999 [pool-1-thread-3] Parsing 3 and sleeping 3 second(s)
18:31:49.999 [pool-1-thread-2] Downloaded 2
18:31:49.999 [pool-1-thread-2] Parsing 2 and sleeping 2 second(s)
18:31:50.999 [pool-1-thread-1] Downloaded 1
18:31:50.999 [pool-1-thread-1] Parsing 1 and sleeping 1 second(s)
18:31:51.999 [pool-1-thread-1] Parsed 1
18:31:51.999 [pool-1-thread-3] Parsed 3
18:31:51.999 [pool-1-thread-2] Parsed 2
18:31:52.001 [main] Finished in 4 seconds, results=[1, 2, 3]

 

Completable future – async and non blocking callbacks

 

package com.bawi.completable.future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

public class MyCompletableFuture {
    private static final Logger LOGGER = LoggerFactory.getLogger(MyCompletableFuture.class);

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Thread.currentThread().getName() + ": started");
        Arrays.asList("a", "ab", "abc", "abcd")
            .stream()
            .forEach(text -> {
                CompletableFuture<String> stringCF = CompletableFuture.supplyAsync(() -> produceText(text));
                CompletableFuture<Integer> integerCF = stringCF.thenApply(MyCompletableFuture::calcStringLength);
                CompletableFuture<Double> doubleCF = integerCF.thenApply(MyCompletableFuture::calcCirclePerimeter);
                CompletableFuture<Void> voidCF = doubleCF.thenAccept(MyCompletableFuture::print);
                CompletableFuture<Void> voidCF2 = voidCF.thenRun(() -> System.out.println("DONE"));
                }
            );
        System.out.println(Thread.currentThread().getName() + ": created completable future, about to sleep");
        Thread.sleep(5000);
        System.out.println(Thread.currentThread().getName() + ": finished");
   }

    static String produceText(String text)  {
        LOGGER.info("[daemon={}] 1: Producing text: {}", Thread.currentThread().isDaemon(), text);
        sleepMillis(1000);
        return text;
    }

    static int calcStringLength(String text) {
        int length = text.length();
        LOGGER.info("[daemon={}] 2: Calculating string length: {}", Thread.currentThread().isDaemon(), length);
        sleepMillis(1000);
        return length;
    }

    static double calcCirclePerimeter(int r) {
        double perimeter = 2 * Math.PI * r;
        LOGGER.info("[daemon={}] 3: Calculating circle perimeter: {}", Thread.currentThread().isDaemon(), perimeter);
        sleepMillis(1000);
        return perimeter;
    }

    static void print(Double d) {
        LOGGER.info("[daemon={}] 4: Printing: {}", Thread.currentThread().isDaemon(), d);
        System.out.println("d=" + d);
    }

    private static void sleepMillis(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Output (filtered). Note 4 things:
1) the first CompletableFuture is created (only) via the supplyAsync method, which internally uses a thread pool: either the implicit ForkJoinPool common pool (if no executor is specified, as above) or an explicitly passed executor
2) the CompletableFuture creation and execution is async and non-blocking (the main thread immediately moves forward)
3) as soon as the CompletableFuture is created, the supplied task starts executing (worker-1 started producing text before the main thread started to sleep)
4) if the subsequent methods (e.g. thenApply) are not the async variants, the callback runs on the thread that completed the previous stage (here the same thread worker-1 executed all the steps), or on the calling thread if that stage had already completed when the callback was registered
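
Point 4 can be checked with a small experiment. The sketch below is an illustration with hypothetical names: in the first method a latch holds the supplier until thenApply has been attached, so the callback is run by the thread that completes the supplier; in the second method the future is already complete, so thenApply runs the callback directly on the calling thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

final class CallbackThreadSketch {

    // Callback attached while the supplier is still running
    static String callbackThreadWhenPending() {
        CountDownLatch registered = new CountDownLatch(1);
        CompletableFuture<String> threadName = CompletableFuture
            .supplyAsync(() -> {
                awaitQuietly(registered); // hold the supplier until the callback is attached
                return "a";
            })
            .thenApply(text -> Thread.currentThread().getName());
        registered.countDown();
        return threadName.join();
    }

    // Callback attached to a future that is already complete
    static String callbackThreadWhenAlreadyCompleted() {
        return CompletableFuture.completedFuture("a")
            .thenApply(text -> Thread.currentThread().getName())
            .join();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller:            " + Thread.currentThread().getName());
        System.out.println("pending stage:     " + callbackThreadWhenPending());
        System.out.println("completed stage:   " + callbackThreadWhenAlreadyCompleted());
    }
}
```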

main: started
19:13:56.653 [ForkJoinPool.commonPool-worker-1] INFO [daemon=true] 1: Producing text: a
main: created completable future, about to sleep
19:13:57.664 [ForkJoinPool.commonPool-worker-1] INFO [daemon=true] 2: Calculating string length: 1
19:13:58.664 [ForkJoinPool.commonPool-worker-1] INFO [daemon=true] 3: Calculating circle perimeter: 6.283185307179586
19:13:59.667 [ForkJoinPool.commonPool-worker-1] INFO [daemon=true] 4: Printing: 6.283185307179586
main: finished

If we change all the methods to their async variants (thenApplyAsync, thenAcceptAsync and thenRunAsync), then there is no guarantee that the same thread will execute all subsequent processing for that future. Note below that worker-1 executed the first 2 tasks and worker-2 the other 2:

main: started
19:04:37.022 [ForkJoinPool.commonPool-worker-1] [daemon=true] 1: Producing text: a
19:04:38.031 [ForkJoinPool.commonPool-worker-1] [daemon=true] 2: Calculating string length: 1
19:04:39.045 [ForkJoinPool.commonPool-worker-2] [daemon=true] 3: Calculating circle perimeter: 6.283185307179586
19:04:40.049 [ForkJoinPool.commonPool-worker-2] [daemon=true] 4: Printing: 6.283185307179586
DONE
main: finished
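
The logs show daemon=true: the common pool workers are daemon threads, which is why main in the example sleeps for 5 seconds to keep the JVM alive. A sketch of an alternative that waits exactly as long as the pipelines need, by keeping the futures and joining them, is shown below. The class and method names are hypothetical, the simulated sleeps are left out, and the async variants are used for every stage.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class JoinInsteadOfSleep {

    static List<Double> perimeters(List<String> texts) {
        // First build all pipelines so they can run concurrently
        List<CompletableFuture<Double>> futures = texts
            .stream()
            .map(text -> CompletableFuture
                .supplyAsync(() -> text)
                .thenApplyAsync(String::length)
                .thenApplyAsync(r -> 2 * Math.PI * r))
            .collect(Collectors.toList());

        // join() blocks until each pipeline has finished, so no fixed sleep is needed
        return futures
            .stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        perimeters(Arrays.asList("a", "ab", "abc", "abcd"))
            .forEach(d -> System.out.println("d=" + d));
    }
}
```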