Dataflow jobs can be started from Dataflow classic templates using:
1. HTTP POST request:
curl -H "X-Goog-User-Project: $GCP_PROJECT" \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -X POST \
  -H "Content-Type: application/json" \
  -d @job.json \
  "https://dataflow.googleapis.com/v1b3/projects/my-project/locations/us-central1/templates?alt=json"
where job.json is similar to:
{ "jobName": "mysimplejob-from-template3", "gcsPath": "gs://my-bucket/MySimpleJob/template/MySimpleJob-template", "environment": { "bypassTempDirValidation": false, "maxWorkers": 1, "numWorkers": 2, "workerRegion": "us-central1", "serviceAccountEmail": "my-service-account@my-project.iam.gserviceaccount.com", "machineType": "t2d-standard-2", "tempLocation": "gs://my-bucket/MySimpleJob/temp", "subnetwork": "https://www.googleapis.com/compute/v1/projects/my-project/regions/us-central1/subnetworks/my-subnetwork", "ipConfiguration": "WORKER_IP_PRIVATE", "enableStreamingEngine": false, "additionalExperiments": [ "shuffle_mode=appliance" ] }, "parameters": { "output": "gs://my-bucket/MySimpleJob/output", "seqEnd": "300", "startDate": "2024-03-03T00:00:00Z" } }
Alternatively, a job can be started from a template using gcloud or the GCP SDK client libraries.
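For example, starting a job from a classic template with the Dataflow Java client library (a minimal sketch, assuming the google-api-services-dataflow and google-auth-library dependencies are on the classpath; the project, region, bucket and job names are the sample values used in this document):

import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.json.gson.GsonFactory;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.CreateJobFromTemplateRequest;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.RuntimeEnvironment;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import java.util.List;
import java.util.Map;

public class LaunchFromClassicTemplate {
    public static void main(String[] args) throws Exception {
        GoogleCredentials credentials = GoogleCredentials.getApplicationDefault()
                .createScoped(List.of("https://www.googleapis.com/auth/cloud-platform"));
        Dataflow dataflow = new Dataflow.Builder(GoogleNetHttpTransport.newTrustedTransport(),
                GsonFactory.getDefaultInstance(), new HttpCredentialsAdapter(credentials))
                .setApplicationName("my-template-launcher") // hypothetical application name
                .build();
        CreateJobFromTemplateRequest request = new CreateJobFromTemplateRequest()
                .setJobName("mysimplejob-from-template")
                .setGcsPath("gs://my-bucket/MySimpleJob/template/MySimpleJob-template")
                .setEnvironment(new RuntimeEnvironment()   // same fields as the "environment" json block above
                        .setNumWorkers(2)
                        .setMaxWorkers(2)
                        .setMachineType("t2d-standard-4")
                        .setTempLocation("gs://my-bucket/MySimpleJob/temp"))
                .setParameters(Map.of(                     // same as the "parameters" json block above
                        "output", "gs://my-bucket/MySimpleJob/output",
                        "seqEnd", "200",
                        "startDate", "2024-02-02T00:00:00Z"));
        Job job = dataflow.projects().locations().templates()
                .create("my-project", "us-central1", request)
                .execute();
        System.out.println("Launched job: " + job.getId());
    }
}

The equivalent gcloud command is gcloud dataflow jobs run, which exposes the same overrides via flags such as --gcs-location and --parameters.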
The complete list of RuntimeEnvironment fields is available at https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
Only these RuntimeEnvironment fields (e.g. numWorkers, diskSizeGb, etc.) can be overridden with a new value at launch time, regardless of whether they were set when the template was created.
However, the RuntimeEnvironment field names do not necessarily match the method names defined by org.apache.beam.runners.dataflow.options.DataflowPipelineOptions, e.g.
RuntimeEnvironment: maxWorkers, machineType, ipConfiguration, etc.
vs
DataflowPipelineOptions methods: getMaxNumWorkers(), getWorkerMachineType(), getUsePublicIps()
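The mismatch is visible directly on the options interface; a minimal sketch (sample values, parsing only, no job submission) of the pipeline-option flags that correspond to those RuntimeEnvironment fields:

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class OptionNamesDemo {
    public static void main(String[] args) {
        // pipeline-option flag names follow the DataflowPipelineOptions getters,
        // not the RuntimeEnvironment field names used in the launch request
        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(
                "--maxNumWorkers=2",                  // RuntimeEnvironment: maxWorkers
                "--workerMachineType=t2d-standard-2", // RuntimeEnvironment: machineType
                "--usePublicIps=false"                // RuntimeEnvironment: ipConfiguration=WORKER_IP_PRIVATE
        ).as(DataflowPipelineOptions.class);
        System.out.println(options.getMaxNumWorkers());     // 2
        System.out.println(options.getWorkerMachineType()); // t2d-standard-2
        System.out.println(options.getUsePublicIps());      // false
    }
}

The display_data namespaces in the worker logs shown later reflect the same split: values from Beam options appear under DataflowPipelineWorkerPoolOptions, while launch-time overrides appear under google.dataflow.v1beta3.TemplatesService.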
at template creation:
"--numWorkers=1", "--maxNumWorkers=1", "--workerMachineType=t2d-standard-2", "--diskSizeGb=100", "--output=gs://" + BUCKET + "/" + JOB_NAME + "/template/output", "--templateLocation=gs://" + BUCKET + "/" + JOB_NAME + "/template/" + JOB_NAME + "-template", "--stagingLocation=gs://" + BUCKET + "/" + JOB_NAME + "/template/staging", "--seqEnd=100", "--startDate=2024-01-01T00:00:00Z", "--windowSecs=1"
at template execution:
{ "jobName": "mysimplejob-from-template2", "gcsPath": "gs://my-bucket/MySimpleJob/template/MySimpleJob-template", "environment": { "bypassTempDirValidation": false, "maxWorkers": 2, "numWorkers": 2, "workerRegion": "us-central1", "serviceAccountEmail": "my-service-account@my-project.iam.gserviceaccount.com", "machineType": "t2d-standard-4", "tempLocation": "gs://my-bucket/MySimpleJob/temp2", "subnetwork": "https://www.googleapis.com/compute/v1/projects/my-project/regions/us-central1/subnetworks/my-subnetwork", "ipConfiguration": "WORKER_IP_PRIVATE", "enableStreamingEngine": false, "diskSizeGb": 200, "additionalExperiments": [ "shuffle_mode=appliance" ] }, "parameters": { "output": "gs://my-bucket/MySimpleJob/output2", "seqEnd": "200", "startDate": "2024-02-02T00:00:00Z", "windowSecs": "2" } }
effective:
– 2 workers started, visible in the Dataflow Autoscaling tab (from the HTTP POST json maxWorkers and numWorkers)
– diskSizeGb is effectively 200, visible on the GCE VMs (from the HTTP POST json)
– machine type is effectively "t2d-standard-4", visible on the GCE VMs (from the HTTP POST json "machineType"), even though Job info/Resource metrics/workerMachineType shows t2d-standard-2, which is misleading
– output is effectively gs://my-bucket/MySimpleJob/template/output (it is not a RuntimeEnvironment field but a pipeline parameter, so the template value "--output=gs://" + BUCKET + "/" + JOB_NAME + "/template/output" wins)
– seqEnd is effectively 100, visible as the last log 'Processing: 100,2024-01-01T00:01:40.000Z'; the value provider parameter was already set in the template, so the template value wins
– startDate is effectively 2024-01-01T00:00:00Z, visible in the logs 'Processing: 0,2024-01-01T00:00:00.000Z'; the value provider parameter was already set in the template, so the template value wins
– windowSecs is effectively 1, since the produced file name spans minutes 00..01: 'gs://my-bucket/MySimpleJob/template/output/element/element-2024-01-01T00_00_00.000Z..2024-01-01T00_01_00.000Z-0-of-1.txt'; the parameter was already set in the template, so the template value wins, even though Job info/Resource metrics/windowSecs shows 2, which is misleading (see the sketch after this list)
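Why the template-creation values win here: a minimal sketch (simplified, runnable outside Dataflow; names match MyPipelineOptions) showing that a ValueProvider option given a value at graph-construction time becomes an accessible StaticValueProvider serialized into the template, while an unset one stays a RuntimeValueProvider resolved only from the launch-time parameters; a plain int option like windowSecs is read while building the graph, so its value is always fixed in the template:

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;

public class ValueProviderDemo {
    public interface Opts extends PipelineOptions {
        ValueProvider<Integer> getSeqEnd();
        void setSeqEnd(ValueProvider<Integer> seqEnd);
    }

    public static void main(String[] args) {
        // value supplied at construction -> StaticValueProvider, baked into the template graph
        Opts set = PipelineOptionsFactory.fromArgs("--seqEnd=100").as(Opts.class);
        System.out.println(set.getSeqEnd().isAccessible()); // true
        System.out.println(set.getSeqEnd().get());          // 100

        // value not supplied -> RuntimeValueProvider, resolved from launch-time "parameters";
        // calling get() here throws IllegalStateException (value only available at runtime)
        Opts unset = PipelineOptionsFactory.create().as(Opts.class);
        System.out.println(unset.getSeqEnd().isAccessible()); // false
    }
}

This also explains the misleading UI values: the launch-time parameters are recorded in the job's options, but the graph only reads the providers that were serialized at template creation.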
To summarize, check the worker logs for entries starting with "Worker harness" or "pipeline_options:" to verify the effective values, e.g.
"display_data": [ { "key": "output", "namespace": "com.bawi.beam.dataflow.MySimpleJob$MyPipelineOptions", "type": "STRING", "value": "gs://my-project/MySimpleJob/template/output/element" }, { "key": "diskSizeGb", "namespace": "org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions", "type": "INTEGER", "value": 200 },
vs
"options": { "output": "gs://my-project/MySimpleJob/output2", "diskSizeGb": 200,
Since output is not a RuntimeEnvironment field, the value from the template (shown in the display_data section) is effectively used, not the value in the options section.
But for the RuntimeEnvironment field diskSizeGb, the value from options is used (not the value from the template).
2. Template creation settings remain unchanged, but the HTTP POST request does not override numWorkers/maxWorkers, machine type, or any parameters:
at template execution:
{ "jobName": "mysimplejob-from-template2", "gcsPath": "gs://my-bucket/MySimpleJob/template/MySimpleJob-template", "environment": { "bypassTempDirValidation": false, "workerRegion": "us-central1", "serviceAccountEmail": "my-service-account@my-project.iam.gserviceaccount.com", "tempLocation": "gs://my-bucket/MySimpleJob/temp2", "subnetwork": "https://www.googleapis.com/compute/v1/projects/my-project/regions/us-central1/subnetworks/my-subnetwork", "ipConfiguration": "WORKER_IP_PRIVATE", "enableStreamingEngine": false, "additionalExperiments": [ "shuffle_mode=appliance" ] }, "parameters": {} }
so the values are taken from the template:
effective:
– 1 worker started, visible in the Dataflow Autoscaling tab
– diskSizeGb is effectively 100, visible on the GCE VMs
– machine type is effectively "t2d-standard-2", visible on the GCE VMs
and the parameters are taken from the template:
– output is effectively gs://my-bucket/MySimpleJob/template/output
– seqEnd is effectively 100
– startDate is effectively 2024-01-01T00:00:00Z
– windowSecs is effectively 1, since the produced file name spans minutes 00..01
3. Template values are minimal and the remaining ones are provided when starting the job from the template:
"--templateLocation=gs://" + BUCKET + "/" + JOB_NAME + "/template/" + JOB_NAME + "-template", "--stagingLocation=gs://" + BUCKET + "/" + JOB_NAME + "/template/staging", "--windowSecs=1"
{ "jobName": "mysimplejob-from-template2", "gcsPath": "gs://my-bucket/MySimpleJob/template/MySimpleJob-template", "environment": { "bypassTempDirValidation": false, "maxWorkers": 2, "numWorkers": 2, "workerRegion": "us-central1", "serviceAccountEmail": "my-service-account@my-project.iam.gserviceaccount.com", "machineType": "t2d-standard-4", "tempLocation": "gs://my-bucket/MySimpleJob/temp2", "subnetwork": "https://www.googleapis.com/compute/v1/projects/my-project/regions/us-central1/subnetworks/my-subnetwork", "ipConfiguration": "WORKER_IP_PRIVATE", "enableStreamingEngine": false, "diskSizeGb": 200, "additionalExperiments": [ "shuffle_mode=appliance" ] }, "parameters": { "output": "gs://my-bucket/MySimpleJob/output2", "seqEnd": "200", "startDate": "2024-02-02T00:00:00Z", "windowSecs": "2" } }
effective:
– 2 workers started, visible in the Dataflow Autoscaling tab (from the HTTP POST json maxWorkers and numWorkers)
– diskSizeGb is effectively 200, visible on the GCE VMs (from the HTTP POST json)
– machine type is effectively "t2d-standard-4", visible on the GCE VMs (from the HTTP POST json "machineType")
– output is effectively gs://my-bucket/MySimpleJob/output2/; the value provider parameter was not set in the template, so the HTTP POST json value is used
– seqEnd is effectively 200, visible as the last log 'Processing: 200,2024-02-02T00:03:20.000Z'; the value provider parameter was not set in the template, so the HTTP POST json value is used
– startDate is effectively 2024-02-02T00:00:00Z, visible in the logs 'Processing: 60,2024-02-02T00:01:00.000Z'; the value provider parameter was not set in the template, so the HTTP POST json value is used
– windowSecs is effectively 1, since the produced file name spans minutes 00..01: 'gs://my-bucket/MySimpleJob/output2/element-2024-02-02T00_00_00.000Z..2024-02-02T00_01_00.000Z-0-of-1.txt'; the parameter was already set in the template, so the template value wins, even though Job info/Resource metrics/windowSecs shows 2, which is misleading
Worker harness shows:
"display_data": [ { "key": "maxNumWorkers", "namespace": "google.dataflow.v1beta3.TemplatesService", "type": "INTEGER", "value": 2 }, { "key": "machineType", "namespace": "google.dataflow.v1beta3.TemplatesService", "type": "STRING", "value": "t2d-standard-4" }, { "key": "diskSizeGb", "namespace": "google.dataflow.v1beta3.TemplatesService", "type": "INTEGER", "value": 200 }, { "key": "windowSecs", "namespace": "com.bawi.beam.dataflow.MySimpleJob$MyPipelineOptions", "type": "INTEGER", "value": 1 }, ], "options": { "maxNumWorkers": 2, "machineType": "t2d-standard-4", "diskSizeGb": 200, "seqEnd": "200", "windowSecs": "2",
4. If neither the template nor the HTTP POST json defines numWorkers, maxWorkers, machineType, or diskSizeGb, then Dataflow uses its defaults, which for batch is 1 worker with n1-standard-1 and a 250 GB disk when Dataflow Shuffle is disabled.
The complete MySimpleJob pipeline used in the examples above:
package com.bawi.beam.dataflow;

import com.bawi.beam.WindowUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.*;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.beam.sdk.values.TypeDescriptors.integers;

public class MySimpleJob {
    private static final Logger LOGGER = LoggerFactory.getLogger(MySimpleJob.class);

    @SuppressWarnings("unused")
    public interface MyPipelineOptions extends PipelineOptions {
        ValueProvider<Integer> getSeqEnd();
        void setSeqEnd(ValueProvider<Integer> seqEnd);

        ValueProvider<String> getOutput();
        void setOutput(ValueProvider<String> output);

        ValueProvider<String> getTempDir();
        void setTempDir(ValueProvider<String> tempDir);

        ValueProvider<String> getStartDate();
        void setStartDate(ValueProvider<String> startDate);

        ValueProvider<Integer> getNumber();
        void setNumber(ValueProvider<Integer> number);

        int getWindowSecs();
        void setWindowSecs(int windowSecs);
    }

    public static void main(String[] args) {
        String JOB_NAME = MySimpleJob.class.getSimpleName();
        String BUCKET = System.getenv("GCP_BUCKET");

//        args = PipelineUtils.updateArgs(args,
//                "--output=target/" + JOB_NAME + "/output",
//                "--tempDir=target/" + JOB_NAME + "/temp", // used for TextIO.write().withWindowedWrites()

        args = PipelineUtils.updateArgsWithDataflowRunner(args,
                "--numWorkers=1",
                "--maxNumWorkers=1",
                "--workerMachineType=t2d-standard-2",
                "--usePublicIps=true",
                "--diskSizeGb=100",
                "--output=gs://" + BUCKET + "/" + JOB_NAME + "/template/output/element",
                "--templateLocation=gs://" + BUCKET + "/" + JOB_NAME + "/template/" + JOB_NAME + "-template",
                "--stagingLocation=gs://" + BUCKET + "/" + JOB_NAME + "/template/staging",
                "--seqEnd=100",
                "--startDate=2024-01-01T00:00:00Z",
                "--windowSecs=1"
        );
        MyPipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(MyPipelineOptions.class);
        Pipeline pipeline = Pipeline.create(options);

        NestedValueProvider<List<Integer>, Integer> seqProvider = NestedValueProvider.of(options.getSeqEnd(),
                seqEnd -> IntStream.rangeClosed(0, seqEnd).boxed().collect(Collectors.toList()));

        PCollection<String> converted = pipeline
                .apply(Create.ofProvider(seqProvider, ListCoder.of(VarIntCoder.of())))
                .apply(FlatMapElements.into(integers()).via(i -> i))
                .apply(ParDo.of(new MyDoFn(options.getStartDate())))
                .apply(WithTimestamps.of(line -> Instant.parse(line.split(",")[1])))
                .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSecs()))));

/*
        if (options.getNumber().get() == 1) { // java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=number, default=null}
            converted.apply(TextIO.write().withSuffix(".txt").to(options.getOutput()));
        }
*/

//        converted.apply(TextIO.write()
//                .withWindowedWrites()
//                .to(new MyFilenamePolicy(options.getOutput(), "element-", ".txt"))
//                .withTempDirectory(NestedValueProvider.of(options.getTempDir(), FileBasedSink::convertToFileResourceIfPossible)));

        converted.apply(FileIO.<String>write()
                .via(TextIO.sink())
                .to(options.getOutput())
                .withNaming(new MyFileNaming("element-", ".txt")));

        PipelineResult result = pipeline.run();
        if ("DirectPipelineResult".equals(result.getClass().getSimpleName())) {
            result.waitUntilFinish(); // usually waitUntilFinish while pipeline development, remove when generating dataflow classic template
        }
    }

    static class MyDoFn extends DoFn<Integer, String> {
        private final ValueProvider<String> startDate;

        public MyDoFn(ValueProvider<String> startDate) {
            this.startDate = startDate;
        }

        @ProcessElement
        public void process(@Element Integer i, OutputReceiver<String> receiver) throws InterruptedException {
            Instant instant = Instant.parse(startDate.get()).plus(Duration.standardSeconds(i));
            LOGGER.info("Processing: {}", i + "," + instant);
            receiver.output(i + "," + instant);
        }
    }

    private static class MyFilenamePolicy extends FileBasedSink.FilenamePolicy {
        private final ValueProvider<String> output;
        private final String prefix;
        private final String suffix;

        public MyFilenamePolicy(ValueProvider<String> output, String prefix, String suffix) {
            this.output = output;
            this.prefix = prefix;
            this.suffix = suffix;
        }

        @Override
        public ResourceId windowedFilename(int shardNumber, int numShards, BoundedWindow window, PaneInfo paneInfo,
                                           FileBasedSink.OutputFileHints outputFileHints) {
            ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(output.get());
            String parentDirectoryPathWithSlash = resource.isDirectory() ? "" : resource.getFilename() + "/";
            String filename = String.format("%s%s%s-%s-of-%s%s", parentDirectoryPathWithSlash, prefix,
                    WindowUtils.windowToNormalizedString(window), shardNumber, numShards, suffix);
            ResourceId resourceId = resource.getCurrentDirectory()
                    .resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
            System.out.println(filename + "," + resourceId);
            // output/element-2024-01-01T00_00_00.000Z..2024-01-01T00_02_00.000Z-3-of-5.txt,/Users/me/dev/my-apache-beam-dataflow/target/MySimpleJob/output/element-2024-01-01T00_00_00.000Z..2024-01-01T00_02_00.000Z-3-of-5.txt
            // "target/MySimpleJob/output/element-2024-01-01T00_00_00.000Z..2024-01-01T00_02_00.000Z-0-of-4.txt
            return resourceId;
        }

        @Override
        public ResourceId unwindowedFilename(int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) {
            throw new UnsupportedOperationException("Expecting windowed outputs only");
        }
    }

    private static class MyFileNaming implements FileIO.Write.FileNaming {
        private final String prefix;
        private final String suffix;

        public MyFileNaming(String prefix, String suffix) {
            this.prefix = prefix;
            this.suffix = suffix;
        }

        @Override
        public String getFilename(BoundedWindow window, PaneInfo pane, int numShards, int shardIndex, Compression compression) {
            String fileName = String.format("%s%s-%s-of-%s%s", prefix, windowToNormalizedString(window), shardIndex, numShards, suffix);
            LOGGER.info("fileName={}", fileName);
            // fileName=element-2024-01-01T00_00_00.000Z..2024-01-01T00_02_00.000Z-4-of-5.txt
            // writes to: target/MySimpleJob/output/element-2024-01-01T00_00_00.000Z..2024-01-01T00_02_00.000Z-0-of-5.txt
            return fileName;
        }

        private static String windowToNormalizedString(BoundedWindow window) {
            return window instanceof GlobalWindow
                    ? replace(window.maxTimestamp())
                    : replace(((IntervalWindow) window).start()) + ".." + replace(((IntervalWindow) window).end());
        }

        private static String replace(Instant instant) {
            return instant.toString().replace(":", "_").replace(" ", "_");
        }
    }
}