
Overriding Dataflow parameters

Dataflow jobs can be started from Dataflow classic templates using:
1) an HTTP POST request:

curl -H "X-Goog-User-Project: $GCP_PROJECT" \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-X POST \
-H "Content-Type:application/json" \
-d @job.json "https://dataflow.googleapis.com/v1b3/projects/$GCP_PROJECT/locations/us-central1/templates?alt=json"

where job.json is similar to:

{
    "jobName": "mysimplejob-from-template3",
    "gcsPath": "gs://my-bucket/MySimpleJob/template/MySimpleJob-template",
    "environment": {
        "bypassTempDirValidation": false,
        "maxWorkers": 1,
        "numWorkers": 2,
        "workerRegion": "us-central1",
        "serviceAccountEmail": "my-service-account@my-project.iam.gserviceaccount.com",
        "machineType": "t2d-standard-2",
        "tempLocation": "gs://my-bucket/MySimpleJob/temp",
        "subnetwork": "https://www.googleapis.com/compute/v1/projects/my-project/regions/us-central1/subnetworks/my-subnetwork",
        "ipConfiguration": "WORKER_IP_PRIVATE",
        "enableStreamingEngine": false,
        "additionalExperiments": [
            "shuffle_mode=appliance"
        ]
    },
    "parameters": {
        "output": "gs://my-bucket/MySimpleJob/output",
        "seqEnd": "300",
        "startDate": "2024-03-03T00:00:00Z"
    }
}

2) the gcloud CLI, or 3) programmatically using the GCP SDK.
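
As a rough gcloud equivalent of the HTTP POST request above (a sketch; the job name, bucket, project and parameter values are the same placeholders as in job.json):

gcloud dataflow jobs run mysimplejob-from-template3 \
  --project=$GCP_PROJECT \
  --region=us-central1 \
  --gcs-location=gs://my-bucket/MySimpleJob/template/MySimpleJob-template \
  --num-workers=2 \
  --max-workers=2 \
  --worker-machine-type=t2d-standard-2 \
  --service-account-email=my-service-account@my-project.iam.gserviceaccount.com \
  --subnetwork=https://www.googleapis.com/compute/v1/projects/my-project/regions/us-central1/subnetworks/my-subnetwork \
  --disable-public-ips \
  --staging-location=gs://my-bucket/MySimpleJob/temp \
  --parameters=output=gs://my-bucket/MySimpleJob/output,seqEnd=300,startDate=2024-03-03T00:00:00Z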

The complete list of Runtime environment variables is available at https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment

Only these RuntimeEnvironment variables can be overridden with a new value when launching a job from a template, regardless of whether they were set at template creation, e.g. numWorkers, diskSizeGb, etc.

However, these RuntimeEnvironment variables do not necessarily match the method names defined by org.apache.beam.runners.dataflow.options.DataflowPipelineOptions,

e.g.
RuntimeEnvironment: maxWorkers, machineType, ipConfiguration, etc.
vs
DataflowPipelineOptions methods: getMaxNumWorkers, getWorkerMachineType, getUsePublicIps

1. at template creation:

    "--numWorkers=1",
    "--maxNumWorkers=1",
    "--workerMachineType=t2d-standard-2",
    "--diskSizeGb=100",
    "--output=gs://" + BUCKET + "/" + JOB_NAME + "/template/output",
    "--templateLocation=gs://" + BUCKET + "/" + JOB_NAME + "/template/" + JOB_NAME + "-template",
    "--stagingLocation=gs://" + BUCKET + "/" + JOB_NAME + "/template/staging",
    "--seqEnd=100",
    "--startDate=2024-01-01T00:00:00Z",
    "--windowSecs=1"

at template execution:

{
    "jobName": "mysimplejob-from-template2",
    "gcsPath": "gs://my-bucket/MySimpleJob/template/MySimpleJob-template",
    "environment": {
        "bypassTempDirValidation": false,
        "maxWorkers": 2,
        "numWorkers": 2,
        "workerRegion": "us-central1",
        "serviceAccountEmail": "my-service-account@my-project.iam.gserviceaccount.com",
        "machineType": "t2d-standard-4",
        "tempLocation": "gs://my-bucket/MySimpleJob/temp2",
        "subnetwork": "https://www.googleapis.com/compute/v1/projects/my-project/regions/us-central1/subnetworks/my-subnetwork",
        "ipConfiguration": "WORKER_IP_PRIVATE",
        "enableStreamingEngine": false,
        "diskSizeGb": 200,
        "additionalExperiments": [
            "shuffle_mode=appliance"
        ]
    },
    "parameters": {
        "output": "gs://my-bucket/MySimpleJob/output2",
        "seqEnd": "200",
        "startDate": "2024-02-02T00:00:00Z",
        "windowSecs": "2"
    }
}

effective:
– 2 workers started, visible in the Dataflow Autoscaling tab (from the HTTP POST JSON maxWorkers and numWorkers)
– diskSizeGb is effectively 200, visible on the GCE VMs (from the HTTP POST JSON)
– machine type is effectively "t2d-standard-4", visible on the GCE VMs (from the HTTP POST JSON "machineType", even though Job info/Resource metrics/workerMachineType shows t2d-standard-2, which is misleading)

– output is effectively gs://my-bucket/MySimpleJob/template/output (it is not a RuntimeEnvironment variable but a pipeline parameter, so the value from the template "--output=gs://" + BUCKET + "/" + JOB_NAME + "/template/output" is used)
– seqEnd is effectively 100, visible as the last log 'Processing: 100,2024-01-01T00:01:40.000Z'; the value provider parameter is already set in the template, so the template value is used
– startDate is effectively 2024-01-01T00:00:00Z, visible in the logs 'Processing: 0,2024-01-01T00:00:00.000Z'; the value provider parameter is already set in the template, so the template value is used
– windowSecs is effectively 1, as the produced file name has 00..01 'gs://my-bucket/MySimpleJob/template/output/element/element-2024-01-01T00_00_00.000Z..2024-01-01T00_01_00.000Z-0-of-1.txt'; the parameter is already set in the template, so the template value is used (even though Job info/Resource metrics/windowSecs shows 2, which is misleading)
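
To verify the worker machine type and disk size directly, the worker VMs can be listed while the job is running (a sketch; the name filter based on the job name is an assumption):

gcloud compute instances list --project=$GCP_PROJECT \
  --filter="name~mysimplejob" \
  --format="table(name, machineType.basename(), disks[0].diskSizeGb)"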

To summarize, check the worker logs for the entry starting with "Worker harness" or "pipeline_options:" and compare the display_data section with the options section, e.g.

  "display_data": [
    {
      "key": "output",
      "namespace": "com.bawi.beam.dataflow.MySimpleJob$MyPipelineOptions",
      "type": "STRING",
      "value": "gs://my-project/MySimpleJob/template/output/element"
    },
    {
      "key": "diskSizeGb",
      "namespace": "org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions",
      "type": "INTEGER",
      "value": 200
    },

vs

  "options": {
    "output": "gs://my-project/MySimpleJob/output2",
    "diskSizeGb": 200,

Since output is not a RuntimeEnvironment variable, the value from the template (shown in the display_data section) is effectively used, not the value in the options section.

But for the RuntimeEnvironment variable diskSizeGb the value from the options section is used (not the one from the template).
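
The same worker log entries can also be fetched from Cloud Logging, e.g. (a sketch, assuming the standard dataflow_step log resource and a known job id):

gcloud logging read \
  'resource.type="dataflow_step" AND resource.labels.job_id="<JOB_ID>" AND "pipeline_options"' \
  --project=$GCP_PROJECT --limit=5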

2. Template creation settings remain unchanged, but the HTTP POST request does not override the number of workers, max workers, machine type, disk size or any pipeline parameters:

at template execution:

{
    "jobName": "mysimplejob-from-template2",
    "gcsPath": "gs://my-bucket/MySimpleJob/template/MySimpleJob-template",
    "environment": {
        "bypassTempDirValidation": false,
        "workerRegion": "us-central1",
        "serviceAccountEmail": "my-service-account@my-project.iam.gserviceaccount.com",
        "tempLocation": "gs://my-bucket/MySimpleJob/temp2",
        "subnetwork": "https://www.googleapis.com/compute/v1/projects/my-project/regions/us-central1/subnetworks/my-subnetwork",
        "ipConfiguration": "WORKER_IP_PRIVATE",
        "enableStreamingEngine": false,
        "additionalExperiments": [
            "shuffle_mode=appliance"
        ]
    },
    "parameters": {}
}

so the values are taken from the template:
effective:
– 1 worker started, visible in the Dataflow Autoscaling tab
– diskSizeGb is effectively 100, visible on the GCE VMs
– machine type is effectively "t2d-standard-2", visible on the GCE VMs

and the parameters are taken from the template:
– output is effectively gs://my-bucket/MySimpleJob/template/output
– seqEnd is effectively 100
– startDate is effectively 2024-01-01T00:00:00Z
– windowSecs is effectively 1, as the produced file name has 00..01

3. Template values are minimal and the remaining ones are provided when starting from the template:

  "--templateLocation=gs://" + BUCKET + "/" + JOB_NAME + "/template/" + JOB_NAME + "-template",
  "--stagingLocation=gs://" + BUCKET + "/" + JOB_NAME + "/template/staging",
  "--windowSecs=1"
{
    "jobName": "mysimplejob-from-template2",
    "gcsPath": "gs://my-bucket/MySimpleJob/template/MySimpleJob-template",
    "environment": {
        "bypassTempDirValidation": false,
        "maxWorkers": 2,
        "numWorkers": 2,
        "workerRegion": "us-central1",
        "serviceAccountEmail": "my-service-account@my-project.iam.gserviceaccount.com",
        "machineType": "t2d-standard-4",
        "tempLocation": "gs://my-bucket/MySimpleJob/temp2",
        "subnetwork": "https://www.googleapis.com/compute/v1/projects/my-project/regions/us-central1/subnetworks/my-subnetwork",
        "ipConfiguration": "WORKER_IP_PRIVATE",
        "enableStreamingEngine": false,
        "diskSizeGb": 200,
        "additionalExperiments": [
            "shuffle_mode=appliance"
        ]
    },
    "parameters": {
        "output": "gs://my-bucket/MySimpleJob/output2",
        "seqEnd": "200",
        "startDate": "2024-02-02T00:00:00Z",
        "windowSecs": "2"
    }
}

effective:
– 2 workers started, visible in the Dataflow Autoscaling tab (from the HTTP POST JSON maxWorkers and numWorkers)
– diskSizeGb is effectively 200, visible on the GCE VMs (from the HTTP POST JSON)
– machine type is effectively "t2d-standard-4", visible on the GCE VMs (from the HTTP POST JSON "machineType")

– output is effectively gs://my-project/MySimpleJob/output2/ as the value provider parameter was not set in the template, so the value comes from the HTTP POST JSON
– seqEnd is effectively 200, visible as the last log 'Processing: 200,2024-02-02T00:03:20.000Z', as the value provider parameter was not set in the template, so the value comes from the HTTP POST JSON
– startDate is effectively 2024-02-02T00:00:00Z, visible in the logs 'Processing: 60,2024-02-02T00:01:00.000Z', as the value provider parameter was not set in the template, so the value comes from the HTTP POST JSON
– windowSecs is effectively 1, as the produced file name has 00..01 'gs://my-bucket/MySimpleJob/output2/element-2024-02-02T00_00_00.000Z..2024-02-02T00_01_00.000Z-0-of-1.txt'; the parameter (a regular, non-ValueProvider option) is already set in the template, so the template value is used (even though Job info/Resource metrics/windowSecs shows 2, which is misleading)

Worker harness shows:

  "display_data": [
    {
      "key": "maxNumWorkers",
      "namespace": "google.dataflow.v1beta3.TemplatesService",
      "type": "INTEGER",
      "value": 2
    },
    {
      "key": "machineType",
      "namespace": "google.dataflow.v1beta3.TemplatesService",
      "type": "STRING",
      "value": "t2d-standard-4"
    },
    {
      "key": "diskSizeGb",
      "namespace": "google.dataflow.v1beta3.TemplatesService",
      "type": "INTEGER",
      "value": 200
    },
    {
      "key": "windowSecs",
      "namespace": "com.bawi.beam.dataflow.MySimpleJob$MyPipelineOptions",
      "type": "INTEGER",
      "value": 1
    },
  ],
  "options": {
    "maxNumWorkers": 2,
    "machineType": "t2d-standard-4",
    "diskSizeGb": 200,
    "seqEnd": "200",
    "windowSecs": "2",

4. If neither the template nor the HTTP POST JSON defines numWorkers, maxWorkers, machineType or diskSizeGb, then Dataflow uses its defaults, which for batch is 1 worker of type n1-standard-1 with a 250 GB disk when Dataflow Shuffle is disabled.

The complete source of the MySimpleJob pipeline used in the examples above:

package com.bawi.beam.dataflow;

import com.bawi.beam.WindowUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.*;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.beam.sdk.values.TypeDescriptors.integers;

public class MySimpleJob {
    private static final Logger LOGGER = LoggerFactory.getLogger(MySimpleJob.class);

    @SuppressWarnings("unused")
    public interface MyPipelineOptions extends PipelineOptions {
        ValueProvider<Integer> getSeqEnd();
        void setSeqEnd(ValueProvider<Integer> seqEnd);

        ValueProvider<String> getOutput();
        void setOutput(ValueProvider<String> output);

        ValueProvider<String> getTempDir();
        void setTempDir(ValueProvider<String> tempDir);

        ValueProvider<String> getStartDate();
        void setStartDate(ValueProvider<String> startDate);

        ValueProvider<Integer> getNumber();
        void setNumber(ValueProvider<Integer> number);

        int getWindowSecs();
        void setWindowSecs(int windowSecs);

    }

    public static void main(String[] args) {
        String JOB_NAME = MySimpleJob.class.getSimpleName();
        String BUCKET = System.getenv("GCP_BUCKET");

//        args = PipelineUtils.updateArgs(args,
//                "--output=target/" + JOB_NAME + "/output",
//                "--tempDir=target/" + JOB_NAME + "/temp",  // used for TextIO.write().withWindowedWrites()
        args = PipelineUtils.updateArgsWithDataflowRunner(args,
                "--numWorkers=1",
                "--maxNumWorkers=1",
                "--workerMachineType=t2d-standard-2",
                "--usePublicIps=true",
                "--diskSizeGb=100",
                "--output=gs://" + BUCKET + "/" + JOB_NAME + "/template/output/element",
                "--templateLocation=gs://" + BUCKET + "/" + JOB_NAME + "/template/" + JOB_NAME + "-template",
                "--stagingLocation=gs://" + BUCKET + "/" + JOB_NAME + "/template/staging",
                "--seqEnd=100",
                "--startDate=2024-01-01T00:00:00Z",
                "--windowSecs=1"
        );
        MyPipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(MyPipelineOptions.class);
        Pipeline pipeline = Pipeline.create(options);

        NestedValueProvider<List<Integer>, Integer> seqProvider = NestedValueProvider.of(options.getSeqEnd(),
                seqEnd -> IntStream.rangeClosed(0, seqEnd).boxed().collect(Collectors.toList()));

        PCollection<String> converted = pipeline.apply(Create.ofProvider(seqProvider, ListCoder.of(VarIntCoder.of())))
                .apply(FlatMapElements.into(integers()).via(i -> i))
                .apply(ParDo.of(new MyDoFn(options.getStartDate())))
                .apply(WithTimestamps.of(line -> Instant.parse(line.split(",")[1])))
                .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSecs()))));

        /*
        if (options.getNumber().get() == 1) { // java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=number, default=null}
            converted.apply(TextIO.write().withSuffix(".txt").to(options.getOutput()));
        }
         */

//        converted.apply(TextIO.write()
//                .withWindowedWrites()
//                .to(new MyFilenamePolicy(options.getOutput(), "element-", ".txt"))
//                .withTempDirectory(NestedValueProvider.of(options.getTempDir(), FileBasedSink::convertToFileResourceIfPossible)));


        converted.apply(FileIO.<String>write()
                .via(TextIO.sink())
                .to(options.getOutput())
                .withNaming(new MyFileNaming("element-", ".txt")));

        PipelineResult result = pipeline.run();

        if ("DirectPipelineResult".equals(result.getClass().getSimpleName())) {
            result.waitUntilFinish(); // usually waitUntilFinish while pipeline development, remove when generating dataflow classic template
        }
    }

    static class MyDoFn extends DoFn<Integer,String> {

        private final ValueProvider<String> startDate;

        public MyDoFn(ValueProvider<String> startDate) {
            this.startDate = startDate;
        }

        @ProcessElement
        public void process(@Element Integer i, OutputReceiver<String> receiver) throws InterruptedException {
            Instant instant = Instant.parse(startDate.get()).plus(Duration.standardSeconds(i));
            LOGGER.info("Processing: {}", i + "," + instant);
            receiver.output(i + "," + instant);
        }
    }

    private static class MyFilenamePolicy extends FileBasedSink.FilenamePolicy {
        private final ValueProvider<String> output;
        private final String prefix;
        private final String suffix;

        public MyFilenamePolicy(ValueProvider<String> output, String prefix, String suffix) {
            this.output = output;
            this.prefix = prefix;
            this.suffix = suffix;
        }

        @Override
        public ResourceId windowedFilename(int shardNumber, int numShards, BoundedWindow window, PaneInfo paneInfo, FileBasedSink.OutputFileHints outputFileHints) {
            ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(output.get());
            String parentDirectoryPathWithSlash =  resource.isDirectory() ? "" : resource.getFilename() + "/";
            String filename = String.format("%s%s%s-%s-of-%s%s", parentDirectoryPathWithSlash,
                    prefix, WindowUtils.windowToNormalizedString(window), shardNumber, numShards, suffix);
            ResourceId resourceId = resource.getCurrentDirectory().resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
            System.out.println(filename + "," + resourceId);
            // output/element-2024-01-01T00_00_00.000Z..2024-01-01T00_02_00.000Z-3-of-5.txt,/Users/me/dev/my-apache-beam-dataflow/target/MySimpleJob/output/element-2024-01-01T00_00_00.000Z..2024-01-01T00_02_00.000Z-3-of-5.txt
            // "target/MySimpleJob/output/element-2024-01-01T00_00_00.000Z..2024-01-01T00_02_00.000Z-0-of-4.txt
            return resourceId;
        }

        @Override
        public ResourceId unwindowedFilename(int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) {
            throw new UnsupportedOperationException("Expecting windowed outputs only");
        }
    }
    
    private static class MyFileNaming implements FileIO.Write.FileNaming {
        private final String prefix;
        private final String suffix;

        public MyFileNaming(String prefix, String suffix) {
            this.prefix = prefix;
            this.suffix = suffix;
        }

        @Override
        public String getFilename(BoundedWindow window, PaneInfo pane, int numShards, int shardIndex, Compression compression) {
            String fileName = String.format("%s%s-%s-of-%s%s", prefix, windowToNormalizedString(window), shardIndex, numShards, suffix);
            LOGGER.info("fileName={}", fileName);
            // fileName=element-2024-01-01T00_00_00.000Z..2024-01-01T00_02_00.000Z-4-of-5.txt
            // writes to: target/MySimpleJob/output/element-2024-01-01T00_00_00.000Z..2024-01-01T00_02_00.000Z-0-of-5.txt
            return fileName;
        }

        private static String windowToNormalizedString(BoundedWindow window) {
            return window instanceof GlobalWindow ? replace(window.maxTimestamp()) :
                    replace(((IntervalWindow) window).start()) + ".." + replace(((IntervalWindow) window).end());
        }

        private static String replace(Instant instant) {
            return instant.toString().replace(":", "_").replace(" ", "_");
        }

    }
}

parquet-tools snappy compression

java -Dlog4j.configuration=file:$HOME/Downloads/log4j.properties \
-cp $HOME/Downloads/parquet-tools-1.8.3.jar:$HOME/.m2/repository/org/xerial/snappy/snappy-java/1.1.8.4/snappy-java-1.1.8.4.jar \
org.apache.parquet.tools.Main --help
usage: parquet-tools cat [option...] <input>
where option is one of:
       --debug     Enable debug output
    -h,--help      Show this help string
    -j,--json      Show records in JSON format.
       --no-color  Disable color output even if supported
where <input> is the parquet file to print to stdout

usage: parquet-tools head [option...] <input>
where option is one of:
       --debug          Enable debug output
    -h,--help           Show this help string
    -n,--records <arg>  The number of records to show (default: 5)
       --no-color       Disable color output even if supported
where <input> is the parquet file to print to stdout

usage: parquet-tools schema [option...] <input>
where option is one of:
    -d,--detailed  Show detailed information about the schema.
       --debug     Enable debug output
    -h,--help      Show this help string
       --no-color  Disable color output even if supported
where <input> is the parquet file containing the schema to show

usage: parquet-tools meta [option...] <input>
where option is one of:
       --debug     Enable debug output
    -h,--help      Show this help string
       --no-color  Disable color output even if supported
where <input> is the parquet file to print to stdout

usage: parquet-tools dump [option...] <input>
where option is one of:
    -c,--column <arg>  Dump only the given column, can be specified more than
                       once
    -d,--disable-data  Do not dump column data
       --debug         Enable debug output
    -h,--help          Show this help string
    -m,--disable-meta  Do not dump row group and page metadata
       --no-color      Disable color output even if supported
where <input> is the parquet file to print to stdout

java -Dlog4j.configuration=file:$HOME/Downloads/log4j.properties \
-cp $HOME/Downloads/parquet-tools-1.8.3.jar:$HOME/.m2/repository/org/xerial/snappy/snappy-java/1.1.8.4/snappy-java-1.1.8.4.jar \
org.apache.parquet.tools.Main cat myNestedRecord.parquet
myRequiredInt = 123
myRequiredString = abc
myRequiredBoolean = true
myRequiredBytes = QUJDMTIz
myBytesDecimal = SVBPgA==
myRequiredTimestamp = 1699601177192000
myRequiredDate = 19671
myRequiredArrayLongs:
.array = 1
.array = 2
.array = 3
myRequiredSubRecord:
.myRequiredDouble = 1.0
.myRequiredBoolean = false
myOptionalSubRecord:
.myRequiredFloat = 2.0
.myRequiredBoolean = true
myNullableSubRecord:
.myRequiredLong = 12
.myRequiredBoolean = false
myOptionalArraySubRecords:
.array:
..myRequiredBoolean = true
.array:
..myRequiredBoolean = false
java -Dlog4j.configuration=file:$HOME/Downloads/log4j.properties \
-cp $HOME/Downloads/parquet-tools-1.8.3.jar:$HOME/.m2/repository/org/xerial/snappy/snappy-java/1.1.8.4/snappy-java-1.1.8.4.jar \
org.apache.parquet.tools.Main cat --json myNestedRecord.parquet | jq
{
  "myRequiredInt": 123,
  "myRequiredString": "abc",
  "myRequiredBoolean": true,
  "myRequiredBytes": "QUJDMTIz",
  "myBytesDecimal": "SVBPgA==",
  "myRequiredTimestamp": 1699601177192000,
  "myRequiredDate": 19671,
  "myRequiredArrayLongs": [
    1,
    2,
    3
  ],
  "myRequiredSubRecord": {
    "myRequiredDouble": 1,
    "myRequiredBoolean": false
  },
  "myOptionalSubRecord": {
    "myRequiredFloat": 2,
    "myRequiredBoolean": true
  },
  "myNullableSubRecord": {
    "myRequiredLong": 12,
    "myRequiredBoolean": false
  },
  "myOptionalArraySubRecords": [
    {
      "myRequiredBoolean": true
    },
    {
      "myRequiredBoolean": false
    }
  ]
}
java -Dlog4j.configuration=file:$HOME/Downloads/log4j.properties \
-cp $HOME/Downloads/parquet-tools-1.8.3.jar:$HOME/.m2/repository/org/xerial/snappy/snappy-java/1.1.8.4/snappy-java-1.1.8.4.jar \
org.apache.parquet.tools.Main meta myNestedRecord.parquet
file:                      file:/Users/me/dev/my-apache-beam-dataflow/terraform/external-parquet-bq-table/myNestedRecord.parquet
creator:                   parquet-mr version 1.12.0 (build db75a6815f2ba1d1ee89d1a90aeb296f1f3a8f20)
extra:                     parquet.avro.schema = {"type":"record","name":"myRecordName","fields":[{"name":"myRequiredInt","type":"int"},{"name":"myRequiredString","type":"string"},{"name":"myOptionalString","type":["null","string"],"default":null},{"name":"myNullableString","type":["string","null"],"default":"myNullableStringDefaultValue"},{"name":"myRequiredBoolean","type":"boolean"},{"name":"myRequiredBytes","type":"bytes"},{"name":"myBytesDecimal","type":{"type":"bytes","logicalType":"decimal","precision":38,"scale":9}},{"name":"myRequiredTimestamp","type":{"type":"long","logicalType":"timestamp-micros"}},{"name":"myOptionalTimestamp","type":["null",{"type":"long","logicalType":"timestamp-micros"}],"default":null},{"name":"myRequiredDate","type":{"type":"int","logicalType":"date"},"doc":"Expiration date field"},{"name":"myRequiredArrayLongs","type":{"type":"array","items":"long"}},{"name":"myRequiredSubRecord","type":{"type":"record","name":"myRequiredSubRecordType","fields":[{"name":"myRequiredDouble","type":"double"},{"name":"myRequiredBoolean","type":"boolean"}]}},{"name":"myOptionalSubRecord","type":["null",{"type":"record","name":"myOptionalSubRecordType","fields":[{"name":"myRequiredFloat","type":"float"},{"name":"myRequiredBoolean","type":"boolean"}]}],"default":null},{"name":"myNullableSubRecord","type":[{"type":"record","name":"myNullableSubRecordType","fields":[{"name":"myRequiredLong","type":"long"},{"name":"myRequiredBoolean","type":"boolean"}]},"null"]},{"name":"myOptionalArraySubRecords","type":[{"type":"array","items":{"type":"record","name":"myOptionalArraySubRecord","fields":[{"name":"myRequiredBoolean","type":"boolean"}]}},"null"]}]}
extra:                     writer.model.name = avro

file schema:               myRecordName
--------------------------------------------------------------------------------
myRequiredInt:             REQUIRED INT32 R:0 D:0
myRequiredString:          REQUIRED BINARY O:UTF8 R:0 D:0
myOptionalString:          OPTIONAL BINARY O:UTF8 R:0 D:1
myNullableString:          OPTIONAL BINARY O:UTF8 R:0 D:1
myRequiredBoolean:         REQUIRED BOOLEAN R:0 D:0
myRequiredBytes:           REQUIRED BINARY R:0 D:0
myBytesDecimal:            REQUIRED BINARY O:DECIMAL R:0 D:0
myRequiredTimestamp:       REQUIRED INT64 O:TIMESTAMP_MICROS R:0 D:0
myOptionalTimestamp:       OPTIONAL INT64 O:TIMESTAMP_MICROS R:0 D:1
myRequiredDate:            REQUIRED INT32 O:DATE R:0 D:0
myRequiredArrayLongs:      REQUIRED F:1
.array:                    REPEATED INT64 R:1 D:1
myRequiredSubRecord:       REQUIRED F:2
.myRequiredDouble:         REQUIRED DOUBLE R:0 D:0
.myRequiredBoolean:        REQUIRED BOOLEAN R:0 D:0
myOptionalSubRecord:       OPTIONAL F:2
.myRequiredFloat:          REQUIRED FLOAT R:0 D:1
.myRequiredBoolean:        REQUIRED BOOLEAN R:0 D:1
myNullableSubRecord:       OPTIONAL F:2
.myRequiredLong:           REQUIRED INT64 R:0 D:1
.myRequiredBoolean:        REQUIRED BOOLEAN R:0 D:1
myOptionalArraySubRecords: OPTIONAL F:1
.array:                    REPEATED F:1
..myRequiredBoolean:       REQUIRED BOOLEAN R:1 D:2

row group 1:               RC:1 TS:570 OFFSET:4
--------------------------------------------------------------------------------
myRequiredInt:              INT32 SNAPPY DO:0 FPO:4 SZ:29/27/0.93 VC:1 ENC:BIT_PACKED,PLAIN
myRequiredString:           BINARY SNAPPY DO:0 FPO:33 SZ:32/30/0.94 VC:1 ENC:BIT_PACKED,PLAIN
myOptionalString:           BINARY SNAPPY DO:0 FPO:65 SZ:31/29/0.94 VC:1 ENC:RLE,BIT_PACKED,PLAIN
myNullableString:           BINARY SNAPPY DO:0 FPO:96 SZ:31/29/0.94 VC:1 ENC:RLE,BIT_PACKED,PLAIN
myRequiredBoolean:          BOOLEAN SNAPPY DO:0 FPO:127 SZ:26/24/0.92 VC:1 ENC:BIT_PACKED,PLAIN
myRequiredBytes:            BINARY SNAPPY DO:0 FPO:153 SZ:35/33/0.94 VC:1 ENC:BIT_PACKED,PLAIN
myBytesDecimal:             BINARY SNAPPY DO:0 FPO:188 SZ:33/31/0.94 VC:1 ENC:BIT_PACKED,PLAIN
myRequiredTimestamp:        INT64 SNAPPY DO:0 FPO:221 SZ:33/31/0.94 VC:1 ENC:BIT_PACKED,PLAIN
myOptionalTimestamp:        INT64 SNAPPY DO:0 FPO:254 SZ:31/29/0.94 VC:1 ENC:RLE,BIT_PACKED,PLAIN
myRequiredDate:             INT32 SNAPPY DO:0 FPO:285 SZ:29/27/0.93 VC:1 ENC:BIT_PACKED,PLAIN
myRequiredArrayLongs:
.array:                     INT64 SNAPPY DO:0 FPO:314 SZ:54/59/1.09 VC:3 ENC:RLE,PLAIN
myRequiredSubRecord:
.myRequiredDouble:          DOUBLE SNAPPY DO:0 FPO:368 SZ:33/31/0.94 VC:1 ENC:BIT_PACKED,PLAIN
.myRequiredBoolean:         BOOLEAN SNAPPY DO:0 FPO:401 SZ:25/23/0.92 VC:1 ENC:BIT_PACKED,PLAIN
myOptionalSubRecord:
.myRequiredFloat:           FLOAT SNAPPY DO:0 FPO:426 SZ:35/33/0.94 VC:1 ENC:RLE,BIT_PACKED,PLAIN
.myRequiredBoolean:         BOOLEAN SNAPPY DO:0 FPO:461 SZ:32/30/0.94 VC:1 ENC:RLE,BIT_PACKED,PLAIN
myNullableSubRecord:
.myRequiredLong:            INT64 SNAPPY DO:0 FPO:493 SZ:39/37/0.95 VC:1 ENC:RLE,BIT_PACKED,PLAIN
.myRequiredBoolean:         BOOLEAN SNAPPY DO:0 FPO:532 SZ:32/30/0.94 VC:1 ENC:RLE,BIT_PACKED,PLAIN
myOptionalArraySubRecords:
.array:
..myRequiredBoolean:        BOOLEAN SNAPPY DO:0 FPO:564 SZ:39/37/0.95 VC:2 ENC:RLE,PLAIN
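
For completeness, the schema subcommand from the help above prints only the parquet schema (same classpath pattern; output omitted here):

java -Dlog4j.configuration=file:$HOME/Downloads/log4j.properties \
-cp $HOME/Downloads/parquet-tools-1.8.3.jar:$HOME/.m2/repository/org/xerial/snappy/snappy-java/1.1.8.4/snappy-java-1.1.8.4.jar \
org.apache.parquet.tools.Main schema myNestedRecord.parquet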

Avro Schema – java and json/avsc

Build avro schema programmatically in Java:

    private static final Schema TIMESTAMP_MICROS_LOGICAL_TYPE = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
    private static final Schema DATE_LOGICAL_TYPE = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
    private static final Schema DECIMAL_38_LOGICAL_TYPE = LogicalTypes.decimal(38, 9).addToSchema(Schema.create(Schema.Type.BYTES));

    private static final Schema INPUT_SCHEMA = SchemaBuilder.record("myRecordName")
            .fields()
            .requiredInt("myRequiredInt")
            .requiredString("myRequiredString")
            .optionalString("myOptionalString")
            .nullableString("myNullableString", "myNullableStringDefaultValue")
            .requiredBoolean("myRequiredBoolean")
            .requiredBytes("myRequiredBytes")
            .name("myBytesDecimal").type(DECIMAL_38_LOGICAL_TYPE).noDefault()
            .name("myRequiredTimestamp").type(TIMESTAMP_MICROS_LOGICAL_TYPE).noDefault() // needs to be timestamp_micros (not timestamp_millis)
            .name("myOptionalTimestamp").type().optional().type(TIMESTAMP_MICROS_LOGICAL_TYPE)
            .name("myRequiredDate").doc("Expiration date field").type(DATE_LOGICAL_TYPE).noDefault()
            .name("myRequiredArrayLongs").type().array().items().longType().noDefault()
            .name("myRequiredSubRecord").type(SchemaBuilder.record("myRequiredSubRecordType").fields().requiredDouble("myRequiredDouble").requiredBoolean("myRequiredBoolean").endRecord()).noDefault()
            .name("myOptionalSubRecord").type().optional().record("myOptionalSubRecordType").fields().requiredFloat("myRequiredFloat").requiredBoolean("myRequiredBoolean").endRecord()
            .name("myNullableSubRecord").type().nullable().record("myNullableSubRecordType").fields().requiredLong("myRequiredLong").requiredBoolean("myRequiredBoolean").endRecord().noDefault()
            .name("myOptionalArraySubRecords").type().nullable().array().items(
                    SchemaBuilder.record("myOptionalArraySubRecord").fields().requiredBoolean("myRequiredBoolean").endRecord()).noDefault()
            .endRecord();

Create GenericRecord for the schema:

    private static GenericRecord createGenericRecord(Schema schema) {
        GenericRecord record = new GenericData.Record(schema);
        record.put("myRequiredInt", 123);
        record.put("myRequiredString", "abc");
        record.put("myRequiredBoolean", true);
        record.put("myRequiredBytes", ByteBuffer.wrap("ABC123".getBytes()));
        record.put("myBytesDecimal",  doubleToByteBuffer(1.23d));
        record.put("myRequiredTimestamp", System.currentTimeMillis() * 1000); // needs to be timestamp_micros (not timestamp_millis)
        record.put("myRequiredDate", (int) new Date(System.currentTimeMillis()).toLocalDate().toEpochDay());
        record.put("myRequiredArrayLongs", Arrays.asList(1L, 2L, 3L));
        Schema myRequiredSubRecordSchema = schema.getField("myRequiredSubRecord").schema();
        GenericRecord myRequiredSubRecord = new GenericData.Record(myRequiredSubRecordSchema);
        myRequiredSubRecord.put("myRequiredDouble", 1.0d);
        myRequiredSubRecord.put("myRequiredBoolean", false);
        record.put("myRequiredSubRecord", myRequiredSubRecord);
        Schema myOptionalSubRecordSchema = unwrapSchema(schema.getField("myOptionalSubRecord").schema());
        GenericRecord myOptionalSubRecord = new GenericData.Record(myOptionalSubRecordSchema);
        myOptionalSubRecord.put("myRequiredFloat", 2.0f);
        myOptionalSubRecord.put("myRequiredBoolean", true);
        record.put("myOptionalSubRecord", myOptionalSubRecord);
        Schema myNullableSubRecordSchema = unwrapSchema(schema.getField("myNullableSubRecord").schema());
        GenericRecord myNullableSubRecord = new GenericData.Record(myNullableSubRecordSchema);
        myNullableSubRecord.put("myRequiredLong", 12L);
        myNullableSubRecord.put("myRequiredBoolean", false);
        record.put("myNullableSubRecord", myNullableSubRecord);
        Schema myOptionalArraySubRecords = schema.getField("myOptionalArraySubRecords").schema();
        Schema arraySubRecordSchema = unwrapSchema(myOptionalArraySubRecords).getElementType();
        GenericRecord mySubRecord1 = new GenericData.Record(arraySubRecordSchema);
        mySubRecord1.put("myRequiredBoolean", true);
        GenericRecord mySubRecord2 = new GenericData.Record(arraySubRecordSchema);
        mySubRecord2.put("myRequiredBoolean", false);
        record.put("myOptionalArraySubRecords", Arrays.asList(mySubRecord1, mySubRecord2));
        return record;
    }


    private static void createAvroFile(String path) {
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(INPUT_SCHEMA);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
        dataFileWriter.setCodec(CodecFactory.snappyCodec());
        try {
            dataFileWriter.create(INPUT_SCHEMA, new File(path));
            for (int i = 1; i <= 1; i++) {
                GenericRecord record = createGenericRecord(INPUT_SCHEMA);
                dataFileWriter.append(record);
            }
            dataFileWriter.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static Schema unwrapSchema(Schema schema) {
        if (schema.getType() == Schema.Type.UNION) {
            List<Schema> types = schema.getTypes();
            return types.get(0).getType() == Schema.Type.NULL ? types.get(1) : types.get(0);
        }
        return schema;
    }

    private static ByteBuffer doubleToByteBuffer(double d) {
        BigDecimal bigDecimal = BigDecimal.valueOf(d).setScale(9, RoundingMode.UNNECESSARY);
        BigInteger bigInteger = bigDecimal.unscaledValue();
        return ByteBuffer.wrap(bigInteger.toByteArray());
    }
The equivalent schema serialized as json/avsc:

{
  "type": "record",
  "name": "myRecordName",
  "fields": [
    {
      "name": "myRequiredInt",
      "type": "int"
    },
    {
      "name": "myRequiredString",
      "type": "string"
    },
    {
      "name": "myOptionalString",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "myNullableString",
      "type": [
        "string",
        "null"
      ],
      "default": "myNullableStringDefaultValue"
    },
    {
      "name": "myRequiredBoolean",
      "type": "boolean"
    },
    {
      "name": "myRequiredBytes",
      "type": "bytes"
    },
    {
      "name": "myBytesDecimal",
      "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 38,
        "scale": 9
      }
    },
    {
      "name": "myRequiredTimestamp",
      "type": {
        "type": "long",
        "logicalType": "timestamp-micros"
      }
    },
    {
      "name": "myOptionalTimestamp",
      "type": [
        "null",
        {
          "type": "long",
          "logicalType": "timestamp-micros"
        }
      ],
      "default": null
    },
    {
      "name": "myRequiredDate",
      "type": {
        "type": "int",
        "logicalType": "date"
      },
      "doc": "Expiration date field"
    },
    {
      "name": "myRequiredArrayLongs",
      "type": {
        "type": "array",
        "items": "long"
      }
    },
    {
      "name": "myRequiredSubRecord",
      "type": {
        "type": "record",
        "name": "myRequiredSubRecordType",
        "fields": [
          {
            "name": "myRequiredDouble",
            "type": "double"
          },
          {
            "name": "myRequiredBoolean",
            "type": "boolean"
          }
        ]
      }
    },
    {
      "name": "myOptionalSubRecord",
      "type": [
        "null",
        {
          "type": "record",
          "name": "myOptionalSubRecordType",
          "fields": [
            {
              "name": "myRequiredFloat",
              "type": "float"
            },
            {
              "name": "myRequiredBoolean",
              "type": "boolean"
            }
          ]
        }
      ],
      "default": null
    },
    {
      "name": "myNullableSubRecord",
      "type": [
        {
          "type": "record",
          "name": "myNullableSubRecordType",
          "fields": [
            {
              "name": "myRequiredLong",
              "type": "long"
            },
            {
              "name": "myRequiredBoolean",
              "type": "boolean"
            }
          ]
        },
        "null"
      ]
    },
    {
      "name": "myOptionalArraySubRecords",
      "type": [
        {
          "type": "array",
          "items": {
            "type": "record",
            "name": "myOptionalArraySubRecord",
            "fields": [
              {
                "name": "myRequiredBoolean",
                "type": "boolean"
              }
            ]
          }
        },
        "null"
      ]
    }
  ]
}

alias avro-tools-tojson='java -Dlog4j.configuration=file:$HOME/Downloads/log4j.properties -cp $HOME/.m2/repository/org/apache/avro/avro-tools/1.8.2/avro-tools-1.8.2.jar:$HOME/.m2/repository/org/xerial/snappy/snappy-java/1.1.8.4/snappy-java-1.1.8.4.jar org.apache.avro.tool.Main tojson'
avro-tools-tojson myRecord.snappy.avro
{
  "myRequiredInt": 123,
  "myRequiredString": "abc",
  "myOptionalString": null,
  "myNullableString": null,
  "myRequiredBoolean": true,
  "myRequiredBytes": "ABC123",
  "myBytesDecimal": "IPO",
  "myRequiredTimestamp": 1685292531792000,
  "myOptionalTimestamp": null,
  "myRequiredDate": 19505,
  "myRequiredArrayLongs": [
    1,
    2,
    3
  ],
  "myRequiredSubRecord": {
    "myRequiredDouble": 1.0,
    "myRequiredBoolean": false
  },
  "myOptionalSubRecord": {
    "myOptionalSubRecordType": {
      "myRequiredFloat": 2.0,
      "myRequiredBoolean": true
    }
  },
  "myNullableSubRecord": {
    "myNullableSubRecordType": {
      "myRequiredLong": 12,
      "myRequiredBoolean": false
    }
  },
  "myOptionalArraySubRecords": {
    "array": [
      {
        "myRequiredBoolean": true
      },
      {
        "myRequiredBoolean": false
      }
    ]
  }
}

Merge maps java

@Test
public void testMapMerge() {
    // given
    Map<String, Integer> mutableMap = new HashMap<>(Map.of("a", 1, "b", 11));
    Map<String, Integer> otherMap = Map.of("b", 22, "c", 111);

    // when
    otherMap.forEach((k, v) -> mutableMap.merge(k, v, (v1, v2) -> v1 + v2));

    // then
    Assert.assertEquals(mutableMap, Map.of("a", 1, "b", (11 + 22), "c", 111));

    // when
    mutableMap.merge("b", 44, (v1, v2) -> v1 + v2);
    mutableMap.merge("d", 1111, (v1, v2) -> v1 + v2);

    // then
    Assert.assertEquals(mutableMap, Map.of("a", 1, "b", (11 + 22 + 44), "c", 111, "d", 1111));
}

vimeo-dl

https://github.com/akiomik/vimeo-dl/releases/
https://github.com/akiomik/vimeo-dl/releases/download/v0.1.0/vimeo-dl_0.1.0_darwin_arm64.tar.gz
https://github.com/akiomik/vimeo-dl/releases/download/v0.1.0/vimeo-dl_0.1.0_linux_amd64.tar.gz

tar xzf vimeo-dl_0.1.0_*.tar.gz
nohup ./vimeo-dl -i "https://.../master.json?base64_init=1&query_string_ranges=1" > vimeo.log 2>&1&
ffmpeg -i orig-video.mp4 -i orig-audio.mp4 -c copy orig.mp4
ffmpeg -i "orig_1920_1080p.mp4" -vf scale="iw/2:ih/2" "scaled_960_540p.mp4" 

Managing maven dependency conflicts using Apache Spark and Google Dataproc example

Currently, we are using Google Dataproc 2.0 release https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0
that comes with Apache Spark 3.1.3 and Java 8.

The goal is to have a local environment that matches that specific Dataproc cluster's library installation as closely as possible:
– run the same Spark Scala code locally and on Dataproc
– open the jar dependencies with the Java sources attached

Let's try to run the following app locally:

package com.bawi

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.slf4j.LoggerFactory

object MyAvroGcsReadAndBQWriteApp {
  private val LOGGER = LoggerFactory.getLogger(MyAvroGcsReadAndBQWriteApp.getClass)

  def main(args: Array[String]): Unit = {
    LOGGER.info("GOOGLE_APPLICATION_CREDENTIALS={}", System.getenv("GOOGLE_APPLICATION_CREDENTIALS"))

    val conf = new SparkConf().setAppName("MyAvroGcsReadAndBQWriteApp").setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val inputDF = spark.read.format("avro")
//      .load("src/test/resources/my-record.snappy.avro")
      .load("gs://my-bucket-spark/my-record.snappy.avro")

    import spark.implicits._
    val df = inputDF.map((p: Row) => {
      val name = p.getAs[String]("name")
      val body = p.getAs[Array[Byte]]("body")
      LOGGER.info("processing {}", (name, new String(body)))
      (name, body)
    }).toDF(inputDF.columns: _*)

    df.write.format("bigquery")
      .option("writeMethod", "indirect")
      .option("temporaryGcsBucket", "my-bucket-spark")
      .mode(SaveMode.Append)
      .save("my_dataset.my_table")
    spark.stop()
  }
}

So we need to create a pom.xml defining Spark 3.1.3, JDK 1.8, Scala 2.12.14 and gcs-connector hadoop3-2.2.11.
Read through the pom comments below to resolve issues such as:
– Failed to find data source: avro (spark-avro)
– org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme “gs” (gcs-connector)
– org.apache.hadoop.fs.FileSystem: Provider com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem could not be instantiated
or java.lang.ClassNotFoundException: com.google.protobuf.ExtensionLite (shaded classifier)
– java.io.IOException: Error getting access token from metadata server … Caused by: java.net.SocketTimeoutException: connect timed out (export GOOGLE_APPLICATION_CREDENTIALS)
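
For the last point, a minimal sketch of setting up Application Default Credentials locally (the path is the gcloud default on Linux/macOS):

gcloud auth application-default login
export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.config/gcloud/application_default_credentials.json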

Let's log in to the Dataproc cluster to determine the main library versions:

me@bartek-dataproc-m:~$ find / -iname *scala*library*.jar -print 2>/dev/null
/usr/share/scala/lib/scala-library.jar
/usr/lib/spark/jars/scala-library-2.12.14.jar

me@bartek-dataproc-m:~$ find / -iname *spark*core*.jar -print 2>/dev/null
/usr/lib/spark/jars/spark-core_2.12-3.1.3.jar

me@bartek-dataproc-m:~$ find / -iname *spark*avro*.jar -print 2>/dev/null
/usr/lib/spark/external/spark-avro.jar
/usr/lib/spark/external/spark-avro_2.12-3.1.3.jar

me@bartek-dataproc-m:~$ find / -iname *connector*.jar -print 2>/dev/null
/usr/local/share/google/dataproc/lib/gcs-connector-hadoop3-2.2.11.jar
/usr/local/share/google/dataproc/lib/gcs-connector.jar

me@bartek-dataproc-m:~$ ls -la /usr/local/share/google/dataproc/lib/gcs-connector.jar
lrwxrwxrwx 1 root root 69 Mar  1 00:32 /usr/local/share/google/dataproc/lib/gcs-connector.jar -> /usr/local/share/google/dataproc/lib/gcs-connector-hadoop3-2.2.11.jar

me@bartek-dataproc-m:~$ find / -iname *grpc*.jar -print 2>/dev/null

me@bartek-dataproc-m:~$ find / -iname *protobuf*.jar -print 2>/dev/null
/usr/lib/spark/jars/protobuf-java-2.5.0.jar
/usr/lib/hadoop/lib/protobuf-java-2.5.0.jar
/usr/lib/hadoop-hdfs/lib/protobuf-java-2.5.0.jar

Note that some of the jars in Dataproc may be uberjars with some repackaged/shaded dependencies, e.g.

me@bartek-dataproc-m:~$ ls -la /usr/local/share/google/dataproc/lib/gcs-connector-hadoop3-2.2.11.jar
-rw-r--r-- 1 root root 36497606 Mar  1 00:32 /usr/local/share/google/dataproc/lib/gcs-connector-hadoop3-2.2.11.jar
me@bartek-dataproc-m:~$ jar tf /usr/local/share/google/dataproc/lib/gcs-connector-hadoop3-2.2.11.jar | grep repackaged | wc -l
15256
me@bartek-dataproc-m:~$ jar tf /usr/local/share/google/dataproc/lib/gcs-connector-hadoop3-2.2.11.jar | grep ExtensionLite     
com/google/cloud/hadoop/repackaged/gcs/com/google/protobuf/ExtensionLite.class

and based on that we define the pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.bawi</groupId>
    <artifactId>my-apache-spark-3-scala</artifactId>
    <version>0.1-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

        <!-- jar versions matching https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0, image-version 2.0.58-debian10 -->
        <spark.version>3.1.3</spark.version>
        <java.version>1.8</java.version>
        <scala.major.version>2.12</scala.major.version>
        <scala.minor.version>14</scala.minor.version>
        <gcs-connector.version>hadoop3-2.2.11</gcs-connector.version> <!-- up to hadoop3-2.2.4 no dependency on com.google.protobuf.ExtensionLite -->

        <!-- use gcloud dataproc jobs submit spark: - -properties ^#^spark.jars.packages=org.apache.spark:spark-avro_2.12:3.1.3,com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.29.0#spark.dynamicAllocation.enabled=true ... -->
        <dataproc.provided.dependencies.scope>compile</dataproc.provided.dependencies.scope>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.major.version}.${scala.minor.version}</version>
            <scope>${dataproc.provided.dependencies.scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.major.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${dataproc.provided.dependencies.scope}</scope>
        </dependency>

        <!-- org.apache.spark:spark-sql_2.12:jar:3.1.3 depends transitively on old com.google.protobuf:protobuf-java:jar:2.5.0 (does not depend on io-grpc) -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.major.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${dataproc.provided.dependencies.scope}</scope>
        </dependency>

        <!-- read/write avro files, registers avro format -->
        <!-- otherwise: Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: avro. Avro is built-in but external data source module since Spark 2.4. Please deploy the application as per the deployment section of "Apache Avro Data Source Guide". -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_${scala.major.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${dataproc.provided.dependencies.scope}</scope>
        </dependency>

        <!-- com.google.cloud.spark.bigquery.SupportedCustomDataType imports org.apache.spark.ml.linalg.SQLDataTypes -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.major.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${dataproc.provided.dependencies.scope}</scope>
        </dependency>
    </dependencies>

    <profiles>
        <profile>
            <!-- should be disabled when using local-with-approximate-dependencies-and-source-code -->
            <id>local-with-dataproc-matching-shaded-dependencies</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <properties>
                <dataproc.provided.dependencies.scope>compile</dataproc.provided.dependencies.scope>
            </properties>

            <!-- use gcloud dataproc jobs submit spark: - -properties ^#^spark.jars.packages=org.apache.spark:spark-avro_2.12:3.1.3,com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.29.0#spark.dynamicAllocation.enabled=true ... -->
            <dependencies>
                <!-- read/write GCS, com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem registers extends FileSystem, registers gs scheme -->
                <!-- otherwise: Exception in thread "main" org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "gs"
                    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3281) -->

                <!-- make sure to set environment variable GOOGLE_APPLICATION_CREDENTIALS=/path/to/application_default_credentials.json as explained in
                https://cloud.google.com/docs/authentication/application-default-credentials
                to avoid waiting with connection timeout:
                    Exception in thread "main" java.io.IOException: Error getting access token from metadata server at: http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token
                        at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromMetadataServiceAccount(CredentialFactory.java:254)
                        at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getCredential(GoogleHadoopFileSystemBase.java:1344)
                        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3303)
                    Caused by: java.net.SocketTimeoutException: connect timed out
                        at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1012)
                        at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromMetadataServiceAccount(CredentialFactory.java:251)
                -->
                <!-- use shaded classifier to download gcs-connector-hadoop3-2.2.11-shaded.jar
                uberjar with repackaged/shaded some dependencies (eg. com.google.protobuf.ExtensionLite -> com.google.cloud.hadoop.repackaged.gcs.com.google.protobuf.ExtensionLite)
                instead of gcs-connector-hadoop3-2.2.11.jar and gcs-connector-hadoop3-2.2.11-sources.jar
                otherwise:
                    FileSystem:3231 - Cannot load filesystem: java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem: Provider com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem could not be instantiated
                    FileSystem:3235 - java.lang.NoClassDefFoundError: com/google/protobuf/ExtensionLite
                    FileSystem:3235 - java.lang.ClassNotFoundException: com.google.protobuf.ExtensionLite
                -->
                <!-- to see conflicts comment shaded classifier and run: mvn dependency:tree -Dverbose

                org.apache.spark:spark-sql_2.12:jar:3.1.3 transitively brings old com.google.protobuf:protobuf-java:jar:2.5.0
                so that new com.google.protobuf:protobuf-java:jar:3.21.7 from com.google.cloud.bigdataoss:gcs-connector:jar:hadoop3-2.2.11
                and new com.google.protobuf:protobuf-java:jar:3.21.12 from com.google.cloud.spark:spark-3.1-bigquery:jar:0.29.0-preview are omitted

                com.google.cloud.bigdataoss:gcs-connector:jar:hadoop3-2.2.11 transitively brings io.grpc:grpc-..:jar:1.50.2
                com.google.cloud.spark:spark-3.1-bigquery:jar:0.29.0-preview depends on io.grpc: 1.52.1 and 1.53.0
                 -->
                <dependency>
                    <groupId>com.google.cloud.bigdataoss</groupId>
                    <artifactId>gcs-connector</artifactId>
                    <version>${gcs-connector.version}</version>
                    <scope>${dataproc.provided.dependencies.scope}</scope>
                    <classifier>shaded</classifier>
                </dependency>

                <!-- uberjar with repackaged dependencies to avoid conflicts -->
                <!-- use gcloud dataproc jobs submit spark: - -properties spark.jars.packages=org.apache.spark:spark-avro_2.12:3.1.3 -->
                <dependency>
                    <groupId>com.google.cloud.spark</groupId>
                    <artifactId>spark-bigquery-with-dependencies_2.12</artifactId>
                    <version>0.29.0</version>
                    <scope>${dataproc.provided.dependencies.scope}</scope>
                </dependency>
            </dependencies>
        </profile>

        <profile>
            <!-- should be disabled when using local-with-dataproc-matching-shaded-dependencies -->
            <id>local-with-approximate-dependencies-and-source-code</id>
            <properties>
                <dataproc.provided.dependencies.scope>compile</dataproc.provided.dependencies.scope>
                <!-- choosing latest grpc and protobuf-java among gcs-connector and spark-3.1-bigquery -->
                <grpc.version>1.53.0</grpc.version>
                <protobuf-java.version>3.21.12</protobuf-java.version>
            </properties>
            <dependencies>

                <!-- com.google.cloud.bigdataoss:gcs-connector:jar:hadoop3-2.2.11 (without shaded classifier)
                  depends transitively on com.google.protobuf:protobuf-java-(util):jar:3.21.9 and io.grpc:grpc-..:jar:1.50.2
                  BUT com.google.protobuf:protobuf-java:jar:3.21.9 is OMITTED due to conflict with 2.5.0 from spark_sql
                  -->
                <dependency>
                    <groupId>com.google.cloud.bigdataoss</groupId>
                    <artifactId>gcs-connector</artifactId>
                    <version>${gcs-connector.version}</version>
                    <scope>${dataproc.provided.dependencies.scope}</scope>
                </dependency>

                <!-- com.google.cloud.spark:spark-3.1-bigquery:jar:0.29.0-preview depends on io.grpc: 1.52.1 and 1.53.0
                    e.g. io.grpc:grpc-context:jar:1.52.1 and io.grpc:grpc-api:jar:1.53.0:compile
                    and com.google.protobuf:protobuf-java-(util):jar:3.21.12
                    BUT com.google.protobuf:protobuf-java:jar:3.21.12 is OMITTED due to conflict with 2.5.0 from spark_sql -->
                <dependency>
                    <groupId>com.google.cloud.spark</groupId>
                    <artifactId>spark-3.1-bigquery</artifactId>
                    <version>0.29.0-preview</version>
                    <scope>${dataproc.provided.dependencies.scope}</scope>
                </dependency>
            </dependencies>

            <!-- Could not resolve version conflict among for io.grpc:grpc-core:jar:[1.53.0,1.53.0] and io.grpc:grpc-core:jar:[1.50.2,1.50.2]
            due transitive grpc netty specifying explicit single version of dependency in square brackets for grpc-core and grpc-api:
            io.grpc:grpc-netty:jar:1.53.0 (from spark-3.1-bigquery) -> io.grpc:grpc-core:jar:[1.53.0,1.53.0]
            io.grpc:grpc-netty-shaded:jar:1.50.2 (from gcs-connector) -> io.grpc:grpc-core:jar:[1.50.2,1.50.2] -->
            <dependencyManagement>
                <dependencies>
                    <dependency>
                        <groupId>io.grpc</groupId>
                        <artifactId>grpc-core</artifactId>
                        <version>${grpc.version}</version>
                        <scope>${dataproc.provided.dependencies.scope}</scope>
                    </dependency>
                    <dependency>
                        <groupId>io.grpc</groupId>
                        <artifactId>grpc-api</artifactId>
                        <version>${grpc.version}</version>
                        <scope>${dataproc.provided.dependencies.scope}</scope>
                    </dependency>
                    <dependency>
                        <groupId>com.google.protobuf</groupId>
                        <artifactId>protobuf-java</artifactId>
                        <version>${protobuf-java.version}</version>
                        <scope>${dataproc.provided.dependencies.scope}</scope>
                    </dependency>
                </dependencies>
            </dependencyManagement>
        </profile>

        <profile>
            <id>dist</id>
            <properties>
                <dataproc.provided.dependencies.scope>provided</dataproc.provided.dependencies.scope>
            </properties>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-shade-plugin</artifactId>
                        <version>3.4.1</version>
                        <executions>
                            <execution>
                                <phase>package</phase>
                                <goals>
                                    <goal>shade</goal>
                                </goals>
                                <configuration>
                                    <createDependencyReducedPom>false</createDependencyReducedPom>
                                    <artifactSet>
                                        <excludes>
                                            <exclude>org.scala-lang:*</exclude>
                                        </excludes>
                                    </artifactSet>
                                </configuration>
                            </execution>
                        </executions>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </plugin>
                </plugins>
            </build>
        </profile>
    </profiles>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.11.0</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>

        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.8.1</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>3.3.0</version>
                <executions>
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/main/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

To learn more about the dependency conflicts, comment out the shaded classifier on the gcs-connector dependency and run:

mvn dependency:tree -Dverbose
            [INFO] +- org.apache.spark:spark-sql_2.12:jar:3.1.3:compile
            [INFO] |  +- org.apache.orc:orc-core:jar:1.5.13:compile
            [INFO] |  |  +- com.google.protobuf:protobuf-java:jar:2.5.0:compile

            [INFO] \- com.google.cloud.bigdataoss:gcs-connector:jar:hadoop3-2.2.11:compile
            [INFO]    +- com.google.cloud.bigdataoss:util:jar:2.2.11:compile
            [INFO]    |  +- io.grpc:grpc-api:jar:1.50.2:compile
            [INFO]    |  +- io.grpc:grpc-alts:jar:1.50.2:compile
            [INFO]    |  |  +- io.grpc:grpc-grpclb:jar:1.50.2:compile
            [INFO]    |  |  |  +- (com.google.protobuf:protobuf-java:jar:3.21.7:runtime - omitted for conflict with 2.5.0)
            [INFO]    |  |  +- (com.google.protobuf:protobuf-java:jar:3.21.7:compile - omitted for conflict with 2.5.0)

            [INFO] \- com.google.cloud.bigdataoss:gcs-connector:jar:hadoop3-2.2.11:compile
            [INFO]    +- com.google.api-client:google-api-client-jackson2:jar:2.0.1:compile
            [INFO]    |  \- com.google.http-client:google-http-client:jar:1.42.3:compile
            [INFO]    |     +- io.opencensus:opencensus-api:jar:0.31.1:compile
            [INFO]    |     |  \- (io.grpc:grpc-context:jar:1.27.2:compile - omitted for conflict with 1.50.2)


            [INFO] \- com.google.cloud.bigdataoss:gcs-connector:jar:hadoop3-2.2.11:compile
            [INFO]    +- com.google.cloud.bigdataoss:gcsio:jar:2.2.11:compile
            [INFO]    |  +- io.grpc:grpc-context:jar:1.50.2:compile

You will note that org.apache.spark:spark-sql_2.12:jar:3.1.3 pulls in com.google.protobuf:protobuf-java:jar:2.5.0 (the winner, as it has the shortest path) and com.google.cloud.bigdataoss:gcs-connector:jar:hadoop3-2.2.11 pulls in com.google.protobuf:protobuf-java:jar:3.21.7, which is omitted due to the conflict with 2.5.0. As ExtensionLite is not available in com.google.protobuf:protobuf-java:jar:2.5.0 (but is present in com.google.protobuf:protobuf-java:jar:3.21.7), you will get an exception when initializing GoogleHadoopFileSystem, which extends GoogleHadoopFileSystemBase, which statically imports GoogleHadoopFileSystemConfiguration, which imports GoogleCloudStorageOptions, which imports com.google.storage.v2.StorageProto, which imports com.google.protobuf.GeneratedMessageV3, a class that is not present in the old com.google.protobuf:protobuf-java:jar:2.5.0.

The same conflict exists for io.grpc:grpc-context:jar:1.27.2 vs io.grpc:grpc-context:jar:1.50.2.
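
If in doubt which protobuf-java jar actually wins dependency mediation at runtime, a quick check like the following can be run on the application classpath. This is only a sketch (the class name ProtobufVersionCheck is illustrative, not part of the original project):

    import java.net.URL;

    public class ProtobufVersionCheck {
        public static void main(String[] args) {
            // The jar that loaded a protobuf class reveals which version won mediation
            URL location = com.google.protobuf.Descriptors.class
                    .getProtectionDomain().getCodeSource().getLocation();
            System.out.println("protobuf-java loaded from: " + location);
            try {
                // GeneratedMessageV3 exists only in protobuf-java 3.x; with 2.5.0 on the
                // classpath this lookup fails, mirroring the GoogleHadoopFileSystem error above
                Class.forName("com.google.protobuf.GeneratedMessageV3");
                System.out.println("protobuf-java 3.x is on the classpath");
            } catch (ClassNotFoundException e) {
                System.out.println("old protobuf-java (2.5.0) is on the classpath");
            }
        }
    }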

GCP publish message vs publish request

In the example below, one publish request is a batch containing 10 messages:

    CountDownLatch countDownLatch = new CountDownLatch(4 * 120); // 4 batched publishes per second for ~120 seconds
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

    ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(() -> {
        try {
            publishWithBatchSettingsExample(projectId, topicId);
            countDownLatch.countDown();
        } catch (IOException | ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }, 10, 250, TimeUnit.MILLISECONDS);

    countDownLatch.await();
    handle.cancel(false);

    private static void publishWithBatchSettingsExample(String projectId, String topicId)
            throws IOException, ExecutionException, InterruptedException {
        TopicName topicName = TopicName.of(projectId, topicId);
        Publisher publisher = null;
        List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

        try {
            // Batch settings control how the publisher batches messages
            long requestBytesThreshold = 5000L; // default : 1000 bytes
            long messageCountBatchSize = 10L; // default : 100 message

            Duration publishDelayThreshold = Duration.ofMillis(500); // default : 1 ms

            // Publish request get triggered based on request size, messages count & time since last
            // publish, whichever condition is met first.
            BatchingSettings batchingSettings =
                    BatchingSettings.newBuilder()
                            .setElementCountThreshold(messageCountBatchSize)
                            .setRequestByteThreshold(requestBytesThreshold)
                            .setDelayThreshold(publishDelayThreshold)
                            .build();

            // Create a publisher instance with default settings bound to the topic
            publisher = Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();

            // schedule publishing one message at a time : messages get automatically batched
            for (int i = 0; i < 10; i++) {
                String message = "message " + i;
                ByteString data = ByteString.copyFromUtf8(message);
                PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

                // Once published, returns a server-assigned message id (unique within the topic)
                ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
                messageIdFutures.add(messageIdFuture);
            }
        } finally {
            // Wait on any pending publish requests.
            List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

            LOGGER.info("Published " + messageIds.size() + " messages with batch settings.");

            if (publisher != null) {
                // When finished with the publisher, shutdown to free up resources.
                publisher.shutdown();
                publisher.awaitTermination(1, TimeUnit.MINUTES);
            }
        }
    }
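
For contrast, a publisher can be forced to send (roughly) one publish request per message by lowering the element count threshold to 1, so that every single message flushes its own batch. This is only a sketch (hypothetical helper name, same imports as the example above), not part of the original sample:

    private static void publishOneMessagePerRequestExample(String projectId, String topicId)
            throws IOException, ExecutionException, InterruptedException {
        TopicName topicName = TopicName.of(projectId, topicId);
        // Flush after every single message, so each message travels in its own publish request
        BatchingSettings perMessageSettings =
                BatchingSettings.newBuilder()
                        .setElementCountThreshold(1L)
                        .setRequestByteThreshold(1L)
                        .setDelayThreshold(Duration.ofMillis(1))
                        .build();
        Publisher publisher =
                Publisher.newBuilder(topicName).setBatchingSettings(perMessageSettings).build();
        try {
            List<ApiFuture<String>> messageIdFutures = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
                        .setData(ByteString.copyFromUtf8("message " + i))
                        .build();
                messageIdFutures.add(publisher.publish(pubsubMessage));
            }
            // 10 messages published -> approximately 10 publish requests
            LOGGER.info("Published " + ApiFutures.allAsList(messageIdFutures).get().size() + " messages.");
        } finally {
            publisher.shutdown();
            publisher.awaitTermination(1, TimeUnit.MINUTES);
        }
    }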

Apache Beam and Dataflow sharding

Pipeline writing numbers 1 – 10000 as strings to Pubsub:

writingPipeline.apply(Create.of(IntStream.rangeClosed(1, 10000).mapToObj(String::valueOf).collect(Collectors.toList())))
    .apply(ParDo.of(new CreatePubsubMessageFn()))  // new PubsubMessage(word.getBytes())
    .apply(PubsubIO.writeMessages().to(writeOptions.getTopic()));
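
CreatePubsubMessageFn is not shown in the original; based on the inline comment, it is roughly a DoFn along these lines (a sketch assuming java.nio.charset.StandardCharsets and java.util.HashMap imports and an empty attribute map):

    static class CreatePubsubMessageFn extends DoFn<String, PubsubMessage> {
        @ProcessElement
        public void process(@Element String word, OutputReceiver<PubsubMessage> receiver) {
            // Beam's PubsubMessage wraps the payload bytes plus an attribute map
            receiver.output(new PubsubMessage(word.getBytes(StandardCharsets.UTF_8), new HashMap<>()));
        }
    }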

Streaming pipeline reading from Pubsub, applying a 5 sec fixed window, grouping the read elements into 4 groups (a: 1-9, b: 10-99, c: 100-999, d: 1000-10000) and writing them using FileIO into txt files with 4 shards:

readingPipeline.apply(PubsubIO.readMessagesWithAttributesAndMessageId().fromSubscription(readOptions.getSubscription()))
    .apply(Window.into(FixedWindows.of(Duration.standardSeconds(5))))
    .apply("Log Window", ParDo.of(new DoFn<PubsubMessage,  KV<String, String>>() {
        @ProcessElement
        public void process(@Element PubsubMessage e, OutputReceiver<KV<String, String>> receiver, @Timestamp Instant ts, BoundedWindow w, PaneInfo p) {
        int i = Integer.parseInt(new String(e.getPayload()));
        String key = i < 10 ? "a" : i < 100 ? "b" : i < 1000 ? "c" : "d";
        KV<String, String> kv = KV.of(key, String.valueOf(i));
        LOGGER.info("[{}][Window] {}:{},ts={},w={},p={}", ipAddressAndThread(), key, i, ts, windowToString(w), p);
        receiver.output(kv);
        }
    }))
    .apply(FileIO
        .<String, KV<String, String>>writeDynamic()
        .by(KV::getKey)
        .via(Contextful.fn(KV::getValue), TextIO.sink())
        .withDestinationCoder(StringUtf8Coder.of())
        .withNaming(subPath -> new MyFileNaming(subPath, ".txt"))
        .to(readOptions.getOutput())
        .withTempDirectory(readOptions.getTemp())
        .withNumShards(4)
    );

and

    static class MyFileNaming implements FileIO.Write.FileNaming {
        private final String subPath;
        private final String extension;

        public MyFileNaming(String subPath, String extension) {
            this.subPath = subPath;
            this.extension = extension;
        }

        @Override
        public String getFilename(BoundedWindow w, PaneInfo p, int numShards, int shardIndex, Compression compression) {
            String filename = String.format("%s-winMaxTs-%s-shard-%s-of-%s--%s-%s%s", subPath, w.maxTimestamp().toString().replace(":","_").replace(" ","_"), shardIndex, numShards, getLocalHostAddressSpaced(), currentThread().getName(), extension);
            return filename;
        }
    }
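
The helpers ipAddressAndThread, windowToString and getLocalHostAddressSpaced used above are not shown in the original; hypothetical implementations could look like this (assuming java.net.InetAddress and org.apache.beam.sdk.transforms.windowing.IntervalWindow imports):

    static String getLocalHostAddressSpaced() {
        try {
            // e.g. 10.128.0.112 -> 10_128_0_112, matching the file names below
            return InetAddress.getLocalHost().getHostAddress().replace(".", "_");
        } catch (UnknownHostException e) {
            return "unknown-host";
        }
    }

    static String ipAddressAndThread() {
        return getLocalHostAddressSpaced() + "-" + Thread.currentThread().getName();
    }

    static String windowToString(BoundedWindow w) {
        // FixedWindows produce IntervalWindow instances with a start and an end
        return w instanceof IntervalWindow
                ? ((IntervalWindow) w).start() + ".." + ((IntervalWindow) w).end()
                : w.toString();
    }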

This generated 16 shards, i.e. 4 shards for each key:

a-winMaxTs-2023-01-20T10_48_04.999Z-shard-0-of-4--10_128_0_112-Thread-17.txt
a-winMaxTs-2023-01-20T10_48_04.999Z-shard-1-of-4--10_128_0_112-Thread-16.txt
a-winMaxTs-2023-01-20T10_48_04.999Z-shard-2-of-4--10_128_0_112-Thread-16.txt
a-winMaxTs-2023-01-20T10_48_04.999Z-shard-3-of-4--10_128_0_112-Thread-16.txt
b-winMaxTs-2023-01-20T10_48_04.999Z-shard-0-of-4--10_128_0_112-Thread-17.txt
b-winMaxTs-2023-01-20T10_48_04.999Z-shard-1-of-4--10_128_0_112-Thread-16.txt
b-winMaxTs-2023-01-20T10_48_04.999Z-shard-2-of-4--10_128_0_112-Thread-16.txt
b-winMaxTs-2023-01-20T10_48_04.999Z-shard-3-of-4--10_128_0_112-Thread-17.txt
c-winMaxTs-2023-01-20T10_48_04.999Z-shard-0-of-4--10_128_0_112-Thread-16.txt
c-winMaxTs-2023-01-20T10_48_04.999Z-shard-1-of-4--10_128_0_112-Thread-17.txt
c-winMaxTs-2023-01-20T10_48_04.999Z-shard-2-of-4--10_128_0_112-Thread-16.txt
c-winMaxTs-2023-01-20T10_48_04.999Z-shard-3-of-4--10_128_0_112-Thread-17.txt
d-winMaxTs-2023-01-20T10_48_04.999Z-shard-0-of-4--10_128_0_112-Thread-16.txt
d-winMaxTs-2023-01-20T10_48_04.999Z-shard-1-of-4--10_128_0_112-Thread-18.txt
d-winMaxTs-2023-01-20T10_48_04.999Z-shard-2-of-4--10_128_0_112-Thread-17.txt
d-winMaxTs-2023-01-20T10_48_04.999Z-shard-3-of-4--10_128_0_112-Thread-16.txt

If the window were 2 secs instead of 5 secs, then we could have 4 shards for the
first window [2023-01-20T16_29_28.000, 2023-01-20T16_29_30.000) and 4 shards for the second window [2023-01-20T16_29_30.000, 2023-01-20T16_29_32.000); the only required pipeline change is shown after the listing, e.g.:

d-winMaxTs-2023-01-20T16_29_29.999Z-shard-0-of-4--10_128_0_29-Thread-17.txt
d-winMaxTs-2023-01-20T16_29_29.999Z-shard-1-of-4--10_128_0_29-Thread-17.txt
d-winMaxTs-2023-01-20T16_29_29.999Z-shard-2-of-4--10_128_0_29-Thread-17.txt
d-winMaxTs-2023-01-20T16_29_29.999Z-shard-3-of-4--10_128_0_29-Thread-16.txt
d-winMaxTs-2023-01-20T16_29_31.999Z-shard-0-of-4--10_128_0_29-Thread-18.txt
d-winMaxTs-2023-01-20T16_29_31.999Z-shard-1-of-4--10_128_0_29-Thread-18.txt
d-winMaxTs-2023-01-20T16_29_31.999Z-shard-2-of-4--10_128_0_29-Thread-18.txt
d-winMaxTs-2023-01-20T16_29_31.999Z-shard-3-of-4--10_128_0_28-Thread-16.txt
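
That change is just the fixed window duration; everything else in the pipeline stays as above:

    .apply(Window.into(FixedWindows.of(Duration.standardSeconds(2))))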

Coming back to the 5 sec window (which usually captures all elements in a single window),
when

.withNumShards(4)

is changed to:

.withNumShards(2)

2 shards are generated for each key for each window:

a-winMaxTs-2023-01-20T12_10_49.999Z-shard-0-of-2--10_128_0_16-Thread-19.txt
a-winMaxTs-2023-01-20T12_10_49.999Z-shard-1-of-2--10_128_0_15-Thread-18.txt
b-winMaxTs-2023-01-20T12_10_49.999Z-shard-0-of-2--10_128_0_15-Thread-16.txt
b-winMaxTs-2023-01-20T12_10_49.999Z-shard-1-of-2--10_128_0_16-Thread-19.txt
c-winMaxTs-2023-01-20T12_10_49.999Z-shard-0-of-2--10_128_0_15-Thread-18.txt
c-winMaxTs-2023-01-20T12_10_49.999Z-shard-1-of-2--10_128_0_16-Thread-16.txt
d-winMaxTs-2023-01-20T12_10_49.999Z-shard-0-of-2--10_128_0_16-Thread-19.txt
d-winMaxTs-2023-01-20T12_10_49.999Z-shard-1-of-2--10_128_0_16-Thread-16.txt

When

.withNumShards(4)

is changed to:

.withNumShards(0)

then the runner determines the number of shards per key (assuming Streaming Engine is used); in that case we get 20 shards (an additional 4 shards for the hottest key, d):

a-winMaxTs-2023-01-20T10_27_34.999Z-shard-0-of-4--10_128_0_7-Thread-17.txt
a-winMaxTs-2023-01-20T10_27_34.999Z-shard-1-of-4--10_128_0_7-Thread-16.txt
a-winMaxTs-2023-01-20T10_27_34.999Z-shard-2-of-4--10_128_0_7-Thread-16.txt
a-winMaxTs-2023-01-20T10_27_34.999Z-shard-3-of-4--10_128_0_7-Thread-17.txt
b-winMaxTs-2023-01-20T10_27_34.999Z-shard-0-of-4--10_128_0_7-Thread-16.txt
b-winMaxTs-2023-01-20T10_27_34.999Z-shard-1-of-4--10_128_0_7-Thread-17.txt
b-winMaxTs-2023-01-20T10_27_34.999Z-shard-2-of-4--10_128_0_7-Thread-17.txt
b-winMaxTs-2023-01-20T10_27_34.999Z-shard-3-of-4--10_128_0_7-Thread-16.txt
c-winMaxTs-2023-01-20T10_27_34.999Z-shard-0-of-4--10_128_0_7-Thread-17.txt
c-winMaxTs-2023-01-20T10_27_34.999Z-shard-1-of-4--10_128_0_7-Thread-17.txt
c-winMaxTs-2023-01-20T10_27_34.999Z-shard-2-of-4--10_128_0_7-Thread-17.txt
c-winMaxTs-2023-01-20T10_27_34.999Z-shard-3-of-4--10_128_0_7-Thread-16.txt
d-winMaxTs-2023-01-20T10_27_34.999Z-shard-0-of-4--10_128_0_7-Thread-17.txt
d-winMaxTs-2023-01-20T10_27_34.999Z-shard-1-of-4--10_128_0_7-Thread-16.txt
d-winMaxTs-2023-01-20T10_27_34.999Z-shard-2-of-4--10_128_0_7-Thread-17.txt
d-winMaxTs-2023-01-20T10_27_34.999Z-shard-3-of-4--10_128_0_7-Thread-16.txt
d-winMaxTs-2023-01-20T10_27_39.999Z-shard-0-of-4--10_128_0_7-Thread-19.txt
d-winMaxTs-2023-01-20T10_27_39.999Z-shard-1-of-4--10_128_0_6-Thread-16.txt
d-winMaxTs-2023-01-20T10_27_39.999Z-shard-2-of-4--10_128_0_7-Thread-19.txt
d-winMaxTs-2023-01-20T10_27_39.999Z-shard-3-of-4--10_128_0_6-Thread-16.txt

My Mac installation

System preferences:
trackpad -> first tab (point & click) -> secondary click -> choose: click in bottom right corner
trackpad -> second tab (scroll & zoom) -> scroll direction: natural -> uncheck
keyboard -> Use F1, F2, etc. keys as standard function keys

Intellij -> File -> Manage IDE settings -> Import Settings -> choose macOS keymap scheme (rename .zip.doc to .zip)
IntelliJIdea2022.2-keymapsschemes-settings.zip

Optionally
Intellij -> File -> Manage IDE settings -> Restore default settings

# manually install chrome -> settings -> language -> Preferred languages ("Websites will show content in your preferred languages, when possible") -> put
# your home country's language at the top

# change shell to bash from zsh

chsh -s /bin/bash

# restart terminal after
# [[ -f ~/.bashrc ]] && source ~/.bashrc
echo 'export BASH_SILENCE_DEPRECATION_WARNING=1' >> ~/.bashrc

# install brew in default arm64 arch

/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
echo 'eval "$(/opt/homebrew/bin/brew shellenv)"' >> ~/.bashrc
eval "$(/opt/homebrew/bin/brew shellenv)"

# or optionally uninstall it if you hit python/pip build errors (packages that do not support arm64):

/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/uninstall.sh)"

go to Applications/Utilities -> right-click Terminal -> Get Info -> check 'Open using Rosetta' and restart the computer. Then start Terminal again; running arch should return i386.
Then install brew for the i386/x64 architecture:

/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"

optionally if

Error: Failed to link all completions, docs and manpages:
  Permission denied @ rb_file_s_symlink - (../../../Homebrew/completions/zsh/_brew, /usr/local/share/zsh/site-functions/_brew)
Failed during: /usr/local/bin/brew update --force --quiet

execute:

sudo chown -R $(whoami) $(brew --prefix)/*

then

brew install git maven bash-completion curl coreutils
#ffmpeg gnutls terraform@0.12 youtube-dl python@3.9 docker docker-machine 
# docker-compose ca-certificates inetutils

#brew list --cask
# virtualbox

#brew info virtualbox --cask

# brew install a disabled homebrew package

brew edit terraform@0.12

#Comment or remove the line:
#disable! date: "2022-07-31", because: :unmaintained

brew install terraform@0.12
brew install pyenv pyenv-virtualenv
eval "$(pyenv init -)"
eval "$(pyenv virtualenv-init -)"

pyenv versions
* system (set by /Users/me/.pyenv/version)

pyenv install 3.8.14
#delete
#pyenv uninstall 3.8.14/envs/airflow-1.10.15-python-3.8.14
#pyenv uninstall 3.8.14

#create

pyenv virtualenv 3.8.14 airflow-1.10.15-python-3.8.14
pyenv activate airflow-1.10.15-python-3.8.14
AIRFLOW_VERSION=1.10.15
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
echo $PYTHON_VERSION
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
echo $CONSTRAINT_URL

# python3 -m pip install --upgrade pip==20.2.4

# WARNING: You are using pip version 22.0.4; however, version 22.3 is available.

(airflow-1.10.15-python-3.8.14) me@MacBook:~$ pip3 --use-deprecated legacy-resolver install "apache-airflow==${AIRFLOW_VERSION}" apache-airflow-backport-providers-apache-beam==2021.3.13 apache-airflow-backport-providers-google==2021.3.3 pytest --constraint "${CONSTRAINT_URL}"

#start

PATH=$PATH:~/.local/bin
airflow db init
nohup airflow webserver $* >> ~/airflow/logs/webserver.logs &
nohup airflow scheduler >> ~/airflow/logs/scheduler.logs &
cd ~/airflow
mkdir dags

#kill

ps -ef | grep bin/airflow  | grep -v grep | awk  '{ print $2 }' | xargs kill