Scala for Java developers – decompiling Scala classes to Java bytecode

Classes in Scala (decompiled to Java bytecode to see the generated methods)

me@MacBook:~$ scala
Welcome to Scala 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_192).
Type in expressions for evaluation. Or try :help.

scala> class Person(name: String, age: Int) {
     | println("In default constructor") 
     | override def toString = "Person(name=" + name + ",age=" + age + ")"
     | }
defined class Person

scala> new Person("Bob",18)
In default constructor
res0: Person = Person(name=Bob,age=18)

scala> :javap -p Person
Compiled from "<console>"
public class Person {
  private final java.lang.String name;
  private final int age;
  public java.lang.String toString();
  public Person(java.lang.String, int);
}
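
For Java developers, a rough Java equivalent of what scalac generates here could look like the sketch below (illustrative only, not the literal decompiled bytecode):

// Illustrative sketch: approximate Java equivalent of `class Person(name: String, age: Int)`
public class Person {
    private final String name; // constructor parameters become private final fields
    private final int age;     // no getters generated because the parameters have no val/var

    public Person(String name, int age) {
        this.name = name;
        this.age = age;
        System.out.println("In default constructor"); // statements in the class body run in the constructor
    }

    @Override
    public String toString() {
        return "Person(name=" + name + ",age=" + age + ")";
    }
}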


scala> :paste
// Entering paste mode (ctrl-D to finish)

class Person(name: String, age: Int) {
  override def toString = "Person(name=" + name + ",age=" + age + ")"
}

object Person {
  def apply(name: String, age: Int) = {   // create factory method(s) using apply method(s)
    new Person(name,age)
  }
}

// Exiting paste mode, now interpreting.

defined class Person
defined object Person   // companion object

scala> new Person("Alice",19)
res0: Person = Person(name=Alice,age=19)

scala> Person.apply("Alice", 19)         // access companion object methods by object-name.method-name
res1: Person = Person(name=Alice,age=19)

scala> Person("Alice", 19)               // can even skip typing apply method
res2: Person = Person(name=Alice,age=19)

scala> :javap -p Person$
Compiled from ""
public class Person$ {
  public static final Person$ MODULE$;
  public static {};
  public Person apply(java.lang.String, int);
  public Person$();
}
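
In Java terms Person$ is a singleton: the static MODULE$ field holds its single instance, and Person("Alice", 19) compiles down to a call on it, roughly as in this sketch:

// Illustrative sketch: how Scala's Person("Alice", 19) maps to the generated companion class
Person p = Person$.MODULE$.apply("Alice", 19); // MODULE$ is the singleton instance of the companion object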

scala> class Person(val name: String, val age: Int) {
     | println("In default constructor") 
     | override def toString = "Person(name=" + name + ",age=" + age + ")"
     | }
defined class Person

scala> :javap -p Person
Compiled from "<console>"
public class Person {
  private final java.lang.String name;
  private final int age;
  public java.lang.String name();
  public int age();
  public java.lang.String toString();
  public Person(java.lang.String, int);
}

scala> new Person("Bob",18)
In default constructor
res1: Person = Person(name=Bob,age=18)

scala> res1.name
res2: String = Bob

scala> res1.name()
<console>:13: error: not enough arguments for method apply: (index: Int)Char in class StringOps.
Unspecified value parameter index.
       res1.name()
                ^

scala> class Person(var name: String, var age: Int) {
     | override def toString = "Person(name=" + name + ",age=" + age + ")"
     | }
defined class Person

scala> :javap -p Person
Compiled from "<console>"
public class Person {
  private java.lang.String name;
  private int age;
  public java.lang.String name();
  public void name_$eq(java.lang.String);
  public int age();
  public void age_$eq(int);
  public java.lang.String toString();
  public Person(java.lang.String, int);
}
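
From the Java side the var parameters are exposed through the generated getters name()/age() and setters name_$eq()/age_$eq(), roughly as in this sketch:

// Illustrative sketch: using the members generated for `var name` and `var age` from Java
Person p = new Person("Bob", 18);
String n = p.name();  // getter generated for var (and val) parameters
p.name_$eq("Bobby2"); // setter: Scala's `p.name = "Bobby2"` compiles to this call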

scala> new Person("Bob",18)
res4: Person = Person(name=Bob,age=18)

scala> res4.name_=("Bobby")

scala> res4
res6: Person = Person(name=Bobby,age=18)

scala> res4.name = "Bobby2"
res4.name: String = Bobby2

scala> res4
res7: Person = Person(name=Bobby2,age=18)

scala> case class Person(var name: String, var age: Int) // var to add setter methods
defined class Person

scala> new Person("Alice",17)
res0: Person = Person(Alice,17) // case class gets a default toString listing the fields (instead of the class name @ hash code)

scala> Person("Bobby",18)  // automatically added companion object with apply method
res1: Person = Person(Bobby,18)

scala> :javap -p Person
Compiled from "<console>"
public class Person implements scala.Product,scala.Serializable {
  private java.lang.String name;
  private int age;
  public java.lang.String name();
  public void name_$eq(java.lang.String);  // var to add setter methods
  public int age();
  public void age_$eq(int);
  public Person copy(java.lang.String, int);
  public java.lang.String copy$default$1();
  public int copy$default$2();
  public java.lang.String productPrefix();
  public int productArity();
  public java.lang.Object productElement(int);
  public scala.collection.Iterator productIterator();
  public boolean canEqual(java.lang.Object);
  public int hashCode();
  public java.lang.String toString();
  public boolean equals(java.lang.Object);
  public Person(java.lang.String, int);
}
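
The generated copy method together with the copy$default$N methods is what backs Scala's p.copy(age = 19): each copy$default$N returns the current value of the parameter that was omitted. Roughly, as a sketch:

// Illustrative sketch: Scala's `p.copy(age = 19)` compiles to a copy call that fills in the omitted name
Person p = Person$.MODULE$.apply("Alice", 17);
Person copied = p.copy(p.copy$default$1(), 19); // copy$default$1() returns the current name, age becomes 19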

scala> :javap -p scala.Serializable
Compiled from "Serializable.scala"
public interface scala.Serializable extends java.io.Serializable {
}

scala> :javap -p java.io.Serializable
Compiled from "Serializable.java"
public interface java.io.Serializable {
}

scala> :javap -p scala.Product
Compiled from "Product.scala"
public interface scala.Product extends scala.Equals {
  public abstract java.lang.Object productElement(int);
  public abstract int productArity();
  public abstract scala.collection.Iterator productIterator();
  public abstract java.lang.String productPrefix();
}

scala> :javap -p Person$  // automatically added companion object with apply method
Compiled from "<console>"
public class Person$ extends scala.runtime.AbstractFunction2 implements scala.Serializable {
  public static final Person$ MODULE$;
  public static {};
  public final java.lang.String toString();
  public Person apply(java.lang.String, int);
  public scala.Option<scala.Tuple2> unapply(Person);
  private java.lang.Object readResolve();
  public java.lang.Object apply(java.lang.Object, java.lang.Object);
  public Person$();
}
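
The unapply method is the extractor used by pattern matching; a match such as `p match { case Person(n, a) => ... }` roughly desugars to the sketch below (generics shown after erasure, so the Int is boxed):

// Illustrative sketch: pattern matching against a case class goes through the generated unapply
Person p = Person$.MODULE$.apply("Bobby", 18);
scala.Option<scala.Tuple2<String, Object>> result = Person$.MODULE$.unapply(p);
if (result.isDefined()) {
    String name = result.get()._1();
    int age = ((Integer) result.get()._2()).intValue(); // Int is boxed to java.lang.Integer in the Tuple2
}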

scala> case class Person(name: String, age: Int)  // no var before args - no setters generated
defined class Person

scala> new Person("Alice",17)
res0: Person = Person(Alice,17)

scala> Person("Bobby",18)
res1: Person = Person(Bobby,18)

scala> :javap -p Person
Compiled from "<console>"
public class Person implements scala.Product,scala.Serializable {
  private final java.lang.String name;
  private final int age;
  public java.lang.String name();
  public int age();
  public Person copy(java.lang.String, int);
  public java.lang.String copy$default$1();
  public int copy$default$2();
  public java.lang.String productPrefix();
  public int productArity();
  public java.lang.Object productElement(int);
  public scala.collection.Iterator productIterator();
  public boolean canEqual(java.lang.Object);
  public int hashCode();
  public java.lang.String toString();
  public boolean equals(java.lang.Object);
  public Person(java.lang.String, int);
}

scala> :javap -p Person$
Compiled from "<console>"
public class Person$ extends scala.runtime.AbstractFunction2 implements scala.Serializable {
  public static final Person$ MODULE$;
  public static {};
  public final java.lang.String toString();
  public Person apply(java.lang.String, int);
  public scala.Option<scala.Tuple2> unapply(Person);
  private java.lang.Object readResolve();
  public java.lang.Object apply(java.lang.Object, java.lang.Object);
  public Person$();
}

scala> case class Person(name: String, age: Int) { // add constructor body statements and override toString in a case class
     | println("In my constructor")
     | override def toString = name + age
     | }
defined class Person

scala> new Person("a",1)
In my constructor
res4: Person = a1

 


Apache Spark RDD / DataFrame re-calculation

package com.bawi.spark

import java.io.File
import java.nio.file.{Files, Paths}

import org.apache.commons.io.FileUtils
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object TestRecalculation {
  def main(args: Array[String]): Unit = {

    val output = if (args.length > 0) args(0) else "numbers"
    
    val sparkConf = new SparkConf().setAppName(getClass.getName)
    if (isLocal()) {
      sparkConf.setMaster("local[*]")
      val file = new File(output)
      if (file.exists())
        FileUtils.deleteDirectory(file)
    }

    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    val sparkContext = sparkSession.sparkContext

    val rdd: RDD[Int] = sparkContext.parallelize(Seq(1, 2, 3))
    val rdd2: RDD[Int] = rdd.map(n => {
      println(s"processing: $n")
      n
    })
    import sparkSession.implicits._
    val df: DataFrame = rdd2.toDF("numbers")

    df.write.json(output)

    df.show() // re-calculates

    sparkSession.close()
  }

  def isMac(): Boolean = {
    if (System.getProperty("os.name").contains("Mac")) true else false
  }

  def isUbuntu(): Boolean = {
    if (System.getProperty("os.name").contains("Linux")){
      val path = Paths.get ("/etc/os-release")
      if (path.toFile.exists()) {
        new String(Files.readAllBytes(path)).toLowerCase.contains("ubuntu")
      } else false
    } else false
  }

  def isLocal(): Boolean = {
    isMac() || isUbuntu()
  }
}

Output:

19/01/04 13:55:20 INFO SparkContext: Running Spark version 2.3.2
19/01/04 13:55:21 INFO Utils: Successfully started service 'SparkUI' on port 4040.
19/01/04 13:55:21 INFO SparkUI: Bound SparkUI to 127.0.0.1, and started at http://localhost:4040
19/01/04 13:55:21 INFO Executor: Starting executor ID driver on host localhost
19/01/04 13:55:24 INFO SparkContext: Starting job: json at TestRecalculation.scala:25
19/01/04 13:55:24 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
19/01/04 13:55:24 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
19/01/04 13:55:24 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
19/01/04 13:55:24 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
19/01/04 13:55:24 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)
19/01/04 13:55:24 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
19/01/04 13:55:24 INFO Executor: Running task 6.0 in stage 0.0 (TID 6)
19/01/04 13:55:24 INFO Executor: Running task 7.0 in stage 0.0 (TID 7)
processing: 2
processing: 3
processing: 1
19/01/04 13:55:25 INFO SparkContext: Starting job: show at TestRecalculation.scala:27
19/01/04 13:55:25 INFO Executor: Running task 0.0 in stage 1.0 (TID 8)
19/01/04 13:55:25 INFO Executor: Running task 0.0 in stage 2.0 (TID 9)
19/01/04 13:55:25 INFO Executor: Running task 1.0 in stage 2.0 (TID 10)
19/01/04 13:55:25 INFO Executor: Running task 2.0 in stage 2.0 (TID 11)
19/01/04 13:55:25 INFO Executor: Running task 3.0 in stage 2.0 (TID 12)
processing: 1
19/01/04 13:55:25 INFO SparkContext: Starting job: show at TestRecalculation.scala:27
19/01/04 13:55:25 INFO Executor: Running task 1.0 in stage 3.0 (TID 14)
19/01/04 13:55:25 INFO Executor: Running task 0.0 in stage 3.0 (TID 13)
19/01/04 13:55:25 INFO Executor: Running task 2.0 in stage 3.0 (TID 15)
processing: 3
processing: 2
+-------+
|numbers|
+-------+
|      1|
|      2|
|      3|
+-------+
19/01/04 13:55:25 INFO SparkUI: Stopped Spark web UI at http://localhost:4040
19/01/04 13:55:25 INFO SparkContext: Successfully stopped SparkContext
Process finished with exit code 0

Note that the numbers were processed twice: both the json write and the show action start a job that re-evaluates the RDD lineage, so the map function runs again.

 
Now with the DataFrame cached (call df.cache() before df.write):

    df.cache()

Output:

19/01/04 14:22:37 INFO SparkContext: Running Spark version 2.3.2
19/01/04 14:22:41 INFO SparkContext: Starting job: json at TestRecalculation.scala:37
19/01/04 14:22:41 INFO Executor: Running task 6.0 in stage 0.0 (TID 6)
19/01/04 14:22:41 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
19/01/04 14:22:41 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
19/01/04 14:22:41 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)
19/01/04 14:22:41 INFO Executor: Running task 7.0 in stage 0.0 (TID 7)
19/01/04 14:22:41 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
19/01/04 14:22:41 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
19/01/04 14:22:41 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
19/01/04 14:22:42 INFO MemoryStore: Block rdd_4_0 stored as values in memory (estimated size 16.0 B, free 2004.4 MB)
19/01/04 14:22:42 INFO MemoryStore: Block rdd_4_3 stored as values in memory (estimated size 16.0 B, free 2004.4 MB)
19/01/04 14:22:42 INFO BlockManagerInfo: Added rdd_4_0 in memory on localhost:63790 (size: 16.0 B, free: 2004.5 MB)
19/01/04 14:22:42 INFO MemoryStore: Block rdd_4_6 stored as values in memory (estimated size 16.0 B, free 2004.4 MB)
19/01/04 14:22:42 INFO MemoryStore: Block rdd_4_1 stored as values in memory (estimated size 16.0 B, free 2004.4 MB)
19/01/04 14:22:42 INFO BlockManagerInfo: Added rdd_4_3 in memory on localhost:63790 (size: 16.0 B, free: 2004.5 MB)
19/01/04 14:22:42 INFO BlockManagerInfo: Added rdd_4_6 in memory on localhost:63790 (size: 16.0 B, free: 2004.5 MB)
19/01/04 14:22:42 INFO MemoryStore: Block rdd_4_4 stored as values in memory (estimated size 16.0 B, free 2004.4 MB)
19/01/04 14:22:42 INFO BlockManagerInfo: Added rdd_4_1 in memory on localhost:63790 (size: 16.0 B, free: 2004.5 MB)
19/01/04 14:22:42 INFO BlockManagerInfo: Added rdd_4_4 in memory on localhost:63790 (size: 16.0 B, free: 2004.5 MB)
processing: 3
processing: 1
processing: 2
19/01/04 14:22:42 INFO MemoryStore: Block rdd_4_7 stored as values in memory (estimated size 232.0 B, free 2004.4 MB)
19/01/04 14:22:42 INFO MemoryStore: Block rdd_4_5 stored as values in memory (estimated size 232.0 B, free 2004.4 MB)
19/01/04 14:22:42 INFO BlockManagerInfo: Added rdd_4_7 in memory on localhost:63790 (size: 232.0 B, free: 2004.5 MB)
19/01/04 14:22:42 INFO BlockManagerInfo: Added rdd_4_5 in memory on localhost:63790 (size: 232.0 B, free: 2004.5 MB)
19/01/04 14:22:42 INFO SparkContext: Starting job: show at TestRecalculation.scala:39
19/01/04 14:22:42 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 18.0 KB, free 2004.4 MB)
19/01/04 14:22:42 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 7.5 KB, free 2004.4 MB)
19/01/04 14:22:42 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:63790 (size: 7.5 KB, free: 2004.5 MB)
19/01/04 14:22:42 INFO Executor: Running task 0.0 in stage 1.0 (TID 8)
19/01/04 14:22:42 INFO BlockManager: Found block rdd_4_0 locally
19/01/04 14:22:42 INFO SparkContext: Starting job: show at TestRecalculation.scala:39
19/01/04 14:22:42 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 18.0 KB, free 2004.4 MB)
19/01/04 14:22:42 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 7.5 KB, free 2004.4 MB)
19/01/04 14:22:42 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:63790 (size: 7.5 KB, free: 2004.5 MB)
19/01/04 14:22:42 INFO Executor: Running task 0.0 in stage 2.0 (TID 9)
19/01/04 14:22:42 INFO Executor: Running task 2.0 in stage 2.0 (TID 11)
19/01/04 14:22:42 INFO Executor: Running task 1.0 in stage 2.0 (TID 10)
19/01/04 14:22:42 INFO Executor: Running task 3.0 in stage 2.0 (TID 12)
19/01/04 14:22:42 INFO BlockManager: Found block rdd_4_3 locally
19/01/04 14:22:42 INFO BlockManager: Found block rdd_4_4 locally
19/01/04 14:22:42 INFO BlockManager: Found block rdd_4_2 locally
19/01/04 14:22:42 INFO BlockManager: Found block rdd_4_1 locally
19/01/04 14:22:42 INFO SparkContext: Starting job: show at TestRecalculation.scala:39
19/01/04 14:22:42 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 18.0 KB, free 2004.3 MB)
19/01/04 14:22:42 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 7.5 KB, free 2004.3 MB)
19/01/04 14:22:42 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:63790 (size: 7.5 KB, free: 2004.5 MB)
19/01/04 14:22:42 INFO Executor: Running task 0.0 in stage 3.0 (TID 13)
19/01/04 14:22:42 INFO Executor: Running task 1.0 in stage 3.0 (TID 14)
19/01/04 14:22:42 INFO Executor: Running task 2.0 in stage 3.0 (TID 15)
19/01/04 14:22:42 INFO BlockManager: Found block rdd_4_5 locally
19/01/04 14:22:42 INFO BlockManager: Found block rdd_4_7 locally
19/01/04 14:22:42 INFO BlockManager: Found block rdd_4_6 locally
+-------+
|numbers|
+-------+
|      1|
|      2|
|      3|
+-------+
19/01/04 14:22:42 INFO SparkUI: Stopped Spark web UI at http://localhost:4040
19/01/04 14:22:42 INFO SparkContext: Successfully stopped SparkContext
19/01/04 14:22:42 INFO ShutdownHookManager: Shutdown hook called
Process finished with exit code 0

Note: the numbers were processed only once; the subsequent show jobs read the cached blocks ("Found block rdd_4_* locally") instead of recomputing them.

Apache Spark serialization with Java and Scala

Java or Scala classes that implement java.io.Serializable do not cause java.io.NotSerializableException issues in Apache Spark. Sometimes, however, we need to pass some state between the driver and the workers/executors, e.g. an instance of a config class that does not necessarily implement the Serializable interface. The workaround for that is to use the KryoSerializer and to declare that config as a static class field, initialize it in the driver, and let the workers access it.

public static class MyData {
    public String myData;

    public MyData(String myData) {
        this.myData = myData;
    }

    @Override
    public String toString() {
        return "MyData{myData='" + myData + "'}";
    }
}

with

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class JavaSparkExample {

    public static void main(String[] args) {
        String master = args.length == 0 ? "local[*]" : args[0];
        SparkConf sparkConf = new SparkConf().setAppName(JavaSparkExample.class.getName()).setMaster(master);
        SparkContext sparkContext = new SparkContext(sparkConf);
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);

        List<String> strings = IntStream.range(0, 100).mapToObj(String::valueOf).collect(Collectors.toList());
        JavaRDD<String> javaStringRDD = javaSparkContext.parallelize(strings);
        JavaRDD<MyData> javaMyDatasRDD = javaStringRDD.map(MyData::new);
        MyData reducedMyData = javaMyDatasRDD.reduce((MyData y1, MyData y2) -> new MyData(String.valueOf(Integer.parseInt(y1.myData) + Integer
                .parseInt(y2.myData))));
        System.out.println(reducedMyData.myData);
    }
}

leads to java.io.NotSerializableException:

18/11/27 14:22:18 ERROR Executor: Exception in task 7.0 in stage 0.0 (TID 7)
java.io.NotSerializableException: com.bawi.spark.JavaSparkExample$MyData
Serialization stack:
- object not serializable (class: com.bawi.spark.JavaSparkExample$MyData, value: MyData{myData='1209'})
- field (class: scala.Some, name: x, type: class java.lang.Object)
- object (class scala.Some, Some(MyData{myData='1209'}))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:383)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

A solution to make it work is to have MyData implement java.io.Serializable, or to add the KryoSerializer to the sparkConf:
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
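
For example, the SparkConf could be built as in this sketch (registering MyData with Kryo via registerKryoClasses is optional but recommended, so Kryo does not have to write out full class names):

// Sketch: switch Spark to Kryo serialization and register the non-Serializable class
SparkConf sparkConf = new SparkConf()
        .setAppName(JavaSparkExample.class.getName())
        .setMaster(master)
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .registerKryoClasses(new Class<?>[]{MyData.class});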

Another case would be if we create a local instance of MyData (in the main method) in the Spark driver and pass it to a map or reduce function executed on the executor/worker:

public static void main(String[] args) {
        ...
        MyData zeroMyData = new MyData("0");
        JavaRDD<MyData> javaMyDatasRDD = javaStringRDD.map(s ->
            new MyData(String.valueOf(
                                    Integer.parseInt(zeroMyData.myData) +
                                    Integer.parseInt(s))));
        ...
}

then we will end up again with the java.io.NotSerializableException:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.map(RDD.scala:369)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:93)
at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:45)
at com.bawi.spark.JavaSparkExample.main(JavaSparkExample.java:65)
Caused by: java.io.NotSerializableException: com.bawi.spark.JavaSparkExample$MyData
Serialization stack:
- object not serializable (class: com.bawi.spark.JavaSparkExample$MyData, value: MyData{myData='0'})
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.bawi.spark.JavaSparkExample, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic com/bawi/spark/spark2/JavaSparkExample.lambda$main$41578125$1:(Lcom/bawi/spark/spark2/JavaSparkExample$MyData;Ljava/lang/String;)Lcom/bawi/spark/spark2/JavaSparkExample$MyData;, instantiatedMethodType=(Ljava/lang/String;)Lcom/bawi/spark/spark2/JavaSparkExample$MyData;, numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class com.bawi.spark.JavaSparkExample$$Lambda$15/818859466, com.bawi.spark.JavaSparkExample$$Lambda$15/818859466@7e46d648)
- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)

Again, the solution would be for the MyData class to implement Serializable, or to declare the MyData instance as a static class field:

public class JavaSparkExample {

    static MyData zeroMyData;

    public static void main(String[] args) {
	 ...
        zeroMyData = new MyData("0");
        JavaRDD<MyData> javaMyDatasRDD = javaStringRDD.map(s ->
            new MyData(String.valueOf(
                                    Integer.parseInt(zeroMyData.myData) +
                                    Integer.parseInt(s))));
	 ...
    }
}

The same principle applies to Scala code (note that a Scala case class automatically extends Serializable behind the scenes, and since there is no static keyword in Scala, a field of the enclosing object plays that role):

package com.bawi.spark.scala

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ScalaSparkExample {

  class MyData2(val myData2: String) {
    override def toString: String = s"MyData2{myData2=$myData2}"
  }

//  case class MyData2(val myData2: String)

  var zeroMyData: MyData2 = _

  def main(args: Array[String]): Unit = {
    val master = if (args.length == 0) "local[*]" else args(0)
    val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster(master)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkContext = new SparkContext(sparkConf)
    val ints: Seq[Int] = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val strings: Seq[String] = ints.map(i => i.toString)
    val stringsRDD: RDD[String] = sparkContext.parallelize(strings)
    zeroMyData = new MyData2("0")
    val myDatasRDD: RDD[MyData2] = stringsRDD.map(s => new MyData2((zeroMyData.myData2.toInt + s.toInt).toString))
    val aggregatedMyData2 = myDatasRDD.reduce((y1, y2) => new MyData2((y1.myData2.toInt + y2.myData2.toInt).toString))
    println(aggregatedMyData2)

  }
}

Scala for Java developers


class MyClass {
  var myname: String = _ // initialize to default value (null); var so that it can be reassigned later
  var myseq: Seq[Any] = Nil // empty seq
  def mytrim: String = myname.trim
  
}
  
case class Person(age: Int, name: String)
  
object PersonImplicits {
  implicit class PersonHelper(person: Person) {
    def describe: String = s"Person[${person.age},${person.name}]"
  }
}
  
  
object MyClass {
  def main(args: Array[String]): Unit = {
 
    println(sum(1, 10, i => i)) // 55: pass a function literal directly
 
    println(sum(1, 10, square)) // 385: pass a method calculating the square
 
    def square(i: Int): Int = i * i // declare method after referencing it
  
    import java.util
  
    val javaList: util.List[String] = new util.ArrayList[String] // import java jdk List and ArrayList
    javaList.add("a")
  
    import scala.collection.mutable
    import scala.collection.JavaConverters._
    val scalaSeq: mutable.Seq[String] = javaList.asScala // convert java javaList to scala mutable seq
  
    // alternative ways to println elements
    scalaSeq.foreach(s => println(s))
    scalaSeq.foreach(println(_))
    scalaSeq.foreach(println _)
    scalaSeq.foreach(println)
  
    val list: List[String] = scalaSeq.toList // convert from seq to list
    scalaSeq.foreach(println)
  
    val seq: Seq[String] = Seq("a", "b", "c") // scala Seq is a trait so equivalent to Java List interface
    seq.foreach(println)
  
    val immutableList = List(1, 2 ,3) // scala list is immutable and is an abstract class (that is extended by Nil == empty list)
    immutableList.map(10 * _).foreach(println)
  
    val ints: mutable.ListBuffer[Int] = mutable.ListBuffer(11, 22, 33)
    ints += 44 // mutable ListBuffer allows adding elements via +=, Seq does not
  
    val immutableMap: Map[String, Int] = Map("a" -> 1, "b" -> 2)
    println("map:" + immutableMap("a")) // get value by key = "a"
  
    val newImmutableMap: Map[String, Int] = immutableMap + ("c" -> 3) // create a new map based on existing and new entry
    newImmutableMap.foreach(e => println(e._1 + ":" + e._2))
  
    val mutableMap = mutable.Map("aa" -> 10)
    mutableMap += "bb" -> 20
    mutableMap.foreach{ case(k, v) => println(k + ":" + v) }
  
    printOptional(Some("a")) // value=a
    printOptional(None)      // value=empty
  
    println(divide(12,4,3))  // result 1
    println(divide(b=4,a=12,c=3)) // same result 1: alternatively with named params and different order
  
    println(divide(12,4)) // result 3: the third param has a default value, so there is no need to specify it unless overriding it
  
    val clazz = new MyClass
    clazz.myname = " aaa bbb "
    println(s"'${clazz.mytrim}'")
    clazz.myseq = Seq(12)
  
    matchcase(0)
    matchcase(1)
  
    import PersonImplicits._
    println(Person(18, "me").describe) // implicitly add describe method
  }
  
  def printOptional (optionalValue: Option[String]): Unit = {
    val value = if (optionalValue.isDefined) optionalValue.get else "empty"
    println("value=" + value)
  }
  
  def divide (a: Int, b: Int, c: Int = 1): Int = a / b / c    // value of last statement is the method return value
  
  def matchcase (n: Int): Unit = {
    n match {
      case -1 | 0 => println("non positive")
      case other => println("positive " + other)
    }
  }
 
  def sum(lb: Int, ub: Int, fun: Int => Int) = { // define a method (before referencing it) accepting function as 3rd arg
    var sum = 0 // declare mutable variable (val is immutable)
    for (el <- lb to ub) { // iterate by 1 from lb to ub
      sum += fun(el)
    }
    sum
  }
}

Command line batch VLC conversion from mp4 to mp3 and file renaming on Mac

Goal: having downloaded many mp4 files, convert them from mp4 to mp3 and rename them to add names and numbering:

  1. create a text file with a separate line for each file name, then read the file names into a bash array:
    me@MacBook:~/mp4$ IFS=$'\n' read -d '' -r -a lines < ~/mp4/list.txt
  2. convert the files to mp3 in batch mode:
    me@MacBook:~/mp4$ alias vlc=/Applications/VLC.app/Contents/MacOS/VLC
    
    i=0; for file in `ls -rt *.mp4`; do name=${lines[$i]} && i=$(expr $i + 1) && vlc -I dummy "$file" --sout="#transcode{vcodec=none,acodec=mp3,ab=128,channels=2,samplerate=44100,scodec=none}:standard{mux=raw,access=file{no-overwrite},dst=\"$(printf "%02d" $i)_${name}_$(echo "$file" | sed 's/\.[^\.]*$/.mp3/')\"}" vlc://quit; done

    simple example for hardcoded input and output file name:

    vlc -I dummy "902970061.mp4" --sout="#transcode{vcodec=none,acodec=mp3,ab=128,channels=2,samplerate=44100,scodec=none}:standard{mux=raw,access=file{no-overwrite},dst=\"1_MY_TITLE_ABC_902970061.mp3\"}" vlc://quit
    

    simple example transforming mp4 to mp3 with the same file name (changing only extension):

    for file in *.mp4; do vlc -I dummy "$file" --sout="#transcode{vcodec=none,acodec=mp3,ab=128,channels=2,samplerate=44100,scodec=none}:standard{mux=raw,access=file{no-overwrite},dst=\"$(echo "$file" | sed 's/\.[^\.]*$/.mp3/')\"}" vlc://quit; done
    
  3. rename the files to remove Polish chars:
    me@MacBook:~/mp4$ find . -name \*.mp3 -exec ~/dev/my-projects/my-bash/rename-file-to-remove-polish-chars.sh "{}" \;

where the rename script is:

me@MacBook:~/mp4$ vim ~/dev/my-projects/my-bash/rename-file-to-remove-polish-chars.sh

#!/bin/bash
originalFileName="$1"
# build the new name by piping the original name through the sed replacements below
newFileName=$(echo "${originalFileName}" |\
    sed 's/Ć/C/g' |\
    sed 's/ć/c/g' |\
    sed 's/ę/e/g' |\
    sed 's/Ę/E/g' |\
    sed 's/Ł/L/g' |\
    sed 's/ł/l/g' |\
    sed 's/ń/n/g' |\
    sed 's/ó/o/g' |\
    sed 's/Ś/S/g' |\
    sed 's/ś/s/g' |\
    sed 's/Ż/Z/g' |\
    sed 's/ż/z/g' |\
    sed 's/Ź/Z/g' |\
    sed 's/ź/z/g' |\
    sed 's/ą/a/g' |\
    sed 's/ć/c/g' |\
    sed 's/ę/e/g' |\
    sed 's/ł/l/g' |\
    sed 's/ł/l/g' |\
    sed 's/ń/n/g' |\
    sed 's/ó/o/g' |\
    sed 's/Ó/O/g' |\
    sed 's/ś/s/g' |\
    sed 's/Ś/S/g' |\
    sed 's/Ż/Z/g' |\
    sed 's/ż/z/g' |\
    sed 's/:/_/g' |\
    sed 's/,/_/g' |\
    sed 's/__/_/g' |\
    sed 's/)//g' |\
    sed 's/(//g' |\
    sed 's/\ /_/g' |\
    sed 's/\.\./\./g' |\
    sed 's/Jerzego //g' |\
    sed 's/Zieba/z Zieba/g' |\
    sed 's/?//g' )

if ! [ "${originalFileName}" == "${newFileName}" ]; then
    echo "Renamed file \"${originalFileName}\" to \"${newFileName}\""
    mv "${originalFileName}" "${newFileName}"
fi

vlc url --sout="#std{access=file,mux=ts,dst=go.mpg}"

Debugging local Tomcat web applications

Project structure:

./context.xml
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/com
./src/main/java/com/bawi
./src/main/java/com/bawi/ConnectionUtils.java
./src/main/webapp
./src/main/webapp/index.jsp
./src/main/webapp/WEB-INF
./src/main/webapp/WEB-INF/web.xml

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.bawi</groupId>
    <artifactId>my-webapp-template</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>war</packaging>

    <build>
        <finalName>my-webapp-template</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.tomcat.maven</groupId>
                <artifactId>tomcat7-maven-plugin</artifactId>
                <version>2.2</version>
                <configuration>
                    <path>/${project.build.finalName}</path>
                    <contextFile>context.xml</contextFile>
                    <username>admin</username>
                    <password>admin</password>
                </configuration>
                <dependencies>
                    <dependency>
                        <groupId>org.hsqldb</groupId>
                        <artifactId>hsqldb</artifactId>
                        <version>2.4.0</version>
                    </dependency>
                </dependencies>
            </plugin>
        </plugins>
    </build>

</project>

1. First option: standalone tomcat (can be used with tomcat 7 and 8)

In this option update tomcat bin/startup.sh to:

export JPDA_ADDRESS=8000
export JPDA_TRANSPORT=dt_socket

#exec "$PRGDIR"/"$EXECUTABLE" start "$@"
exec "$PRGDIR"/"$EXECUTABLE" jpda start "$@"

Also add driver jars to the standalone Tomcat lib folder if needed, update context.xml (if non-standard) in the standalone tomcat/conf folder, and restart Tomcat.

Then build the my-webapp-template.war file with 'mvn clean package' and manually copy (redeploy) it to the standalone Tomcat webapps folder,

or, even better, redeploy the war via

mvn clean tomcat7:redeploy

For that, add username/password (or reference a server) in the tomcat7-maven-plugin configuration to access the standalone Tomcat manager-script role, and update tomcat-users.xml:

    <role rolename="manager-gui"/>
    <role rolename="manager-script"/>
    <user username="admin" password="admin" roles="manager-gui,manager-script"/>

For debugging in IntelliJ: Run -> Attach to Local Process, and set a breakpoint in the Java code.

2. Second option: Embedded Tomcat:

In this option the tomcat7-maven-plugin requires adding the driver dependencies as plugin dependencies and specifying context.xml if non-standard.

IntelliJ: in the Maven Projects tab, under Plugins, right-click on

tomcat7:run

and choose Debug, then set a breakpoint. In this option, if the Java source classes change you can use IntelliJ -> Run -> Reload Changed Classes.

Apache Tomcat and HSQL DB, Derby DB and Postgres DB integration

HSQL DB:

Download the latest hsqldb zip, extract it, start hsqldb, and let it automatically create the database:

me@MacBook:~/dev/env/hsqldb-2.4.0/hsqldb$ java -cp lib/hsqldb.jar org.hsqldb.server.Server --database.0 file:mydatabases/mydb --dbname.0 mydb

Connect to db from console (empty password, just hit enter):

me@MacBook:~/dev/env/hsqldb-2.4.0/hsqldb$ java -jar lib/sqltool.jar --inlineRc=url=jdbc:hsqldb:hsql://localhost/mydb,user=sa
 Enter password for sa:
 SqlTool v. 5736.
 JDBC Connection established to a HSQL Database Engine v. 2.4.0 database
 as "SA" with R/W TRANSACTION_READ_COMMITTED Isolation.

sql>
 CREATE SCHEMA MY_SCHEMA;
 CREATE TABLE MY_SCHEMA.MY_TABLE (email VARCHAR(50));
 INSERT INTO MY_SCHEMA.MY_TABLE VALUES ('abc@efg.hi');
 COMMIT;

sql> SELECT * FROM MY_SCHEMA.MY_TABLE;
 email
 ----------------------------------
 abc@efg.hi

Tomcat:
copy hsqldb/lib/hsqldb.jar to tomcat lib/

DERBY DB:

Download from https://db.apache.org/derby/derby_downloads.html, extract it, and start it as a standalone server:

me@MacBook:~/dev/env$ tar xzf db-derby-10.14.1.0-bin.tar.gz

me@MacBook:~/dev/env/db-derby-10.14.1.0-bin$ java -jar lib/derbyrun.jar server start
Tue Apr 24 10:02:35 CEST 2018 : Security manager installed using the Basic server security policy.
Tue Apr 24 10:02:35 CEST 2018 : Apache Derby Network Server - 10.14.1.0 - (1808820) started and ready to accept connections on port 1527

me@MacBook:~/dev/env/db-derby-10.14.1.0-bin$ java -cp lib/derbytools.jar:lib/derby.jar:lib/derbyclient.jar org.apache.derby.tools.ij 
ij version 10.14
ij> connect 'jdbc:derby://localhost:1527/mydb;create=true';
ij> CREATE SCHEMA MY_SCHEMA;
0 rows inserted/updated/deleted
ij> CREATE TABLE MY_SCHEMA.MY_TABLE (email VARCHAR(50));
0 rows inserted/updated/deleted
ij> INSERT INTO MY_SCHEMA.MY_TABLE VALUES ('abc@efg.hi');
1 row inserted/updated/deleted
ij> SELECT * FROM MY_SCHEMA.MY_TABLE;
EMAIL 
--------------------------------------------------
abc@efg.hi 

1 row selected
ij> exit;

me@MacBook:~/dev/env/db-derby-10.14.1.0-bin$ ls mydb/
README_DO_NOT_TOUCH_FILES.txt dbex.lck seg0 tmp
db.lck log service.properties

# I have stopped the server and started again #

me@MacBook:~/dev/env/db-derby-10.14.1.0-bin$ java -cp lib/derbytools.jar:lib/derby.jar:lib/derbyclient.jar org.apache.derby.tools.ij 
ij version 10.14
ij> connect 'jdbc:derby://localhost:1527/mydb';
ij> SELECT * FROM MY_SCHEMA.MY_TABLE; 
EMAIL 
--------------------------------------------------
abc@efg.hi  

Note: the CREATE TABLE statement had a lowercase email column name, but Derby converted it to uppercase EMAIL.

Tomcat:
copy db-derby-10.14.1.0-bin/lib/derbyclient.jar to tomcat lib/

POSTGRES:

me@ubuntu-vm:~$ sudo apt-get install postgresql
me@ubuntu-vm:~$ sudo -u postgres createuser --interactive
Enter name of role to add: me
Shall the new role be a superuser? (y/n) y
me@ubuntu-vm:~$ sudo -u postgres createdb me
me@ubuntu-vm:~$ psql
psql (9.5.12)
Type "help" for help.

me=# \password
Enter new password: 
Enter it again:

me=# CREATE DATABASE mydb;
CREATE DATABASE

me@ubuntu-vm:~$ psql -d mydb

mydb=# \conninfo
You are connected to database "mydb" as user "me" via socket in "/var/run/postgresql" at port "5432".

mydb=# CREATE SCHEMA MY_SCHEMA;
CREATE SCHEMA
mydb=# CREATE TABLE MY_SCHEMA.MY_TABLE (email VARCHAR(50));
CREATE TABLE
mydb=# INSERT INTO MY_SCHEMA.MY_TABLE VALUES ('abc@efg.hi');
INSERT 0 1
mydb=# SELECT * FROM MY_SCHEMA.MY_TABLE;
 email 
------------
 abc@efg.hi
(1 row)

Allow listening on all interfaces:
me@ubuntu-vm:~$ sudo vim /etc/postgresql/9.5/main/postgresql.conf 
#------------------------------------------------------------------------------
# CONNECTIONS AND AUTHENTICATION
#------------------------------------------------------------------------------
# - Connection Settings -
listen_addresses = '*'
#listen_addresses = 'localhost' # what IP address(es) to listen on;

Allow connections from all networks:
me@ubuntu-vm:~$ sudo vim /etc/postgresql/9.5/main/pg_hba.conf
# IPv4 local connections:
#host all all 127.0.0.1/32 md5
host all all 0.0.0.0/0 md5

me@ubuntu-vm:~$ sudo /etc/init.d/postgresql restart
[ ok ] Restarting postgresql (via systemctl): postgresql.service.

VirtualBox Settings / Network / Enable Network Adapter: Attached to: NAT, then click Port Forwarding and add: Name: Postgresql, Protocol: TCP, Host IP: (empty), Host Port: 5432, Guest IP: (empty), Guest Port: 5432.

Tomcat:
copy postgresql-42.2.2.jar to tomcat lib/

src/main/webapp/WEB-INF/web.xml and tomcat/conf/context.xml (the context.xml fragment follows the web.xml below):

<!DOCTYPE web-app PUBLIC
 "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
 "http://java.sun.com/dtd/web-app_2_3.dtd" >

<web-app>
    <display-name>Archetype Created Web Application</display-name>

    <resource-ref>
        <res-ref-name>jdbc/mydb</res-ref-name>
        <res-type>javax.sql.DataSource</res-type>
        <res-auth>Container</res-auth>
    </resource-ref>

</web-app>
<Context>
...
    <Resource name="jdbc/mydb" auth="Container"
        type="javax.sql.DataSource"
        validationQuery="select 1 from INFORMATION_SCHEMA.SYSTEM_USERS" 
        maxActive="20" maxIdle="30" maxWait="10000"
        username="SA"
        driverClassName="org.hsqldb.jdbc.JDBCDriver"
        url="jdbc:hsqldb:hsql://localhost/mydb"/>
    <!-- 
    <Resource name="jdbc/mydb" auth="Container"
        type="javax.sql.DataSource"
        validationQuery="select 1 from sysibm.sysdummy1"
        maxActive="20" maxIdle="30" maxWait="10000"
        driverClassName="org.apache.derby.jdbc.ClientDriver"
        url="jdbc:derby://localhost:1527/mydb"/>
    -->
    <!-- 
    <Resource name="jdbc/mydb" auth="Container"
        type="javax.sql.DataSource"
        validationQuery="select 1"
        maxActive="20" maxIdle="30" maxWait="10000"
        username="me" password="abc123"
        driverClassName="org.postgresql.Driver"
        url="jdbc:postgresql://localhost:5432/mydb"/>
    -->
</Context>

src/main/webapp/index.jsp:

<%@ page import="com.bawi.ConnectionUtils" %>
<html>
    <body>
        <h2>Hi: <%= ConnectionUtils.getElements() %></h2>
    </body>
</html>

src/main/java/com/bawi/ConnectionUtils.java:

 

package com.bawi;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

public class ConnectionUtils {
    public static List<String> getElements() {
        List<String> list = new ArrayList<String>();
        try {
            Context ctx = (Context) new InitialContext().lookup("java:comp/env");
            DataSource ds = (DataSource) ctx.lookup("jdbc/mydb");
            Connection conn = ds.getConnection();
            Statement stmt = conn.createStatement();
            ResultSet res = stmt.executeQuery("SELECT EMAIL from MY_SCHEMA.MY_TABLE");
            while (res.next()) {
                String title = res.getString("EMAIL");
                list.add(title);
            }
            conn.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return list;
    }
}

Output:

Hi: [abc@efg.hi]