Dataflow metrics dashboard

Terraform (inside the google_dataflow_job resource):

additional_experiments = ["enable_stackdriver_agent_metrics"]

Maven:

-Dexec.args="${JAVA_DATAFLOW_RUN_OPTS} \
  --runner=DataflowRunner \
...
  --experiments=enable_stackdriver_agent_metrics"
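
The experiment can also be enabled programmatically when the options are constructed in code. A minimal sketch (the class name is illustrative), using Beam's ExperimentalOptions view of the pipeline options:

import java.util.Arrays;

import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class EnableAgentMetricsExample {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        // same effect as passing --experiments=enable_stackdriver_agent_metrics on the command line
        // (note: setExperiments replaces any experiments already passed via --experiments)
        options.as(ExperimentalOptions.class)
               .setExperiments(Arrays.asList("enable_stackdriver_agent_metrics"));
    }
}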
VM CPU and JVM usage – dataflow metrics dashboard
Selected element count and elapsed time – dataflow metrics dashboard
Current vCPU and Failed – dataflow metrics dashboard
All and Selected elements produced at specific point in time – dataflow metrics dashboard
BigQuery read and write elements produced at specific point in time – dataflow metrics dashboard

Dataflow – debugging OutOfMemory (OOM), profiling and logging override

Apache Beam 2.28 + Google Dataflow

package com.bawi.beam.dataflow;

import com.bawi.beam.dataflow.schema.AvroToBigQuerySchemaConverter;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroGenericCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.gcp.bigquery.AvroWriteRequest;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.*;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class MyOOMJob {
    private static final Logger LOGGER = LoggerFactory.getLogger(MyOOMJob.class);

    private static Schema SCHEMA = SchemaBuilder.record("root").fields()
                .requiredInt("value")
                .optionalString("local_host_address")
                .optionalLong("thread_id")
                .optionalString("thread_name")
                .optionalString("thread_group")
                .optionalString("heap_total")
                .optionalString("heap_free")
                .optionalString("heap_used")
                .optionalString("heap_max")
            .endRecord();

    public interface MyPipelineOptions extends PipelineOptions {
//    public interface MyPipelineOptions extends DataflowPipelineOptions {
        @Validation.Required
        @Default.String("bartek_dataset.myoomtest_table")
        ValueProvider<String> getTable();
        void setTable(ValueProvider<String> value);

        @Validation.Required
        @Default.String("1,10000")
        ValueProvider<String> getSequenceStartCommaEnd();
        void setSequenceStartCommaEnd(ValueProvider<String> value);
    }

    public static void main(String[] args) {
        MyPipelineOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyPipelineOptions.class);
//        DataflowProfilingOptions.DataflowProfilingAgentConfiguration profilingConf = new DataflowProfilingOptions.DataflowProfilingAgentConfiguration();
//        profilingConf.put("APICurated", true);
//        pipelineOptions.setProfilingAgentConfiguration(profilingConf);
        Pipeline pipeline = Pipeline.create(pipelineOptions);

        ValueProvider.NestedValueProvider<List<Integer>, String> nestedValueProvider = ValueProvider.NestedValueProvider.of(pipelineOptions.getSequenceStartCommaEnd(), startCommaStop -> {
            int start = Integer.parseInt(startCommaStop.substring(0, startCommaStop.indexOf(",")));
            int end = Integer.parseInt(startCommaStop.substring(startCommaStop.indexOf(",") + 1));
            LOGGER.debug("DEBUG: Sequence start={}, end={}", start, end);
            LOGGER.info("INFO: Sequence start={}, end={}", start, end);
            return IntStream.rangeClosed(start, end).boxed().collect(Collectors.toList());
        });

        ListCoder<Integer> integerListCoder = ListCoder.of(SerializableCoder.of(Integer.class));

        //pipeline.apply(Create.of(IntStream.rangeClosed(10000, 25000).boxed().collect(Collectors.toList())))
        pipeline.apply(Create.ofProvider(nestedValueProvider, integerListCoder))
                .apply(FlatMapElements.into(TypeDescriptors.integers()).via(iter -> iter))

                .apply(MapElements.into(TypeDescriptor.of(GenericRecord.class)).via(i -> {
                    GenericData.Record record = new GenericData.Record(SCHEMA);
                    record.put("value", createBigArrayList(i));
                    InetAddress localHostAddress = getLocalHostAddress();
                    if (localHostAddress != null) {
                        record.put("local_host_address", localHostAddress.toString());
                    }
                    Thread thread = Thread.currentThread();
                    record.put("thread_id", thread.getId());
                    record.put("thread_name", thread.getName());
                    ThreadGroup threadGroup = thread.getThreadGroup();
                    if (threadGroup != null) {
                        record.put("thread_group", threadGroup.getName());
                    }
                    String total = format(Runtime.getRuntime().totalMemory());
                    record.put("heap_total", total);
                    String free = format(Runtime.getRuntime().freeMemory());
                    record.put("heap_free", free);
                    String used = format(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory());
                    record.put("heap_used", used);
                    String max = format(Runtime.getRuntime().maxMemory());
                    record.put("heap_max", max);
                    LOGGER.info("i={},localHostAddress={},thread_id={},thread_name={},threadGroup={},heap_total={},heap_free={},heap_used={},heap_max={}",
                            i, localHostAddress, thread.getId(), thread.getName(), thread.getThreadGroup(), total, free, used, max);
                    return record;
                })).setCoder(AvroGenericCoder.of(SCHEMA))

                // requires org.apache.beam:beam-sdks-java-io-google-cloud-platform
                .apply(BigQueryIO.<GenericRecord>write()
                        .withAvroFormatFunction(AvroWriteRequest::getElement)
                        .withAvroSchemaFactory(qTableSchema -> SCHEMA)
                        .to(pipelineOptions.getTable())
                        .useAvroLogicalTypes()
                        .withSchema(AvroToBigQuerySchemaConverter.convert(SCHEMA))
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

        PipelineResult pipelineResult = pipeline.run();
//        pipelineResult.waitUntilFinish();
    }

    private static String format(long value) {
        NumberFormat numberFormat = NumberFormat.getInstance();
        numberFormat.setGroupingUsed(true);
        return numberFormat.format(value);
    }

    private static int createBigArrayList(int limit) {
        // allocates 'limit' 16 KB byte arrays and keeps them referenced, e.g. limit=10000 holds roughly 160 MB
        // on the heap, which is enough to exhaust a small worker's heap and trigger the OOM
        List<byte[]> strings = new ArrayList<>();
        for (int i = 1; i <= limit; i++) {
            strings.add(new byte[16 * 1024]);
        }
        return strings.size();
    }

    private static InetAddress getLocalHostAddress() {
        try {
            return InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            LOGGER.error("Unable to get local host address", e);
            return null;
        }
    }
}

Running:

PROJECT=$(gcloud config get-value project)
JOB=myoomjob
USER=bartek
BUCKET=${PROJECT}-$USER-${JOB}
gsutil mb gs://${BUCKET}

mvn clean compile -DskipTests -Pdataflow-runner exec:java \
 -Dexec.mainClass=com.bawi.beam.dataflow.MyOOMJob \
 -Dexec.args="${JAVA_DATAFLOW_RUN_OPTS} \
  --runner=DataflowRunner \
  --stagingLocation=gs://${BUCKET}/staging \
  --workerMachineType=n1-standard-1 \
  --dumpHeapOnOOM \
  --saveHeapDumpsToGcsPath=gs://${BUCKET}/oom \
  --workerLogLevelOverrides='{ \"com.bawi.beam.dataflow.MyOOMJob\": \"DEBUG\" }' \
  --profilingAgentConfiguration='{ \"APICurated\" : true }'"

Note that the user code is logged at DEBUG level (--workerLogLevelOverrides='{ \"com.bawi.beam.dataflow.MyOOMJob\": \"DEBUG\" }'); the same log lines appear repeatedly because the work item is retried after the OOM error.

Google Dataflow UI – debug log level override
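
The same override can also be set programmatically when building the options. A minimal sketch (class name illustrative), assuming the DataflowWorkerLoggingOptions interface from the beam-runners-google-cloud-dataflow-java module:

import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class WorkerLogLevelOverrideExample {
    public static void main(String[] args) {
        DataflowWorkerLoggingOptions loggingOptions =
                PipelineOptionsFactory.fromArgs(args).as(DataflowWorkerLoggingOptions.class);
        // programmatic equivalent of --workerLogLevelOverrides='{ "com.bawi.beam.dataflow.MyOOMJob": "DEBUG" }'
        loggingOptions.setWorkerLogLevelOverrides(
                new DataflowWorkerLoggingOptions.WorkerLogLevelOverrides()
                        .addOverrideForName("com.bawi.beam.dataflow.MyOOMJob",
                                DataflowWorkerLoggingOptions.Level.DEBUG));
    }
}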

Wait until the worker has started, then execute:

WORKER="$(gcloud compute instances list --filter="name~$JOB" | tail -n 1)"
echo WORKER=$WORKER
WORKER_NAME=$(echo "${WORKER}" | awk '{ print $1 }')
echo WORKER_NAME=$WORKER_NAME
WORKER_ZONE=$(echo "${WORKER}" | awk '{ print $2 }')
echo WORKER_ZONE=$WORKER_ZONE
gcloud compute ssh --project=$PROJECT --zone=$WORKER_ZONE $WORKER_NAME --ssh-flag "-L 5555:127.0.0.1:5555"

Connect using Java VisualVM (add a JMX connection to localhost:5555):

Java VisualVM – JVM arguments
Java VisualVM – Monitor
Java VisualVM – Threads
select * except(local_host_address), concat(substr(local_host_address, 0 , 9),"XXX", substr(local_host_address, 35)) as masked_local_host_address 
from bartek_dataset.myoomtest_table where 
heap_used = (select min(heap_used) from bartek_dataset.myoomtest_table) or
heap_used = (select max(heap_used) from bartek_dataset.myoomtest_table);

or

select f.* except(local_host_address), concat(substr(f.local_host_address, 0 , 9),"XXX", substr(local_host_address, 35)) as masked_local_host_address
from bartek_dataset.myoomtest_table as f
inner join 
(select min(heap_used) as min, max(heap_used) as max from bartek_dataset.myoomtest_table) as m
on ( f.heap_used = m.max OR f.heap_used = m.min);
BigQuery – Dataflow worker heap used min max

Enabling Google profiling (--profilingAgentConfiguration='{ \"APICurated\" : true }')

Google Dataflow – worker profiling enabled
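
The commented-out lines in MyOOMJob.main hint at the programmatic equivalent of this flag. A minimal sketch (class name illustrative), assuming the options are viewed as DataflowProfilingOptions from the Dataflow runner module:

import org.apache.beam.runners.dataflow.options.DataflowProfilingOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ProfilingAgentExample {
    public static void main(String[] args) {
        DataflowProfilingOptions options =
                PipelineOptionsFactory.fromArgs(args).as(DataflowProfilingOptions.class);
        // programmatic equivalent of --profilingAgentConfiguration='{ "APICurated" : true }'
        DataflowProfilingOptions.DataflowProfilingAgentConfiguration agentConfiguration =
                new DataflowProfilingOptions.DataflowProfilingAgentConfiguration();
        agentConfiguration.put("APICurated", true);
        options.setProfilingAgentConfiguration(agentConfiguration);
    }
}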

https://console.cloud.google.com/profiler;timespan=10m/myoomjob-XXX/cpu?project=YYY

Google Dataflow profiler

Create heap dump on OOM and upload it to GCS (--dumpHeapOnOOM --saveHeapDumpsToGcsPath=gs://${BUCKET}/oom)

Dataflow UI log – heap dump on OOM
Java VisualVM – heap dump analysis
Java VisualVM – byte[] heap dump analysis

BigQuery timestamp, timestamp_trunc, date and timezones

SELECT 
 TIMESTAMP("2021-01-02 12:34:56.789 UTC") as full_utc # 2021-01-02 12:34:56.789 UTC
 ,TIMESTAMP("2021-01-02 12:34:56.789") as no_utc # 2021-01-02 12:34:56.789 UTC
 ,TIMESTAMP("2021-01-02") as date_only_no_utc # 2021-01-02 00:00:00 UTC, adds 0 hour, min etc
 ,TIMESTAMP("2021-01-02 12:34:56.789 America/Chicago") as full_am # 2021-01-02 18:34:56.789 UTC, +6h to become UTC,
 ,TIMESTAMP("2021-01-02 12:34:56.789","America/Chicago") as full__am # 2021-01-02 18:34:56.789 UTC, +6h to become UTC
 ,TIMESTAMP("2021-01-02","America/Chicago") as date__am # 2021-01-02 06:00:00 UTC, +6h to hour 0 to become UTC
;

RESULT:

full_utc                       no_utc                         date_only_no_utc           full_am                        full__am                       date__am
2021-01-02 12:34:56.789 UTC    2021-01-02 12:34:56.789 UTC    2021-01-02 00:00:00 UTC    2021-01-02 18:34:56.789 UTC    2021-01-02 18:34:56.789 UTC    2021-01-02 06:00:00 UTC
SELECT ts, TIMESTAMP_TRUNC(ts,HOUR,"America/Chicago"), TIMESTAMP_TRUNC(ts,HOUR,"UTC") 
FROM my_dateset.my_table 
WHERE 
  TIMESTAMP_TRUNC(ts,DAY,"America/Chicago") = TIMESTAMP_TRUNC("2021-03-18 10:00:00.000",DAY,"America/Chicago") 
  AND id = '076ca3a2-b622-469f-a345-8619760f6302';

RESULT:

ts	                       tt_h_am_ch	          tt_h_utc	
2021-03-18 22:32:06.218 UTC    2021-03-18 22:00:00 UTC    2021-03-18 22:00:00 UTC
SELECT TIMESTAMP("2021-03-18 17:32:06.218 America/Chicago"); 

RESULT:

2021-03-18 22:32:06.218 UTC
SELECT ts, TIMESTAMP_TRUNC(ts,HOUR,"America/Chicago") as tt_h_am_ch, TIMESTAMP_TRUNC(ts,HOUR,"UTC") as tt_h_utc
FROM my_dateset.my_table 
WHERE 
  TIMESTAMP_TRUNC(ts,HOUR) = TIMESTAMP("2021-03-18 17:00:00 America/Chicago") -- same as TIMESTAMP_TRUNC(ts,HOUR) = TIMESTAMP("2021-03-18 17:00:00","America/Chicago")
  AND id = '076ca3a2-b622-469f-a345-8619760f6302';

RESULT:

ts                             tt_h_am_ch                 tt_h_utc
2021-03-18 22:32:06.218 UTC    2021-03-18 22:00:00 UTC    2021-03-18 22:00:00 UTC
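
The same offsets can be cross-checked outside BigQuery with plain java.time (a standalone sketch; note that America/Chicago is UTC-6 in January but UTC-5 on 2021-03-18 because of daylight saving time):

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class TimezoneOffsetCheck {
    public static void main(String[] args) {
        ZoneId chicago = ZoneId.of("America/Chicago");
        // winter (CST, UTC-6): 2021-01-02 12:34:56.789 America/Chicago -> 18:34:56.789 UTC
        Instant winter = LocalDateTime.parse("2021-01-02T12:34:56.789").atZone(chicago).toInstant();
        System.out.println(winter); // 2021-01-02T18:34:56.789Z
        // daylight saving time (CDT, UTC-5): 2021-03-18 17:32:06.218 America/Chicago -> 22:32:06.218 UTC
        Instant spring = LocalDateTime.parse("2021-03-18T17:32:06.218").atZone(chicago).toInstant();
        System.out.println(spring); // 2021-03-18T22:32:06.218Z
    }
}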


Install Ansible on Mac OS X

python3 --version
Python 3.8.2

which python3
/usr/bin/python3

pip3 install virtualenv virtualenvwrapper

cat ~/.bash_profile
export PATH="/Users/me/Library/Python/3.8/bin:$PATH"
# Configuration for virtualenv
export WORKON_HOME=$HOME/virtualenvs
export VIRTUALENVWRAPPER_PYTHON=/usr/bin/python3
export VIRTUALENVWRAPPER_VIRTUALENV=/Users/me/Library/Python/3.8/bin/virtualenv
source /Users/me/Library/Python/3.8/bin/virtualenvwrapper.sh

#reopen terminal
virtualenv --version
virtualenv 20.4.2 from /Users/me/Library/Python/3.8/lib/python/site-packages/virtualenv/__init__.py

mkvirtualenv ansible2_10 --python=/usr/bin/python3

(ansible2_10) me@MacBook:~$ pip install ansible==2.10.7

Google Dataflow autoscaling with CPU-intensive factorial calculation and output to Parquet

Default throughput-based autoscaling from 1 to 5 workers with the n1-standard-1 machine type (1 vCPU).

Google dataflow autoscaling
5 workers with 1 thread each
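
The autoscaling limits and algorithm can also be set explicitly instead of relying on the defaults. A minimal sketch (class name and values illustrative, not what the job above used), assuming the Dataflow runner's DataflowPipelineWorkerPoolOptions:

import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class AutoscalingOptionsExample {
    public static void main(String[] args) {
        DataflowPipelineWorkerPoolOptions options =
                PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineWorkerPoolOptions.class);
        // throughput-based autoscaling (the default for batch), capped at 5 workers on 1 vCPU machines
        options.setAutoscalingAlgorithm(
                DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
        options.setMaxNumWorkers(5);
        options.setWorkerMachineType("n1-standard-1");
    }
}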

If we run it with a 2 vCPU machine (--workerMachineType=e2-small), we get 2 threads:

2 threads with 2 vCPU machine

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroGenericCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.*;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class MyAutoscalingFactToParquetJob {
    private static final Logger LOGGER = LoggerFactory.getLogger(MyAutoscalingFactToParquetJob.class);

    public interface MyAutoscalingPipelineOptions extends PipelineOptions {
        @Validation.Required
        @Default.Integer(15000)
        int getFactorialLimit();
        void setFactorialLimit(int value);

        @Validation.Required
        ValueProvider<String> getOutput();
        void setOutput(ValueProvider<String> value);
    }

    static Schema SCHEMA = SchemaBuilder.record("myRecord").fields().requiredString("value").endRecord();

    public static void main(String[] args) {
        MyAutoscalingPipelineOptions myAutoscalingPipelineOptions =
                PipelineOptionsFactory.fromArgs(args).withValidation().as(MyAutoscalingPipelineOptions.class);
        Pipeline pipeline = Pipeline.create(myAutoscalingPipelineOptions);

        pipeline.apply(Create.of(IntStream.rangeClosed(1, myAutoscalingPipelineOptions.getFactorialLimit()).boxed().collect(Collectors.toList())))
                .apply(MapElements.into(TypeDescriptor.of(GenericRecord.class)).via(i -> {
                    LOGGER.info("i={}", i);
                    GenericData.Record record = new GenericData.Record(SCHEMA);
                    record.put("value", factorial(i));
                    return record;
                })).setCoder(AvroGenericCoder.of(SCHEMA))
                .apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to(myAutoscalingPipelineOptions.getOutput()));

        pipeline.run();
    }

    private static String factorial(int limit) {
        BigInteger factorial = BigInteger.valueOf(1);
        for (int i = 1; i <= limit; i++) {
            factorial = factorial.multiply(BigInteger.valueOf(i));
        }
        return factorial.toString();
    }
}

Running:

PROJECT=$(gcloud config get-value project)
JOB_NAME=myautoscalingfacttoparquetjob
BUCKET=${PROJECT}-$USER-${JOB_NAME}
gsutil mb gs://${BUCKET}

FACTORIAL_LIMIT=20000
mvn clean compile -DskipTests -Pdataflow-runner exec:java \
 -Dexec.mainClass=com.bawi.beam.dataflow.MyAutoscalingFactToParquetJob \
 -Dexec.args="--project=${PROJECT} ${JAVA_DATAFLOW_RUN_OPTS} \
  --runner=DataflowRunner \
  --stagingLocation=gs://${BUCKET}/staging \
  --workerMachineType=n1-standard-1 \
  --jobName=${JOB_NAME}-$USER-${FACTORIAL_LIMIT} \
  --factorialLimit=${FACTORIAL_LIMIT} \
  --output=gs://${BUCKET}/output"

For completeness, pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.bawi</groupId>
    <artifactId>my-apache-beam-dataflow</artifactId>
    <version>0.1-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

        <beam.version>2.27.0</beam.version>
        <slf4j.version>1.7.30</slf4j.version>
        <guava.version>26.0-jre</guava.version>
        <junit.version>4.13.2</junit.version>
        <hadoop.version>2.10.1</hadoop.version>

        <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
        <maven-exec-plugin.version>3.0.0</maven-exec-plugin.version>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven-compiler-plugin.version}</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>${maven-exec-plugin.version}</version>
                <configuration>
                    <cleanupDaemonThreads>false</cleanupDaemonThreads>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-core</artifactId>
            <version>${beam.version}</version>
        </dependency>

        <!-- to use log4j.properties and avoid SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder" -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>runtime</scope>
        </dependency>

        <!-- required for org.apache.beam.sdk.io.gcp.pubsub.public class PubsubIO -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
            <version>${beam.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-parquet</artifactId>
            <version>${beam.version}</version>
        </dependency>

        <!-- required for ParquetIO.sink requiring runtime org.apache.hadoop.io.Writable -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <scope>runtime</scope>
        </dependency>

        <!-- required for ParquetIO.read requiring runtime org.apache.hadoop.mapreduce.lib.input.FileInputFormat -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
            <scope>runtime</scope>
        </dependency>

    </dependencies>

    <profiles>
        <profile>
            <id>direct-runner</id>
            <activation>
                <!-- by default runs main and tests pipelines from ide and maven -->
                <activeByDefault>true</activeByDefault>
            </activation>
            <dependencies>
                <dependency>
                    <groupId>org.apache.beam</groupId>
                    <artifactId>beam-runners-direct-java</artifactId>
                    <version>${beam.version}</version>
                    <scope>runtime</scope>
                </dependency>
            </dependencies>
        </profile>

        <profile>
            <id>dataflow-runner</id>
            <dependencies>
                <dependency>
                    <groupId>org.apache.beam</groupId>
                    <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
                    <version>${beam.version}</version>
                    <scope>runtime</scope>
                </dependency>
            </dependencies>
        </profile>
    </profiles>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>${guava.version}</version>  <!-- "-jre" for Java 8 or higher -->
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>${junit.version}</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

</project>

Install a specific version of Terraform with brew on Mac OS X

Requirement: install Terraform 0.12.29 via brew; however, terraform@0.12 points to a newer version: 0.12.30.
Steps:
Go to https://github.com/Homebrew/homebrew-core/blob/master/Formula/terraform@0.12.rb -> History -> choose the commit with the specific version, e.g. https://github.com/Homebrew/homebrew-core/commit/13fce7f5504ba838199557bde565b824128e4362#diff-20915addc0e9fc9aa51da2f0748e48706d22db60d4b45b66415a8fd116619f37 -> ... (3 dots) -> View file -> Raw -> download https://raw.githubusercontent.com/Homebrew/homebrew-core/13fce7f5504ba838199557bde565b824128e4362/Formula/terraform%400.12.rb as terraform@0.12.rb

# install terraform 0.12.29 from source based on rb file

brew install --build-from-source ~/Downloads/terraform@0.12.rb

# keep specific version and exclude from update

brew pin terraform@0.12

Java Serialization – static nested, inner and anonymous classes

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class SerializationUtils {
    public static ByteArrayOutputStream serialize(Object object) throws IOException {
        System.out.print("Serializing: " + object);
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bout);
        out.writeObject(object);
        out.close();
        return bout;
    }

    public static void deserialize(ByteArrayOutputStream bout) throws IOException, 
                                                           ClassNotFoundException {
        ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
        ObjectInputStream in = new ObjectInputStream(bin);
        Object readObject = in.readObject();
        System.out.println(", deserialized: " + readObject);
    }
}

JUnit:

public class MySerializableClass implements Serializable {
}

import org.junit.Test;

import java.io.IOException;
import java.io.Serializable;
import java.util.function.Function;

import static com.bawi.serialization.SerializationUtils.deserialize;
import static com.bawi.serialization.SerializationUtils.serialize;

public class MySerializationTest {
    private String myJunitStringField = "hello";

    static class MyStaticNestedSerializableClass implements Serializable {
        public int getLength(String s) {
            // Error: java: non-static variable this cannot be referenced from a static context
            //System.out.println(MySerializationTest.this);// static nested class does not have reference to outer class
            return s.length();
        }
    }

    interface MySerializableFunction<T, R> extends Function<T, R>, Serializable {}

    @Test
    public void testSerialization() throws IOException, ClassNotFoundException {
       deserialize(serialize(1));     // Serializing: 1, deserialized: 1
       deserialize(serialize("aaa")); // Serializing: aaa, deserialized: aaa
       deserialize(serialize(new MySerializableClass())); // Serializing: com.bawi.serialization.MySerializableClass@4520ebad, deserialized: com.bawi.serialization.MySerializableClass@4f933fd1
       deserialize(serialize(new MyStaticNestedSerializableClass())); // Serializing: com.bawi.serialization.MySerializationTest$MyStaticNestedSerializableClass@4520ebad, deserialized: com.bawi.serialization.MySerializationTest$MyStaticNestedSerializableClass@4f933fd1
       deserialize(serialize((MySerializableFunction<String, Integer>) s -> s.length()));       
    }

    static class MyStaticNestedNonSerializableClass {} // missing implements Serializable

    class MyInnerSerializableClass implements Serializable {
        public int getLength(String s) {
            System.out.println("myJunitStringField=" + myJunitStringField); // direct access to private outer class fields
            System.out.println(MySerializationTest.this); // reference to enclosing instance of the outer class
            return s.length();
        }
    }

    @Test//(expected = java.io.NotSerializableException.class)
    public void failWithNotSerializableException() throws IOException, ClassNotFoundException {
        // NotSerializableException: com.bawi.serialization.MySerializationTest$MyStaticNestedNonSerializableClass
//        deserialize(serialize(new MyStaticNestedNonSerializableClass()));

        // inner (non static nested) class has reference to non-serializable outer class (junit com.bawi.serialization.MySerializationTest)

        MyInnerSerializableClass myInnerSerializableClass = new MyInnerSerializableClass();
        myInnerSerializableClass.getLength("a"); // myJunitStringField=hello
                                                 // com.bawi.serialization.MySerializationTest@1324409e
        // java.io.NotSerializableException: com.bawi.serialization.MySerializationTest
//        deserialize(serialize(myInnerSerializableClass));

        class MyLocalSerializableClass implements Serializable {
            @Override
            public String toString() {
                return MySerializationTest.this + "";
            }
        }
        // java.io.NotSerializableException: com.bawi.serialization.MySerializationTest
//        deserialize(serialize(new MyLocalSerializableClass()));


        //anonymous is inner class: java.io.NotSerializableException: com.bawi.serialization.MySerializationTest
//        deserialize(serialize(new Serializable(){
//            @Override
//            public String toString() {
//                return MySerializationTest.this + "";
//            }
//        }));

        // like local classes, anonymous classes have access to local variables of enclosing scope that are effectively final
        // anonymous class definition (within {}) is an expression and must be part of the statement and end with semicolon
        MyStaticNestedSerializableClass myAnonymousInnerSerializableClass = new MyStaticNestedSerializableClass() {
            @Override
            public int getLength(String s) {
                System.out.println("myJunitStringField=" + myJunitStringField); // direct access to private outer class fields
                System.out.println(MySerializationTest.this);
                return s.length() + 2;
            }
        };
        myAnonymousInnerSerializableClass.getLength("aa");
        // java.io.NotSerializableException: com.bawi.serialization.MySerializationTest
//        deserialize(serialize(myAnonymousInnerSerializableClass));

        //java.io.NotSerializableException: com.bawi.serialization.MySerializationTest$$Lambda$1/1576861390
//        deserialize(serialize((Function<String, Integer>) s -> s.length()));
    }
}

If we make MySerializationTest implement Serializable, we no longer get java.io.NotSerializableException: com.bawi.serialization.MySerializationTest.
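
A minimal sketch of that fix (class names are hypothetical): once the enclosing class is Serializable, instances of its inner class can be serialized, because the implicitly captured reference to the outer instance is serializable too. It reuses SerializationUtils from above, assumed to be in the same package:

import java.io.Serializable;

public class MySerializableOuter implements Serializable {
    private String outerField = "hello";

    class MyInner implements Serializable { // implicitly holds a reference to MySerializableOuter.this
        @Override
        public String toString() {
            return "outerField=" + outerField;
        }
    }

    public static void main(String[] args) throws Exception {
        MySerializableOuter outer = new MySerializableOuter();
        // both the inner instance and the captured outer instance are Serializable, so this succeeds
        SerializationUtils.deserialize(SerializationUtils.serialize(outer.new MyInner()));
    }
}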

Generics with Java collections

import java.util.Arrays;
import java.util.List;

public class MyGenerics {
    static class SuperClass {}
    static class SubClass extends SuperClass {}

    public static void main(String[] args) {
// get1:
        List<SuperClass> superClasses = get1(Arrays.asList(new SuperClass()));

        // Error: java: incompatible types: List<MyGenerics.SuperClass> cannot be converted to List<MyGenerics.SubClass>
        //List<SubClass> subClasses = get1(Arrays.asList(new SubClass()));

// get2 (neither compiles):
        // Error: java: incompatible types: List<capture#1 of ? extends MyGenerics.SuperClass> cannot be converted to List<MyGenerics.SuperClass>
        //List<SuperClass> superClasses2 = get2(Arrays.asList(new SuperClass()));

        // Error: java: incompatible types: List<capture#1 of ? extends MyGenerics.SuperClass> cannot be converted to List<MyGenerics.SubClass>
        //List<SubClass> subClasses2 = get2(Arrays.asList(new SubClass()));

// get3 (all compile):
        List<SuperClass> superClasses3 = get3(Arrays.asList(new SuperClass()));
        List<SubClass> subClasses3 = get3(Arrays.asList(new SubClass()));
        List<SuperClass> subClasses3_1 = get3(Arrays.asList(new SubClass()));
        List<SuperClass> subClasses3_2 = get3(Arrays.asList(new SubClass(), new SuperClass()));
    }

    static List<SuperClass> get1(List<SuperClass> input) { return null; }

    static List<? extends SuperClass> get2(List<? extends SuperClass> input) { return null; }

    static <T extends SuperClass> List<T> get3(List<T> input) { return null; }
}
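
To see why the wildcard return type of get2 is awkward to assign, note that a List<? extends SuperClass> can only be read from, while a List<? super SubClass> can only be written to (producer-extends, consumer-super). A short sketch reusing the same two classes:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MyGenericsWildcards {
    static class SuperClass {}
    static class SubClass extends SuperClass {}

    public static void main(String[] args) {
        List<? extends SuperClass> producer = Arrays.asList(new SubClass(), new SubClass());
        SuperClass first = producer.get(0);  // reading as SuperClass is always safe
        // producer.add(new SuperClass());   // does not compile: the actual list may be a List<SubClass>

        List<? super SubClass> consumer = new ArrayList<SuperClass>();
        consumer.add(new SubClass());        // adding a SubClass is always safe
        // SubClass sub = consumer.get(0);   // does not compile: elements are only known to be Object
        System.out.println(first + " " + consumer);
    }
}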

Spark Scala and Java – creating RDD, DataFrame and Dataset

Spark is written in Scala. The Spark Java API is a set of wrappers around the Scala API so that Java developers do not have to use Scala language libraries.

1. Create an instance of org.apache.spark.sql.SparkSession using the builder (same in both languages):

SparkSession sparkSession = SparkSession.builder().master("local[*]").getOrCreate();

2. Create org.apache.spark.rdd.RDD / org.apache.spark.api.java.JavaRDD; an RDD is a partitioned collection of objects (without a schema).
Use sparkContext directly to parallelize a scala.collection.Seq[A]:
Scala:

val stringRDD: RDD[String] = sparkSession.sparkContext.parallelize(Seq("a", "bb", "ccc"))

val rowRDD: RDD[Row] = stringRDD.map(string => RowFactory.create(string))

case class Person(name: String, age: Integer) // defined outside main method

val personRDD: RDD[Person] = stringRDD.map(string => Person(string, new java.util.Random().nextInt(100)))
personRDD.foreach(p => println(p)) 
personRDD.foreach(println) // same as above

output:

Person(ccc,63)
Person(a,50)
Person(bb,93)

Use JavaSparkContext (wrapping sparkContext) to parallelize a java.util.List<T>.
Java:

JavaRDD<String> stringJavaRDD 
  = new JavaSparkContext(sparkSession.sparkContext()).parallelize(Arrays.asList("a", "bb", "ccc"));

JavaRDD<Row> rowJavaRDD = stringJavaRDD.map(string -> RowFactory.create(string));

public static class Person implements Serializable { // class needs to be public static and defined outside the main method
    private String name;
    private int age;
    public Person() { } // default constructor and getters/setters are required by Spark
    public Person(String name, int age) { this.name = name; this.age = age; }
    public void setName(String name) { this.name = name; }
    public void setAge(int age) { this.age = age; }
    public String getName() { return name; }
    public int getAge() { return age; }
    @Override public String toString() { return "Person{name='" + name + "',age='" + age + "'}"; }
}

JavaRDD<Person> personJavaRDD = stringJavaRDD.map(name -> new Person(name, new Random().nextInt(100)));

personJavaRDD.foreach(p -> System.out.println(p)); // no .show() method on rdd
// personJavaRDD.foreach(System.out::println); // java.io.NotSerializableException: java.io.PrintStream

// using scala api in java requires scala imports and lots of boilerplate code:
Iterator<String> iterator = Arrays.asList("a", "b").iterator();
// import scala.collection.JavaConverters;
// import scala.collection.Seq;
// import scala.reflect.ClassManifestFactory;
// import scala.reflect.ClassTag$;
Seq<String> seq = JavaConverters.asScalaIteratorConverter(iterator).asScala().toSeq();
RDD<String> rdd = sparkSession.sparkContext().parallelize(seq, 2, 
                                               ClassTag$.MODULE$.apply(String.class));

// using explicit scala code in java (instead of implicit import)
SparkSession.implicits$ implicits = sparkSession.implicits();  // compiles (but highlighted as an error in IntelliJ, no warnings in Eclipse Scala IDE)
Encoder<String> stringEncoder = implicits.newStringEncoder();
Dataset<Row> rowDataset1 = implicits.localSeqToDatasetHolder(seq, stringEncoder).toDF();

output:

Person{name='ccc',age='59'}
Person{name='a',age='25'}
Person{name='bb',age='57'}

3. Create org.apache.spark.sql.DataFrame and org.apache.spark.sql.Dataset:
DataFrame is not available in Java because DataFrame is a Scala type alias for Dataset[Row].
Dataset is typed; a Dataset typed with Row (Dataset<Row>) is a DataFrame.

Scala:

// .toDF/.toDS requires sparkSession.implicits._
import sparkSession.implicits._

// 1. from rdd
val df: DataFrame = rdd.toDF() // defaults to `value` as column name: value: string (nullable = true)

val df2: DataFrame = rdd.toDF("str_name") // explicitly set column name: str_name: string (nullable = true)

val ds: Dataset[String] = rdd.toDS() // defaults to `value` as column name: value: string (nullable = true)

val personDF: DataFrame = personRDD.toDF()
personDF.printSchema() // both Dataset and Dataframe have schema associated
personDF.show()


// 2. from Seq
val df11: DataFrame = Seq("X", "Y", "Z").toDF()

val df12: DataFrame = Seq("X", "Y", "Z").toDF("my_string")

val ds11: Dataset[String] = Seq("X", "Y", "Z").toDS()

val personDf: DataFrame = Seq(Person("Bob", 21), Person("Alice", 20)).toDF()

val personDs: Dataset[Person] = Seq(Person("Bob", 21), Person("Alice", 20)).toDS()

// 3. from sparkSession.createDataframe/createDataset
// a) using RDD[Row] and Schema StructType
val df31: DataFrame = sparkSession.createDataFrame(rowRDD, StructType(Seq(StructField("my_column_name", StringType))))
// dataset can be created from RDD, Seq, util.List of type T and implicit Encoder[T]
val ds31: Dataset[Person] = sparkSession.createDataset(util.Arrays.asList(Person("Bob", 21)))

// 4. from / to Dataset / Dataframe:
val ds_2: Dataset[Person] = personDf.as[Person]
val df_2: DataFrame = personDs.toDF()

output:

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+----+---+
|name|age|
+----+---+
|   a| 84|
|  bb| 80|
| ccc| 89|
+----+---+

Java:

// 1. from sparkSession createDataframe/createDataset
StructField structField = DataTypes.createStructField("my_name", DataTypes.StringType, true);
StructType structType = DataTypes.createStructType((Collections.singletonList(structField)));
Dataset<Row> dataFrame = sparkSession.createDataFrame(rowJavaRDD, structType);

Dataset<Row> personDataframe = sparkSession.createDataFrame(personJavaRDD, Person.class);
personDataframe.printSchema();
personDataframe.show();

Dataset<String> stringDataset = sparkSession.createDataset(stringJavaRDD.rdd(), Encoders.STRING());

Dataset<Person> personDataset = sparkSession.createDataset(
  Arrays.asList(new Person("Bob", 21), new Person("Alice", 20)), Encoders.bean(Person.class));
personDataset.show();

// 2. from / to Dataframe / Dataset
Dataset<Person> ds_2 = personDataframe.as(Encoders.bean(Person.class));
Dataset<Row> df_2 = personDataset.toDF();

output:

root
 |-- age: integer (nullable = false)
 |-- name: string (nullable = true)
+---+----+
|age|name|
+---+----+
| 19|   a|
| 56|  bb|
| 72| ccc|
+---+----+

+---+-----+
|age| name|
+---+-----+
| 21|  Bob|
| 20|Alice|
+---+-----+

4. Mapping RDDs, Dataframes and Datasets:
Scala:

val rowRDD: RDD[Row] = stringRDD.map(string => RowFactory.create(string))
val rowRDD2: RDD[Row] = stringRDD.map(RowFactory.create(_)) // as above

val intDataset: Dataset[Int] = stringDataset.map(string => string.length)
val intDataset2: Dataset[Int] = stringDataset.map(_.length) // as above

Java:

// Mapping RDD
// java functional interface org.apache.spark.api.java.function.Function
JavaRDD<Row> rowJavaRDD = stringJavaRDD.map(new Function<String, Row>() {  
    @Override
    public Row call(String string) throws Exception {
        return RowFactory.create(string);
    }
});

// method reference
JavaRDD<Row> rowJavaRDD2 = stringJavaRDD.map(RowFactory::create); // as above

stringJavaRDD.foreach(s -> System.out.println(s));

// java.io.NotSerializableException: java.io.PrintStream
// stringJavaRDD.foreach(System.out::println); 

RDD<Row> rowRDD = rowJavaRDD.rdd();
// requires import of scala.reflect.ClassManifestFactory and scala.runtime.AbstractFunction1
RDD<Row> rowRDD1 = rowRDD.map(new AbstractFunction1<Row, Row>() {
    @Override
    public Row apply(Row row) {
        return row;
    }
}, ClassManifestFactory.fromClass(Row.class));

// Mapping Dataset - requires explicit encoder of mapping output type (in scala it used implicit encoder)
// org.apache.spark.api.java.function.MapFunction (from java package)
dataset.map(new MapFunction<String, Integer>() {
    @Override
    public Integer call(String value) throws Exception {
        return value.length();
    }
}, Encoders.INT());


// lambda usage as MapFunction is functional interface
// need to cast to MapFunction to avoid ambiguous method call
dataset.map((MapFunction<String, Integer>) String::length, Encoders.INT());

// compilation error - scala.Function1 is scala specialized and requires implementing additional abstract method apply$mcVJ$sp(long)1
/*
dataset.map(new Function1<String, Integer>() { 
    @Override
    public Integer apply(String string) {
        return string.length();
    }
}, Encoders.INT());
*/

// compilation error: incompatible types: scala.Function1 is not a functional interface multiple non-overriding abstract methods found in interface scala.Function1
// dataset.map((Function1<String, Integer>) String::length, Encoders.INT()); 

// Ambiguous method call conflicting with map(Function1)
// dataset.map(value -> value.length(), Encoders.INT()) 

Apache Beam / Dataflow – defining transformations using MapElements and ParDo.of with DoFn

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.bawi</groupId>
    <artifactId>my-apache-beam-data-flow</artifactId>
    <version>0.1-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <beam.version>2.27.0</beam.version>
    </properties>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-core</artifactId>
            <version>${beam.version}</version>
        </dependency>

    </dependencies>

    <profiles>
        <profile>
            <id>direct-runner</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <!-- Makes the DirectRunner available when running a pipeline. -->
            <dependencies>
                <dependency>
                    <groupId>org.apache.beam</groupId>
                    <artifactId>beam-runners-direct-java</artifactId>
                    <version>${beam.version}</version>
                    <scope>runtime</scope>
                </dependency>
            </dependencies>
        </profile>
    </profiles>
</project>

Concepts:
1. Create PipelineOptions and Pipeline using create method

2. Create pipeline definition using apply method:
pCollection = pipeline.apply(pTransform)
pCollection2 = pCollection.apply(pTransform2)

or chained:
pipeline.apply(pTransform).apply(pTransform2)

PTransforms are in org.apache.beam.sdk.transforms package e.g: Create.of(), Filter.by(), MapElements.via(), ParDo.of() – all extending PTransform<PCollection<? extends InputT>, PCollection<OutputT>>

3. Execute pipeline using run method:
pipeline.run()
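
Putting the three steps together in the shortest possible form (a throwaway sketch; the full example with all the MapElements / ParDo variants follows below):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class MyMinimalPipeline {
    public static void main(String[] args) {
        PipelineOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).create(); // 1. options and pipeline
        Pipeline pipeline = Pipeline.create(pipelineOptions);

        pipeline.apply(Create.of("a", "bb", "ccc"))                                       // 2. pipeline definition
                .apply(MapElements.into(TypeDescriptors.integers()).via((String s) -> s.length()));

        pipeline.run().waitUntilFinish();                                                 // 3. execution
    }
}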

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.*;

public class MyInMemoryToConsolePipeline {

    public static void main(String[] args) {
        PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
        Pipeline pipeline = Pipeline.create(pipelineOptions);

        PTransform<PBegin, PCollection<String>> createStringsPT =
                Create.of("a", "bb", "ccc", "dddd");

        PCollection<String> inMemPC = pipeline.apply(createStringsPT);

        PTransform<PCollection<String>, PCollection<String>> filterPT
                = Filter.by(s -> s.length() > 1);

        inMemPC.apply(filterPT)

        // the MapElements / ParDo variants below (separated by "// or") are equivalent alternatives – apply only one of them;
        // chaining all of them as printed would not type-check
        // MapElements<InputT, OutputT> extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>>
               .apply(MapElements
                       .into(TypeDescriptors.integers()) // requires specifying TypeDescriptor
                       .via( // allow using lambda
                           s -> {
                               System.out.println("s=" + s);
                               return s.length();
                           }
                       )
               )

// or
               .apply(MapElements.via(new SimpleFunction<String, Integer>() {
                    @Override
                    public Integer apply(String s) { // does not implement functional interface but TypeDescriptor not required
                        System.out.println("s=" + s);
                        return s.length();
                    }
               }))

// or

        // ParDo.of returning SingleOutput<InputT, OutputT> extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>>
                .apply(ParDo.of(new DoFn<String, Integer>() {
                    @ProcessElement
                    public void processElement(@Element String element, OutputReceiver<Integer> out) {
                        System.out.println("element=" + element);
                        out.output(element.length());
                    }
                }))

// or with ProcessContext instead of @Element and OutputReceiver
                .apply(ParDo.of(new DoFn<String, Integer>() { // ParDo.of extends PTransform
                    @ProcessElement
                    public void processElement(ProcessContext context) {
                        String element = context.element();
                        System.out.println("element=" + element);
                        context.output(element.length());
                    }
                }))

                // custom impl of PTransform
                //  - with nested transformation that prints input to console
                //  - that returns PDone disallowing further transformation (no apply method available)
                .apply(ConsoleIO.write())
                ;

        pipeline.run().waitUntilFinish();
    }

    private static class ConsoleIO<T> extends PTransform<PCollection<T>, PDone> {
        private ConsoleIO() { }

        private static <T> ConsoleIO<T> write() {
            return new ConsoleIO<>();
        }

        @Override
        public PDone expand(PCollection<T> input) {
            input.apply(MapElements.via(new SimpleFunction<T, Void>() {
                @Override
                public Void apply(T input) {
                    System.out.println("input=" + input);
                    return null;
                }
            }));
            return PDone.in(input.getPipeline());
        }
    }

}    

output (may be in different order):

element=bb
element=ccc
element=dddd
input=3
input=4
input=2