
Managing Maven dependency conflicts: an Apache Spark and Google Dataproc example

Currently we are using the Google Dataproc 2.0 release https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0
which ships with Apache Spark 3.1.3 and Java 8.

The goal is to have a local environment that matches the libraries installed on that specific Dataproc cluster as closely as possible:
– run the same Spark Scala code locally and on Dataproc
– open the jar dependencies with the Java/Scala sources attached (see the command below)
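
For the second point, once the pom.xml defined below is in place, Maven can also download the source jars of all resolved dependencies (your IDE may do this automatically):

mvn dependency:sources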

Let’s try to run the following application locally:

package com.bawi

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.slf4j.LoggerFactory

object MyAvroGcsReadAndBQWriteApp {
  private val LOGGER = LoggerFactory.getLogger(MyAvroGcsReadAndBQWriteApp.getClass)

  def main(args: Array[String]): Unit = {
    LOGGER.info("GOOGLE_APPLICATION_CREDENTIALS={}", System.getenv("GOOGLE_APPLICATION_CREDENTIALS"))

    val conf = new SparkConf().setAppName("MyAvroGcsReadAndBQWriteApp").setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()
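    // read Avro records; the "avro" format is registered by spark-avro, and gs:// paths need the gcs-connector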
    val inputDF = spark.read.format("avro")
//      .load("src/test/resources/my-record.snappy.avro")
      .load("gs://my-bucket-spark/my-record.snappy.avro")

    import spark.implicits._
    val df = inputDF.map((p: Row) => {
      val name = p.getAs[String]("name")
      val body = p.getAs[Array[Byte]]("body")
      LOGGER.info("processing {}", (name, new String(body)))
      (name, body)
    }).toDF(inputDF.columns: _*)

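    // write to BigQuery using the indirect method: data is first staged as files in the temporary GCS bucket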
    df.write.format("bigquery")
      .option("writeMethod", "indirect")
      .option("temporaryGcsBucket", "my-bucket-spark")
      .mode(SaveMode.Append)
      .save("my_dataset.my_table")
    spark.stop()
  }
}
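
Once the pom.xml below is in place, one way to launch the application locally is via the exec-maven-plugin (invoked by plugin prefix, no pom entry needed):

mvn compile exec:java -Dexec.mainClass=com.bawi.MyAvroGcsReadAndBQWriteApp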

To do that we need to create a pom.xml pinning Spark 3.1.3, JDK 1.8, Scala 2.12.14 and gcs-connector hadoop3-2.2.11.
Read through the pom comments to resolve issues such as:
– Failed to find data source: avro (spark-avro)
– org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "gs" (gcs-connector)
– org.apache.hadoop.fs.FileSystem: Provider com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem could not be instantiated
or java.lang.ClassNotFoundException: com.google.protobuf.ExtensionLite (shaded classifier)
– java.io.IOException: Error getting access token from metadata server … Caused by: java.net.SocketTimeoutException: connect timed out (export GOOGLE_APPLICATION_CREDENTIALS)
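
For the last one, authenticate locally and point GOOGLE_APPLICATION_CREDENTIALS at the generated application default credentials file (the path below is the usual gcloud default and may differ per machine):

gcloud auth application-default login
export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.config/gcloud/application_default_credentials.json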

Let’s log in to the Dataproc cluster to determine the main library versions:

me@bartek-dataproc-m:~$ find / -iname *scala*library*.jar -print 2>/dev/null
/usr/share/scala/lib/scala-library.jar
/usr/lib/spark/jars/scala-library-2.12.14.jar

me@bartek-dataproc-m:~$ find / -iname *spark*core*.jar -print 2>/dev/null
/usr/lib/spark/jars/spark-core_2.12-3.1.3.jar

me@bartek-dataproc-m:~$ find / -iname *spark*avro*.jar -print 2>/dev/null
/usr/lib/spark/external/spark-avro.jar
/usr/lib/spark/external/spark-avro_2.12-3.1.3.jar

me@bartek-dataproc-m:~$ find / -iname *connector*.jar -print 2>/dev/null
/usr/local/share/google/dataproc/lib/gcs-connector-hadoop3-2.2.11.jar
/usr/local/share/google/dataproc/lib/gcs-connector.jar

me@bartek-dataproc-m:~$ ls -la /usr/local/share/google/dataproc/lib/gcs-connector.jar
lrwxrwxrwx 1 root root 69 Mar  1 00:32 /usr/local/share/google/dataproc/lib/gcs-connector.jar -> /usr/local/share/google/dataproc/lib/gcs-connector-hadoop3-2.2.11.jar

me@bartek-dataproc-m:~$ find / -iname *grpc*.jar -print 2>/dev/null

me@bartek-dataproc-m:~$ find / -iname *protobuf*.jar -print 2>/dev/null
/usr/lib/spark/jars/protobuf-java-2.5.0.jar
/usr/lib/hadoop/lib/protobuf-java-2.5.0.jar
/usr/lib/hadoop-hdfs/lib/protobuf-java-2.5.0.jar

Note that some of the jars on Dataproc may be uberjars that repackage/shade some of their dependencies, e.g.

me@bartek-dataproc-m:~$ ls -la /usr/local/share/google/dataproc/lib/gcs-connector-hadoop3-2.2.11.jar
-rw-r--r-- 1 root root 36497606 Mar  1 00:32 /usr/local/share/google/dataproc/lib/gcs-connector-hadoop3-2.2.11.jar
me@bartek-dataproc-m:~$ jar tf /usr/local/share/google/dataproc/lib/gcs-connector-hadoop3-2.2.11.jar | grep repackaged | wc -l
15256
me@bartek-dataproc-m:~$ jar tf /usr/local/share/google/dataproc/lib/gcs-connector-hadoop3-2.2.11.jar | grep ExtensionLite     
com/google/cloud/hadoop/repackaged/gcs/com/google/protobuf/ExtensionLite.class
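
The same check can be run locally against the jar downloaded by Maven (assuming the default ~/.m2 repository location):

jar tf ~/.m2/repository/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.2.11/gcs-connector-hadoop3-2.2.11-shaded.jar | grep ExtensionLite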

Based on that we define the pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.bawi</groupId>
    <artifactId>my-apache-spark-3-scala</artifactId>
    <version>0.1-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

        <!-- jar versions matching https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0, image-version 2.0.58-debian10 -->
        <spark.version>3.1.3</spark.version>
        <java.version>1.8</java.version>
        <scala.major.version>2.12</scala.major.version>
        <scala.minor.version>14</scala.minor.version>
        <gcs-connector.version>hadoop3-2.2.11</gcs-connector.version> <!-- up to hadoop3-2.2.4 there was no dependency on com.google.protobuf.ExtensionLite -->

        <!-- use gcloud dataproc jobs submit spark: - -properties ^#^spark.jars.packages=org.apache.spark:spark-avro_2.12:3.1.3,com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.29.0#spark.dynamicAllocation.enabled=true ... -->
        <dataproc.provided.dependencies.scope>compile</dataproc.provided.dependencies.scope>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.major.version}.${scala.minor.version}</version>
            <scope>${dataproc.provided.dependencies.scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.major.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${dataproc.provided.dependencies.scope}</scope>
        </dependency>

        <!-- org.apache.spark:spark-sql_2.12:jar:3.1.3 depends transitively on old com.google.protobuf:protobuf-java:jar:2.5.0 (does not depend on io-grpc) -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.major.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${dataproc.provided.dependencies.scope}</scope>
        </dependency>

        <!-- read/write avro files, registers avro format -->
        <!-- otherwise: Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: avro. Avro is built-in but external data source module since Spark 2.4. Please deploy the application as per the deployment section of "Apache Avro Data Source Guide". -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_${scala.major.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${dataproc.provided.dependencies.scope}</scope>
        </dependency>

        <!-- com.google.cloud.spark.bigquery.SupportedCustomDataType imports org.apache.spark.ml.linalg.SQLDataTypes -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.major.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${dataproc.provided.dependencies.scope}</scope>
        </dependency>
    </dependencies>

    <profiles>
        <profile>
            <!-- should be disabled when using local-with-approximate-dependencies-and-source-code -->
            <id>local-with-dataproc-matching-shaded-dependencies</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <properties>
                <dataproc.provided.dependencies.scope>compile</dataproc.provided.dependencies.scope>
            </properties>

            <!-- use gcloud dataproc jobs submit spark: - -properties ^#^spark.jars.packages=org.apache.spark:spark-avro_2.12:3.1.3,com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.29.0#spark.dynamicAllocation.enabled=true ... -->
            <dependencies>
                <!-- read/write GCS: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem extends FileSystem and registers the gs scheme -->
                <!-- otherwise: Exception in thread "main" org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "gs"
                    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3281) -->

                <!-- make sure to set environment variable GOOGLE_APPLICATION_CREDENTIALS=/path/to/application_default_credentials.json as explained in
                https://cloud.google.com/docs/authentication/application-default-credentials
                to avoid waiting with connection timeout:
                    Exception in thread "main" java.io.IOException: Error getting access token from metadata server at: http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token
                        at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromMetadataServiceAccount(CredentialFactory.java:254)
                        at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getCredential(GoogleHadoopFileSystemBase.java:1344)
                        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3303)
                    Caused by: java.net.SocketTimeoutException: connect timed out
                        at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1012)
                        at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromMetadataServiceAccount(CredentialFactory.java:251)
                -->
                <!-- use the shaded classifier to download gcs-connector-hadoop3-2.2.11-shaded.jar,
                an uberjar with some dependencies repackaged/shaded (e.g. com.google.protobuf.ExtensionLite -> com.google.cloud.hadoop.repackaged.gcs.com.google.protobuf.ExtensionLite),
                instead of gcs-connector-hadoop3-2.2.11.jar and gcs-connector-hadoop3-2.2.11-sources.jar,
                otherwise:
                    FileSystem:3231 - Cannot load filesystem: java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem: Provider com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem could not be instantiated
                    FileSystem:3235 - java.lang.NoClassDefFoundError: com/google/protobuf/ExtensionLite
                    FileSystem:3235 - java.lang.ClassNotFoundException: com.google.protobuf.ExtensionLite
                -->
                <!-- to see the conflicts, comment out the shaded classifier and run: mvn dependency:tree -Dverbose

                org.apache.spark:spark-sql_2.12:jar:3.1.3 transitively brings old com.google.protobuf:protobuf-java:jar:2.5.0
                so that new com.google.protobuf:protobuf-java:jar:3.21.7 from com.google.cloud.bigdataoss:gcs-connector:jar:hadoop3-2.2.11
                and new com.google.protobuf:protobuf-java:jar:3.21.12 from com.google.cloud.spark:spark-3.1-bigquery:jar:0.29.0-preview are omitted

                com.google.cloud.bigdataoss:gcs-connector:jar:hadoop3-2.2.11 transitively brings io.grpc:grpc-..:jar:1.50.2
                com.google.cloud.spark:spark-3.1-bigquery:jar:0.29.0-preview depends on io.grpc: 1.52.1 and 1.53.0
                 -->
                <dependency>
                    <groupId>com.google.cloud.bigdataoss</groupId>
                    <artifactId>gcs-connector</artifactId>
                    <version>${gcs-connector.version}</version>
                    <scope>${dataproc.provided.dependencies.scope}</scope>
                    <classifier>shaded</classifier>
                </dependency>

                <!-- uberjar with repackaged dependencies to avoid conflicts -->
                <!-- use gcloud dataproc jobs submit spark: - -properties spark.jars.packages=org.apache.spark:spark-avro_2.12:3.1.3 -->
                <dependency>
                    <groupId>com.google.cloud.spark</groupId>
                    <artifactId>spark-bigquery-with-dependencies_2.12</artifactId>
                    <version>0.29.0</version>
                    <scope>${dataproc.provided.dependencies.scope}</scope>
                </dependency>
            </dependencies>
        </profile>

        <profile>
            <!-- should be disabled when using local-with-dataproc-matching-shaded-dependencies -->
            <id>local-with-approximate-dependencies-and-source-code</id>
            <properties>
                <dataproc.provided.dependencies.scope>compile</dataproc.provided.dependencies.scope>
                <!-- choosing latest grpc and protobuf-java among gcs-connector and spark-3.1-bigquery -->
                <grpc.version>1.53.0</grpc.version>
                <protobuf-java.version>3.21.12</protobuf-java.version>
            </properties>
            <dependencies>

                <!-- com.google.cloud.bigdataoss:gcs-connector:jar:hadoop3-2.2.11 (without shaded classifier)
                  depends transitively on com.google.protobuf:protobuf-java-(util):jar:3.21.9 and io.grpc:grpc-..:jar:1.50.2
                  BUT com.google.protobuf:protobuf-java:jar:3.21.9 is OMITTED due to conflict with 2.5.0 from spark_sql
                  -->
                <dependency>
                    <groupId>com.google.cloud.bigdataoss</groupId>
                    <artifactId>gcs-connector</artifactId>
                    <version>${gcs-connector.version}</version>
                    <scope>${dataproc.provided.dependencies.scope}</scope>
                </dependency>

                <!-- com.google.cloud.spark:spark-3.1-bigquery:jar:0.29.0-preview depends on io.grpc: 1.52.1 and 1.53.0
                    e.g. io.grpc:grpc-context:jar:1.52.1 and io.grpc:grpc-api:jar:1.53.0:compile
                    and com.google.protobuf:protobuf-java-(util):jar:3.21.12
                    BUT com.google.protobuf:protobuf-java:jar:3.21.12 is OMITTED due to conflict with 2.5.0 from spark_sql -->
                <dependency>
                    <groupId>com.google.cloud.spark</groupId>
                    <artifactId>spark-3.1-bigquery</artifactId>
                    <version>0.29.0-preview</version>
                    <scope>${dataproc.provided.dependencies.scope}</scope>
                </dependency>
            </dependencies>

            <!-- Could not resolve version conflict among io.grpc:grpc-core:jar:[1.53.0,1.53.0] and io.grpc:grpc-core:jar:[1.50.2,1.50.2],
            because the transitive grpc netty artifacts specify an explicit single version (square brackets) for grpc-core and grpc-api:
            io.grpc:grpc-netty:jar:1.53.0 (from spark-3.1-bigquery) -> io.grpc:grpc-core:jar:[1.53.0,1.53.0]
            io.grpc:grpc-netty-shaded:jar:1.50.2 (from gcs-connector) -> io.grpc:grpc-core:jar:[1.50.2,1.50.2] -->
            <dependencyManagement>
                <dependencies>
                    <dependency>
                        <groupId>io.grpc</groupId>
                        <artifactId>grpc-core</artifactId>
                        <version>${grpc.version}</version>
                        <scope>${dataproc.provided.dependencies.scope}</scope>
                    </dependency>
                    <dependency>
                        <groupId>io.grpc</groupId>
                        <artifactId>grpc-api</artifactId>
                        <version>${grpc.version}</version>
                        <scope>${dataproc.provided.dependencies.scope}</scope>
                    </dependency>
                    <dependency>
                        <groupId>com.google.protobuf</groupId>
                        <artifactId>protobuf-java</artifactId>
                        <version>${protobuf-java.version}</version>
                        <scope>${dataproc.provided.dependencies.scope}</scope>
                    </dependency>
                </dependencies>
            </dependencyManagement>
        </profile>

        <profile>
            <id>dist</id>
            <properties>
                <dataproc.provided.dependencies.scope>provided</dataproc.provided.dependencies.scope>
            </properties>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-shade-plugin</artifactId>
                        <version>3.4.1</version>
                        <executions>
                            <execution>
                                <phase>package</phase>
                                <goals>
                                    <goal>shade</goal>
                                </goals>
                                <configuration>
                                    <createDependencyReducedPom>false</createDependencyReducedPom>
                                    <artifactSet>
                                        <excludes>
                                            <exclude>org.scala-lang:*</exclude>
                                        </excludes>
                                    </artifactSet>
                                </configuration>
                            </execution>
                        </executions>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </plugin>
                </plugins>
            </build>
        </profile>
    </profiles>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.11.0</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>

        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.8.1</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>3.3.0</version>
                <executions>
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/main/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

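With the pom defined, a typical flow is to build a thin jar using the dist profile and submit it to Dataproc, pulling the connector jars at submit time via spark.jars.packages (cluster name and region below are placeholders):

mvn clean package -Pdist
gcloud dataproc jobs submit spark \
  --cluster=my-cluster --region=us-central1 \
  --class=com.bawi.MyAvroGcsReadAndBQWriteApp \
  --jars=target/my-apache-spark-3-scala-0.1-SNAPSHOT.jar \
  --properties=^#^spark.jars.packages=org.apache.spark:spark-avro_2.12:3.1.3,com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.29.0#spark.dynamicAllocation.enabled=true
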
To learn more about the dependency conflicts, comment out the shaded classifier on gcs-connector and run

mvn dependency:tree -Dverbose
            [INFO] +- org.apache.spark:spark-sql_2.12:jar:3.1.3:compile
            [INFO] |  +- org.apache.orc:orc-core:jar:1.5.13:compile
            [INFO] |  |  +- com.google.protobuf:protobuf-java:jar:2.5.0:compile

            [INFO] \- com.google.cloud.bigdataoss:gcs-connector:jar:hadoop3-2.2.11:compile
            [INFO]    +- com.google.cloud.bigdataoss:util:jar:2.2.11:compile
            [INFO]    |  +- io.grpc:grpc-api:jar:1.50.2:compile
            [INFO]    |  +- io.grpc:grpc-alts:jar:1.50.2:compile
            [INFO]    |  |  +- io.grpc:grpc-grpclb:jar:1.50.2:compile
            [INFO]    |  |  |  +- (com.google.protobuf:protobuf-java:jar:3.21.7:runtime - omitted for conflict with 2.5.0)
            [INFO]    |  |  +- (com.google.protobuf:protobuf-java:jar:3.21.7:compile - omitted for conflict with 2.5.0)

            [INFO] \- com.google.cloud.bigdataoss:gcs-connector:jar:hadoop3-2.2.11:compile
            [INFO]    +- com.google.api-client:google-api-client-jackson2:jar:2.0.1:compile
            [INFO]    |  \- com.google.http-client:google-http-client:jar:1.42.3:compile
            [INFO]    |     +- io.opencensus:opencensus-api:jar:0.31.1:compile
            [INFO]    |     |  \- (io.grpc:grpc-context:jar:1.27.2:compile - omitted for conflict with 1.50.2)


            [INFO] \- com.google.cloud.bigdataoss:gcs-connector:jar:hadoop3-2.2.11:compile
            [INFO]    +- com.google.cloud.bigdataoss:gcsio:jar:2.2.11:compile
            [INFO]    |  +- io.grpc:grpc-context:jar:1.50.2:compile

You will note that org.apache.spark:spark-sql_2.12:jar:3.1.3 defines com.google.protobuf:protobuf-java:jar:2.5.0 (the winner, as it has the shortest path), while com.google.cloud.bigdataoss:gcs-connector:jar:hadoop3-2.2.11 defines com.google.protobuf:protobuf-java:jar:3.21.7, which is omitted due to the conflict with 2.5.0. As ExtensionLite is not available in com.google.protobuf:protobuf-java:jar:2.5.0 (but is present in com.google.protobuf:protobuf-java:jar:3.21.7), you will get an exception when initializing GoogleHadoopFileSystem: it extends GoogleHadoopFileSystemBase, which statically imports GoogleHadoopFileSystemConfiguration, which imports GoogleCloudStorageOptions, which imports com.google.storage.v2.StorageProto, which imports com.google.protobuf.GeneratedMessageV3, also not present in the old com.google.protobuf:protobuf-java:jar:2.5.0.

The same kind of conflict occurs between io.grpc:grpc-context:jar:1.27.2 and io.grpc:grpc-context:jar:1.50.2, where 1.27.2 is omitted in favor of 1.50.2.
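
To focus the verbose tree on a single artifact, the output can be filtered (assuming a reasonably recent maven-dependency-plugin that honors -Dverbose):

mvn dependency:tree -Dverbose -Dincludes=com.google.protobuf:protobuf-java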