Currently, we are using the Google Dataproc 2.0 release (https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0), which comes with Apache Spark 3.1.3 and Java 8.
The goal is to have a local environment that matches the library installation of that specific Dataproc cluster as closely as possible, so that we can:
– run the same Spark Scala code locally and on Dataproc
– open the jar dependencies together with their Java sources
Let’s try to run the following application locally:
package com.bawi

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.slf4j.LoggerFactory

object MyAvroGcsReadAndBQWriteApp {
  private val LOGGER = LoggerFactory.getLogger(MyAvroGcsReadAndBQWriteApp.getClass)

  def main(args: Array[String]): Unit = {
    LOGGER.info("GOOGLE_APPLICATION_CREDENTIALS={}", System.getenv("GOOGLE_APPLICATION_CREDENTIALS"))
    val conf = new SparkConf().setAppName("MyAvroGcsReadAndBQWriteApp").setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    val inputDF = spark.read.format("avro")
//      .load("src/test/resources/my-record.snappy.avro")
      .load("gs://my-bucket-spark/my-record.snappy.avro")

    import spark.implicits._
    val df = inputDF.map((p: Row) => {
      val name = p.getAs[String]("name")
      val body = p.getAs[Array[Byte]]("body")
      LOGGER.info("processing {}", (name, new String(body)))
      (name, body)
    }).toDF(inputDF.columns: _*)

    df.write.format("bigquery")
      .option("writeMethod", "indirect")
      .option("temporaryGcsBucket", "my-bucket-spark")
      .mode(SaveMode.Append)
      .save("my_dataset.my_table")

    spark.stop()
  }
}
so we need to create a pom.xml defining Spark 3.1.3, JDK 1.8, Scala 2.12.14 and gcs-connector hadoop3-2.2.11.
Read through the pom comments to resolve issues such as:
– Failed to find data source: avro (spark-avro)
– org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme “gs” (gcs-connector)
– org.apache.hadoop.fs.FileSystem: Provider com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem could not be instantiated
or java.lang.ClassNotFoundException: com.google.protobuf.ExtensionLite (shaded classifier)
– java.io.IOException: Error getting access token from metadata server … Caused by: java.net.SocketTimeoutException: connect timed out (export GOOGLE_APPLICATION_CREDENTIALS)
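For the last issue, the credentials can be generated and exported before launching the app locally. A minimal sketch, assuming the gcloud CLI is installed and the default Application Default Credentials path on Linux/macOS:

```shell
# One-time setup: generate Application Default Credentials (opens a browser):
#   gcloud auth application-default login
# Then point the local Spark process at the generated file
# (the path below is the default ADC location on Linux/macOS):
export GOOGLE_APPLICATION_CREDENTIALS="$HOME/.config/gcloud/application_default_credentials.json"
```

An IDE run configuration needs the same variable set in its environment.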
Let’s log in to the Dataproc cluster to determine the main library versions:
me@bartek-dataproc-m:~$ find / -iname *scala*library*.jar -print 2>/dev/null
/usr/share/scala/lib/scala-library.jar
/usr/lib/spark/jars/scala-library-2.12.14.jar
me@bartek-dataproc-m:~$ find / -iname *spark*core*.jar -print 2>/dev/null
/usr/lib/spark/jars/spark-core_2.12-3.1.3.jar
me@bartek-dataproc-m:~$ find / -iname *spark*avro*.jar -print 2>/dev/null
/usr/lib/spark/external/spark-avro.jar
/usr/lib/spark/external/spark-avro_2.12-3.1.3.jar
me@bartek-dataproc-m:~$ find / -iname *connector*.jar -print 2>/dev/null
/usr/local/share/google/dataproc/lib/gcs-connector-hadoop3-2.2.11.jar
/usr/local/share/google/dataproc/lib/gcs-connector.jar
me@bartek-dataproc-m:~$ ls -la /usr/local/share/google/dataproc/lib/gcs-connector.jar
lrwxrwxrwx 1 root root 69 Mar 1 00:32 /usr/local/share/google/dataproc/lib/gcs-connector.jar -> /usr/local/share/google/dataproc/lib/gcs-connector-hadoop3-2.2.11.jar
me@bartek-dataproc-m:~$ find / -iname *grpc*.jar -print 2>/dev/null
me@bartek-dataproc-m:~$ find / -iname *protobuf*.jar -print 2>/dev/null
/usr/lib/spark/jars/protobuf-java-2.5.0.jar
/usr/lib/hadoop/lib/protobuf-java-2.5.0.jar
/usr/lib/hadoop-hdfs/lib/protobuf-java-2.5.0.jar
Note that some of the jars on Dataproc may be uberjars that repackage/shade some of their dependencies, e.g.
me@bartek-dataproc-m:~$ ls -la /usr/local/share/google/dataproc/lib/gcs-connector-hadoop3-2.2.11.jar
-rw-r--r-- 1 root root 36497606 Mar 1 00:32 /usr/local/share/google/dataproc/lib/gcs-connector-hadoop3-2.2.11.jar
me@bartek-dataproc-m:~$ jar tf /usr/local/share/google/dataproc/lib/gcs-connector-hadoop3-2.2.11.jar | grep repackaged | wc -l
15256
me@bartek-dataproc-m:~$ jar tf /usr/local/share/google/dataproc/lib/gcs-connector-hadoop3-2.2.11.jar | grep ExtensionLite
com/google/cloud/hadoop/repackaged/gcs/com/google/protobuf/ExtensionLite.class
and, based on that, define the pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.bawi</groupId>
    <artifactId>my-apache-spark-3-scala</artifactId>
    <version>0.1-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- jar versions matching https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0, image-version 2.0.58-debian10 -->
        <spark.version>3.1.3</spark.version>
        <java.version>1.8</java.version>
        <scala.major.version>2.12</scala.major.version>
        <scala.minor.version>14</scala.minor.version>
        <gcs-connector.version>hadoop3-2.2.11</gcs-connector.version> <!-- up to hadoop3-2.2.4 no dependency on com.google.protobuf.ExtensionLite -->
        <!-- use gcloud dataproc jobs submit spark:
             - -properties ^#^spark.jars.packages=org.apache.spark:spark-avro_2.12:3.1.3,com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.29.0#spark.dynamicAllocation.enabled=true ... -->
        <dataproc.provided.dependencies.scope>compile</dataproc.provided.dependencies.scope>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.major.version}.${scala.minor.version}</version>
            <scope>${dataproc.provided.dependencies.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.major.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${dataproc.provided.dependencies.scope}</scope>
        </dependency>
        <!-- org.apache.spark:spark-sql_2.12:jar:3.1.3 depends transitively on old com.google.protobuf:protobuf-java:jar:2.5.0 (does not depend on io-grpc) -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.major.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${dataproc.provided.dependencies.scope}</scope>
        </dependency>
        <!-- read/write avro files, registers avro format -->
        <!-- otherwise: Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: avro.
             Avro is built-in but external data source module since Spark 2.4.
             Please deploy the application as per the deployment section of "Apache Avro Data Source Guide". -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_${scala.major.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${dataproc.provided.dependencies.scope}</scope>
        </dependency>
        <!-- com.google.cloud.spark.bigquery.SupportedCustomDataType imports org.apache.spark.ml.linalg.SQLDataTypes -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.major.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${dataproc.provided.dependencies.scope}</scope>
        </dependency>
    </dependencies>

    <profiles>
        <profile>
            <!-- should be disabled when using local-with-approximate-dependencies-and-source-code -->
            <id>local-with-dataproc-matching-shaded-dependencies</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <properties>
                <dataproc.provided.dependencies.scope>compile</dataproc.provided.dependencies.scope>
            </properties>
            <!-- use gcloud dataproc jobs submit spark:
                 - -properties ^#^spark.jars.packages=org.apache.spark:spark-avro_2.12:3.1.3,com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.29.0#spark.dynamicAllocation.enabled=true ... -->
            <dependencies>
                <!-- read/write GCS: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem extends FileSystem and registers the gs scheme -->
                <!-- otherwise: Exception in thread "main" org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "gs"
                       at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3281) -->
                <!-- make sure to set environment variable GOOGLE_APPLICATION_CREDENTIALS=/path/to/application_default_credentials.json
                     as explained in https://cloud.google.com/docs/authentication/application-default-credentials
                     to avoid waiting for a connection timeout:
                     Exception in thread "main" java.io.IOException: Error getting access token from metadata server at: http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token
                       at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromMetadataServiceAccount(CredentialFactory.java:254)
                       at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getCredential(GoogleHadoopFileSystemBase.java:1344)
                       at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3303)
                     Caused by: java.net.SocketTimeoutException: connect timed out
                       at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1012)
                       at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromMetadataServiceAccount(CredentialFactory.java:251) -->
                <!-- use the shaded classifier to download the gcs-connector-hadoop3-2.2.11-shaded.jar uberjar with some repackaged/shaded dependencies
                     (e.g. com.google.protobuf.ExtensionLite -> com.google.cloud.hadoop.repackaged.gcs.com.google.protobuf.ExtensionLite)
                     instead of gcs-connector-hadoop3-2.2.11.jar and gcs-connector-hadoop3-2.2.11-sources.jar, otherwise:
                     FileSystem:3231 - Cannot load filesystem: java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem: Provider com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem could not be instantiated
                     FileSystem:3235 - java.lang.NoClassDefFoundError: com/google/protobuf/ExtensionLite
                     FileSystem:3235 - java.lang.ClassNotFoundException: com.google.protobuf.ExtensionLite -->
                <!-- to see conflicts, comment out the shaded classifier and run: mvn dependency:tree -Dverbose
                     org.apache.spark:spark-sql_2.12:jar:3.1.3 transitively brings old com.google.protobuf:protobuf-java:jar:2.5.0
                     so that new com.google.protobuf:protobuf-java:jar:3.21.7 from com.google.cloud.bigdataoss:gcs-connector:jar:hadoop3-2.2.11
                     and new com.google.protobuf:protobuf-java:jar:3.21.12 from com.google.cloud.spark:spark-3.1-bigquery:jar:0.29.0-preview are omitted
                     com.google.cloud.bigdataoss:gcs-connector:jar:hadoop3-2.2.11 transitively brings io.grpc:grpc-..:jar:1.50.2
                     com.google.cloud.spark:spark-3.1-bigquery:jar:0.29.0-preview depends on io.grpc: 1.52.1 and 1.53.0 -->
                <dependency>
                    <groupId>com.google.cloud.bigdataoss</groupId>
                    <artifactId>gcs-connector</artifactId>
                    <version>${gcs-connector.version}</version>
                    <scope>${dataproc.provided.dependencies.scope}</scope>
                    <classifier>shaded</classifier> <!-- uberjar with repackaged dependencies to avoid conflicts -->
                </dependency>
                <!-- use gcloud dataproc jobs submit spark: - -properties spark.jars.packages=org.apache.spark:spark-avro_2.12:3.1.3 -->
                <dependency>
                    <groupId>com.google.cloud.spark</groupId>
                    <artifactId>spark-bigquery-with-dependencies_2.12</artifactId>
                    <version>0.29.0</version>
                    <scope>${dataproc.provided.dependencies.scope}</scope>
                </dependency>
            </dependencies>
        </profile>
        <profile>
            <!-- should be disabled when using local-with-dataproc-matching-shaded-dependencies -->
            <id>local-with-approximate-dependencies-and-source-code</id>
            <properties>
                <dataproc.provided.dependencies.scope>compile</dataproc.provided.dependencies.scope>
                <!-- choosing latest grpc and protobuf-java among gcs-connector and spark-3.1-bigquery -->
                <grpc.version>1.53.0</grpc.version>
                <protobuf-java.version>3.21.12</protobuf-java.version>
            </properties>
            <dependencies>
                <!-- com.google.cloud.bigdataoss:gcs-connector:jar:hadoop3-2.2.11 (without shaded classifier) depends transitively on
                     com.google.protobuf:protobuf-java-(util):jar:3.21.9 and io.grpc:grpc-..:jar:1.50.2
                     BUT com.google.protobuf:protobuf-java:jar:3.21.9 is OMITTED due to conflict with 2.5.0 from spark_sql -->
                <dependency>
                    <groupId>com.google.cloud.bigdataoss</groupId>
                    <artifactId>gcs-connector</artifactId>
                    <version>${gcs-connector.version}</version>
                    <scope>${dataproc.provided.dependencies.scope}</scope>
                </dependency>
                <!-- com.google.cloud.spark:spark-3.1-bigquery:jar:0.29.0-preview depends on io.grpc: 1.52.1 and 1.53.0,
                     e.g. io.grpc:grpc-context:jar:1.52.1 and io.grpc:grpc-api:jar:1.53.0:compile, and com.google.protobuf:protobuf-java-(util):jar:3.21.12
                     BUT com.google.protobuf:protobuf-java:jar:3.21.12 is OMITTED due to conflict with 2.5.0 from spark_sql -->
                <dependency>
                    <groupId>com.google.cloud.spark</groupId>
                    <artifactId>spark-3.1-bigquery</artifactId>
                    <version>0.29.0-preview</version>
                    <scope>${dataproc.provided.dependencies.scope}</scope>
                </dependency>
            </dependencies>
            <!-- Could not resolve version conflict for io.grpc:grpc-core:jar:[1.53.0,1.53.0] and io.grpc:grpc-core:jar:[1.50.2,1.50.2]
                 due to transitive grpc netty specifying an explicit single version (in square brackets) of grpc-core and grpc-api:
                 io.grpc:grpc-netty:jar:1.53.0 (from spark-3.1-bigquery) -> io.grpc:grpc-core:jar:[1.53.0,1.53.0]
                 io.grpc:grpc-netty-shaded:jar:1.50.2 (from gcs-connector) -> io.grpc:grpc-core:jar:[1.50.2,1.50.2] -->
            <dependencyManagement>
                <dependencies>
                    <dependency>
                        <groupId>io.grpc</groupId>
                        <artifactId>grpc-core</artifactId>
                        <version>${grpc.version}</version>
                        <scope>${dataproc.provided.dependencies.scope}</scope>
                    </dependency>
                    <dependency>
                        <groupId>io.grpc</groupId>
                        <artifactId>grpc-api</artifactId>
                        <version>${grpc.version}</version>
                        <scope>${dataproc.provided.dependencies.scope}</scope>
                    </dependency>
                    <dependency>
                        <groupId>com.google.protobuf</groupId>
                        <artifactId>protobuf-java</artifactId>
                        <version>${protobuf-java.version}</version>
                        <scope>${dataproc.provided.dependencies.scope}</scope>
                    </dependency>
                </dependencies>
            </dependencyManagement>
        </profile>
        <profile>
            <id>dist</id>
            <properties>
                <dataproc.provided.dependencies.scope>provided</dataproc.provided.dependencies.scope>
            </properties>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-shade-plugin</artifactId>
                        <version>3.4.1</version>
                        <executions>
                            <execution>
                                <phase>package</phase>
                                <goals>
                                    <goal>shade</goal>
                                </goals>
                                <configuration>
                                    <createDependencyReducedPom>false</createDependencyReducedPom>
                                    <artifactSet>
                                        <excludes>
                                            <exclude>org.scala-lang:*</exclude>
                                        </excludes>
                                    </artifactSet>
                                </configuration>
                            </execution>
                        </executions>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </plugin>
                </plugins>
            </build>
        </profile>
    </profiles>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.11.0</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.8.1</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>3.3.0</version>
                <executions>
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/main/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
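With the pom in place, a typical build-and-submit round trip might look as follows. This is a sketch only: the cluster name, region and jar name are assumptions, and the --properties value mirrors the spark.jars.packages comment in the pom above:

```shell
# Build a thin application jar; the dist profile switches Dataproc-provided
# dependencies to scope=provided so they are not bundled into the jar
mvn clean package -Pdist

# Submit to Dataproc; spark.jars.packages fetches spark-avro and the BigQuery
# connector at runtime (my-cluster / us-central1 are placeholder values)
gcloud dataproc jobs submit spark \
  --cluster=my-cluster \
  --region=us-central1 \
  --class=com.bawi.MyAvroGcsReadAndBQWriteApp \
  --jars=target/my-apache-spark-3-scala-0.1-SNAPSHOT.jar \
  --properties='^#^spark.jars.packages=org.apache.spark:spark-avro_2.12:3.1.3,com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.29.0#spark.dynamicAllocation.enabled=true'
```

The `^#^` prefix tells gcloud to use `#` instead of `,` as the properties delimiter, since the spark.jars.packages value itself contains commas.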
To learn more about the dependency conflicts, comment out the shaded classifier of gcs-connector and run
mvn dependency:tree -Dverbose
[INFO] +- org.apache.spark:spark-sql_2.12:jar:3.1.3:compile
[INFO] | +- org.apache.orc:orc-core:jar:1.5.13:compile
[INFO] | | +- com.google.protobuf:protobuf-java:jar:2.5.0:compile
[INFO] \- com.google.cloud.bigdataoss:gcs-connector:jar:hadoop3-2.2.11:compile
[INFO]    +- com.google.cloud.bigdataoss:util:jar:2.2.11:compile
[INFO]    | +- io.grpc:grpc-api:jar:1.50.2:compile
[INFO]    | +- io.grpc:grpc-alts:jar:1.50.2:compile
[INFO]    | | +- io.grpc:grpc-grpclb:jar:1.50.2:compile
[INFO]    | | | +- (com.google.protobuf:protobuf-java:jar:3.21.7:runtime - omitted for conflict with 2.5.0)
[INFO]    | | +- (com.google.protobuf:protobuf-java:jar:3.21.7:compile - omitted for conflict with 2.5.0)
[INFO] \- com.google.cloud.bigdataoss:gcs-connector:jar:hadoop3-2.2.11:compile
[INFO]    +- com.google.api-client:google-api-client-jackson2:jar:2.0.1:compile
[INFO]    | \- com.google.http-client:google-http-client:jar:1.42.3:compile
[INFO]    |    +- io.opencensus:opencensus-api:jar:0.31.1:compile
[INFO]    |    | \- (io.grpc:grpc-context:jar:1.27.2:compile - omitted for conflict with 1.50.2)
[INFO] \- com.google.cloud.bigdataoss:gcs-connector:jar:hadoop3-2.2.11:compile
[INFO]    +- com.google.cloud.bigdataoss:gcsio:jar:2.2.11:compile
[INFO]    | +- io.grpc:grpc-context:jar:1.50.2:compile
You will note that org.apache.spark:spark-sql_2.12:jar:3.1.3 brings in com.google.protobuf:protobuf-java:jar:2.5.0 (the winner, as it has the shortest path), while com.google.cloud.bigdataoss:gcs-connector:jar:hadoop3-2.2.11 brings in com.google.protobuf:protobuf-java:jar:3.21.7, which is omitted due to the conflict with 2.5.0. As ExtensionLite is not available in protobuf-java 2.5.0 (but is present in 3.21.7), you will get an exception when initializing GoogleHadoopFileSystem: it extends GoogleHadoopFileSystemBase, which statically imports GoogleHadoopFileSystemConfiguration, which imports GoogleCloudStorageOptions, which imports com.google.storage.v2.StorageProto, which imports com.google.protobuf.GeneratedMessageV3, a class not present in the old protobuf-java 2.5.0.
The same conflict exists between io.grpc:grpc-context:jar:1.27.2 and io.grpc:grpc-context:jar:1.50.2.
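To double-check on which side of such a conflict a class actually lives, you can list the class entries of the resolved jars in the local Maven repository. A quick probe, assuming the default ~/.m2 layout and that both protobuf-java versions have already been downloaded:

```shell
# ExtensionLite is absent from protobuf-java 2.5.0 ...
unzip -l "$HOME/.m2/repository/com/google/protobuf/protobuf-java/2.5.0/protobuf-java-2.5.0.jar" \
  | grep ExtensionLite || echo "ExtensionLite not found in 2.5.0"
# ... but present in protobuf-java 3.21.7
unzip -l "$HOME/.m2/repository/com/google/protobuf/protobuf-java/3.21.7/protobuf-java-3.21.7.jar" \
  | grep ExtensionLite || echo "protobuf-java-3.21.7.jar not downloaded yet?"
```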