Tag: avro

Building an Avro schema programmatically with SchemaBuilder

1. maven pom.xml dependency:

    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.1</version>
        </dependency>
    </dependencies>

2. Java avro SchemaBuilder:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;

public class MySchemaBuilder {
  public static void main(String[] args) throws IOException {
    Schema schema = SchemaBuilder.record("myRecordName").fields()
      .requiredInt("myRequiredInt")
      //.name("myRequiredInt2").type().intType().noDefault()

      .optionalDouble("myOptionalDouble")
      //.name("myOptionalDouble2").type().optional().doubleType()

      .nullableString("myNullableString", "myNullableStringDefaultValue")
      //.name("myNullableString2").type().nullable().stringType().stringDefault("myNullableStringDefaultValue2")

      .name("myRequiredArrayLongs").type().array().items().longType().noDefault()

      .name("myRequiredSubRecord")
        .type(
          SchemaBuilder.record("myRequiredSubRecordType").fields().requiredFloat("myRequiredFloat").endRecord()
        ).noDefault()

      .name("myOptionalArraySubRecords").type().nullable().array()
        .items(
          SchemaBuilder.record("myOptionalArraySubRecordType").fields().requiredBoolean("myRequiredBoolean").endRecord()
        ).noDefault()
    .endRecord();

    System.out.println(schema);

    File file = new File("my-file.avro");
    GenericData.Record record = new GenericData.Record(schema);

    record.put("myRequiredInt", 1);

    record.put("myOptionalDouble", 2.2d); // this line can be commented since optional long

    record.put("myNullableString", "abc"); // this line can be commented since optional string

    record.put("myRequiredArrayLongs", Arrays.asList(1L, 2L, 3L));  // required otherwise java.lang.NullPointerException: null of array in field myRequiredArrayLongs of myRecordName

    GenericData.Record myRequiredSubRecord = new GenericData.Record(schema.getField("myRequiredSubRecord").schema());
    myRequiredSubRecord.put("myRequiredFloat", 1.0f); // required otherwise null of int in field myRequiredFloat of myRequiredSubRecordType in field myRequiredSubRecord of myRecordName
    record.put("myRequiredSubRecord", myRequiredSubRecord); // required otherwise java.lang.NullPointerException: null of myRequiredSubRecordType in field myRequiredSubRecord of myRecordName

    GenericData.Record myOptionalArraySubRecord = new GenericData.Record(schema.getField("myOptionalArraySubRecords").schema().getTypes().get(0).getElementType());
    myOptionalArraySubRecord.put("myRequiredBoolean", true);
    record.put("myOptionalArraySubRecords", Arrays.asList(myOptionalArraySubRecord, myOptionalArraySubRecord));

    GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<>(schema);
    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(genericDatumWriter);
    dataFileWriter.create(schema, file);
    dataFileWriter.append(record);
    dataFileWriter.close();
  }
}

3. Output schema

{
  "type": "record",
  "name": "myRecordName",
  "fields": [
    {
      "name": "myRequiredInt",
      "type": "int"
    },
    {
      "name": "myOptionalDouble",
      "type": [
        "null",
        "double"
      ],
      "default": null
    },
    {
      "name": "myNullableString",
      "type": [
        "string",
        "null"
      ],
      "default": "myNullableStringDefaultValue"
    },
    {
      "name": "myRequiredArrayLongs",
      "type": {
        "type": "array",
        "items": "long"
      }
    },
    {
      "name": "myRequiredSubRecord",
      "type": {
        "type": "record",
        "name": "myRequiredSubRecordType",
        "fields": [
          {
            "name": "myRequiredFloat",
            "type": "float"
          }
        ]
      }
    },
    {
      "name": "myOptionalArraySubRecords",
      "type": [
        {
          "type": "array",
          "items": {
            "type": "record",
            "name": "myOptionalArraySubRecordType",
            "fields": [
              {
                "name": "myRequiredBoolean",
                "type": "boolean"
              }
            ]
          }
        },
        "null"
      ]
    }
  ]
}

4. Reading
java -jar ~/Downloads/avro-tools-1.8.1.jar tojson my-file.avro

{
  "myRequiredInt": 1,
  "myOptionalDouble": {
    "double": 2.2
  },
  "myNullableString": {
    "string": "abc"
  },
  "myRequiredArrayLongs": [
    1,
    2,
    3
  ],
  "myRequiredSubRecord": {
    "myRequiredFloat": 1.0
  },
  "myOptionalArraySubRecords": {
    "array": [
      {
        "myRequiredBoolean": true
      },
      {
        "myRequiredBoolean": true
      }
    ]
  }
}

If we do not populate the optional/nullable fields, we get:

{
  "myRequiredInt": 1,
  "myOptionalDouble": null,
  "myNullableString": null,
  "myRequiredArrayLongs": [
    1,
    2,
    3
  ],
  "myRequiredSubRecord": {
    "myRequiredFloat": 1.0
  },
  "myOptionalArraySubRecords": null
}
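
Optionally, instead of avro-tools, the file can be read back in Java with the generic API. A minimal sketch (the class name MySchemaReader is just illustrative; the writer schema embedded in the avro file header is used, so no schema has to be passed explicitly):

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;

import java.io.File;
import java.io.IOException;

public class MySchemaReader {
  public static void main(String[] args) throws IOException {
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); // no schema needed: it is read from the file header
    try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(new File("my-file.avro"), datumReader)) {
      for (GenericRecord record : dataFileReader) {
        System.out.println(record); // prints each record as json-like text
      }
    }
  }
}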

spark read avro write json

me@MacBook:~/Downloads/spark-2.4.7-bin-hadoop2.6$ ./bin/spark-shell --jars ~/Downloads/spark-avro_2.11-2.4.7.jar
scala> spark.read.format("avro").load("/tmp/flume/1604421105296-2").filter("entity is null").show(1)
scala> spark.read.format("avro").load("/tmp/flume/1604421105296-2").filter("entity is null").count
res5: Long = 8
scala> spark.read.format("avro").load("/tmp/flume/1604421105296-2").filter("entity is null").write.format("json").save("sr360_null_entity.json")

Reading and writing parquet and avro files with apache spark

[me@myhost ~]$ spark2-shell 

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/02/02 06:08:36 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Spark context Web UI available at http://myhost:4041
Spark context available as 'sc' (master = yarn, app id = application_1570004862164_137877).
Spark session available as 'spark'.
Welcome to Spark version 2.3.0.cloudera4

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_141)
Type in expressions to have them evaluated.
Type :help for more information.

scala> case class MyClass(mystring: String, myboolean: Boolean, myint: Int, myLong: Long, myfloat: Float, mydouble: Double)

defined class MyClass

scala> val df = Seq(MyClass("a", true, 1, 1L, 1/7f, 1/7f), MyClass("b", false, 2, 2L, 2.0f, 2.0d)).toDF
df: org.apache.spark.sql.DataFrame = [mystring: string, myboolean: boolean ... 4 more fields]

scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode

scala> df.write.mode(SaveMode.Overwrite).format("parquet").save("myclass_parquet")

scala> df.write.mode(SaveMode.Overwrite).parquet("myclass_parquet")

scala> df.write.mode(SaveMode.Overwrite).format("avro").save("myclass_avro")
org.apache.spark.sql.AnalysisException: Failed to find data source: avro. Please find an Avro package at http://spark.apache.org/third-party-projects.html;
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:241)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
... 49 elided

scala> df.write.mode(SaveMode.Overwrite).avro("myclass_avro")
:27: error: value avro is not a member of org.apache.spark.sql.DataFrameWriter[org.apache.spark.sql.Row]
df.write.mode(SaveMode.Overwrite).avro("myclass_avro")
^

scala> import com.databricks.spark.avro._
:24: error: object databricks is not a member of package com
import com.databricks.spark.avro._
^

[me@myhost ~]$ wget https://repo1.maven.org/maven2/com/databricks/spark-avro_2.11/4.0.0/spark-avro_2.11-4.0.0.jar
[me@myhost ~]$ spark2-shell --jars spark-avro_2.11-4.0.0.jar
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://myhost:4040
Spark context available as 'sc' (master = yarn, app id = application_1570004862164_137923).
Spark session available as 'spark'.
Welcome to Spark version 2.3.0.cloudera4

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_141)
Type in expressions to have them evaluated.
Type :help for more information.

scala> case class MyClass(mystring: String, myboolean: Boolean, myint: Int, myLong: Long, myfloat: Float, mydouble: Double)
defined class MyClass

scala> val df = Seq(MyClass("a", true, 1, 1L, 1/7f, 1/7f), MyClass("b", false, 2, 2L, 2.0f, 2.0d)).toDF
df: org.apache.spark.sql.DataFrame = [mystring: string, myboolean: boolean ... 4 more fields]

scala>

scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode

scala> df.write.mode(SaveMode.Overwrite).format("avro").save("myclass_avro")
org.apache.spark.sql.AnalysisException: Failed to find data source: avro. Please find an Avro package at http://spark.apache.org/third-party-projects.html;
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:241)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
... 49 elided

scala> df.write.mode(SaveMode.Overwrite).format("com.databricks.spark.avro").save("myclass_avro")

scala> df.write.mode(SaveMode.Overwrite).avro("myclass_avro")
:27: error: value avro is not a member of org.apache.spark.sql.DataFrameWriter[org.apache.spark.sql.Row]
df.write.mode(SaveMode.Overwrite).avro("myclass_avro")
^

scala> import com.databricks.spark.avro._
import com.databricks.spark.avro._

scala> df.write.mode(SaveMode.Overwrite).avro("myclass_avro")

scala>

[me@myhost ~]$ hdfs dfs -ls myclass_parquet
Found 3 items
-rw-r--r-- 2 me me 0 2020-02-02 05:40 myclass_parquet/_SUCCESS
-rw-r--r-- 2 me me 1210 2020-02-02 05:40 myclass_parquet/part-00000-f8303ddc-21af-4d58-9697-ae2f470e4791-c000.snappy.parquet
-rw-r--r-- 2 me me 1210 2020-02-02 05:40 myclass_parquet/part-00001-f8303ddc-21af-4d58-9697-ae2f470e4791-c000.snappy.parquet

[me@myhost ~]$ hadoop fs -copyToLocal myclass_parquet/part-00000-f8303ddc-21af-4d58-9697-ae2f470e4791-c000.snappy.parquet

[me@myhost ~]$ parquet-tools schema part-00000-f8303ddc-21af-4d58-9697-ae2f470e4791-c000.snappy.parquet
message spark_schema {
optional binary mystring (UTF8);
required boolean myboolean;
required int32 myint;
required int64 myLong;
required float myfloat;
required double mydouble;
}

[me@myhost ~]$ parquet-tools cat part-00000-f8303ddc-21af-4d58-9697-ae2f470e4791-c000.snappy.parquet
mystring = a
myboolean = true
myint = 1
myLong = 1
myfloat = 0.14285715
mydouble = 0.1428571492433548

[me@myhost ~]$ hadoop fs -ls myclass_avro
Found 3 items
-rw-r--r-- 2 me me 0 2020-02-02 06:31 myclass_avro/_SUCCESS
-rw-r--r-- 2 me me 363 2020-02-02 06:31 myclass_avro/part-00000-31b5df6e-2ab4-4310-b4dc-11346dc7fc88-c000.avro
-rw-r--r-- 2 me me 363 2020-02-02 06:31 myclass_avro/part-00001-31b5df6e-2ab4-4310-b4dc-11346dc7fc88-c000.avro

[me@myhost ~]$ hadoop fs -copyToLocal myclass_avro/part-00000-31b5df6e-2ab4-4310-b4dc-11346dc7fc88-c000.avro

[me@myhost ~]$ avro-tools getschema part-00000-31b5df6e-2ab4-4310-b4dc-11346dc7fc88-c000.avro
{
"type" : "record",
"name" : "topLevelRecord",
"fields" : [ {
"name" : "mystring",
"type" : [ "string", "null" ]
}, {
"name" : "myboolean",
"type" : "boolean"
}, {
"name" : "myint",
"type" : "int"
}, {
"name" : "myLong",
"type" : "long"
}, {
"name" : "myfloat",
"type" : "float"
}, {
"name" : "mydouble",
"type" : "double"
} ]
}

[me@myhost ~]$ avro-tools tojson part-00000-31b5df6e-2ab4-4310-b4dc-11346dc7fc88-c000.avro
{"mystring":{"string":"a"},"myboolean":true,"myint":1,"myLong":1,"myfloat":0.14285715,"mydouble":0.1428571492433548}

[me@myhost ~]$ spark2-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://myhost:4040
Spark context available as 'sc' (master = yarn, app id = application_1570004862164_138201).
Spark session available as 'spark'.
Welcome to Spark version 2.3.0.cloudera4

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_141)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val df = spark.read.parquet("myclass_parquet")
df: org.apache.spark.sql.DataFrame = [mystring: string, myboolean: boolean ... 4 more fields]

scala> df.show
+--------+---------+-----+------+----------+------------------+
|mystring|myboolean|myint|myLong| myfloat| mydouble|
+--------+---------+-----+------+----------+------------------+
| a| true| 1| 1|0.14285715|0.1428571492433548|
| b| false| 2| 2| 2.0| 2.0|
+--------+---------+-----+------+----------+------------------+

scala> df.printSchema
root
|-- mystring: string (nullable = true)
|-- myboolean: boolean (nullable = true)
|-- myint: integer (nullable = true)
|-- myLong: long (nullable = true)
|-- myfloat: float (nullable = true)
|-- mydouble: double (nullable = true)

scala> val df = spark.read.format("parquet").load("myclass_parquet")
df: org.apache.spark.sql.DataFrame = [mystring: string, myboolean: boolean ... 4 more fields]

scala> res5.show
+--------+---------+-----+------+----------+------------------+
|mystring|myboolean|myint|myLong| myfloat| mydouble|
+--------+---------+-----+------+----------+------------------+
| a| true| 1| 1|0.14285715|0.1428571492433548|
| b| false| 2| 2| 2.0| 2.0|
+--------+---------+-----+------+----------+------------------+

[me@myhost ~]$ spark2-shell --jars spark-avro_2.11-4.0.0.jar
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://myhost:4040
Spark context available as 'sc' (master = yarn, app id = application_1570004862164_138246).
Spark session available as 'spark'.
Welcome to Spark version 2.3.0.cloudera4

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_141)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val df = spark.read.avro("myclass_avro")
:23: error: value avro is not a member of org.apache.spark.sql.DataFrameReader
val df = spark.read.avro("myclass_avro")
^

scala> val df = spark.read.format("avro").load("myclass_avro")
org.apache.spark.sql.AnalysisException: Failed to find data source: avro. Please find an Avro package at http://spark.apache.org/third-party-projects.html;
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:190)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
... 49 elided

scala> val df = spark.read.format("com.databricks.spark.avro").load("myclass_avro")
df: org.apache.spark.sql.DataFrame = [mystring: string, myboolean: boolean ... 4 more fields]

scala> df.show
+--------+---------+-----+------+----------+------------------+
|mystring|myboolean|myint|myLong| myfloat| mydouble|
+--------+---------+-----+------+----------+------------------+
| a| true| 1| 1|0.14285715|0.1428571492433548|
| b| false| 2| 2| 2.0| 2.0|
+--------+---------+-----+------+----------+------------------+

scala> df.printSchema
root
|-- mystring: string (nullable = true)
|-- myboolean: boolean (nullable = true)
|-- myint: integer (nullable = true)
|-- myLong: long (nullable = true)
|-- myfloat: float (nullable = true)
|-- mydouble: double (nullable = true)

scala> import com.databricks.spark.avro._
import com.databricks.spark.avro._

scala> val df = spark.read.avro("myclass_avro")
df: org.apache.spark.sql.DataFrame = [mystring: string, myboolean: boolean ... 4 more fields]

scala> df.show
+--------+---------+-----+------+----------+------------------+
|mystring|myboolean|myint|myLong| myfloat| mydouble|
+--------+---------+-----+------+----------+------------------+
| a| true| 1| 1|0.14285715|0.1428571492433548|
| b| false| 2| 2| 2.0| 2.0|
+--------+---------+-----+------+----------+------------------+

scala> df.printSchema
root
|-- mystring: string (nullable = true)
|-- myboolean: boolean (nullable = true)
|-- myint: integer (nullable = true)
|-- myLong: long (nullable = true)
|-- myfloat: float (nullable = true)
|-- mydouble: double (nullable = true)

Hive external table creation, data partitioning and avro schema evolution

Read first: https://bmwieczorek.wordpress.com/2017/05/29/unions-and-default-value-in-apache-avro-serialization-and-deserialization/

Create an Avro schema user.avsc for a User record with a single string field name, serialize sample data to users.avro, and upload both to HDFS:

[cloudera@quickstart ~]$ hdfs dfs -copyFromLocal /media/sf_vbox-shared/user.avsc /schema/
[cloudera@quickstart ~]$ hdfs dfs -mkdir -p /data/users/year=2017/month=05/day=24/hour=09
[cloudera@quickstart ~]$ hdfs dfs -copyFromLocal /media/sf_vbox-shared/users/year\=2017/month\=05/day\=24/hour\=09/users.avro /data/users/year=2017/month=05/day=24/hour=09/
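
For reference, the users.avro file uploaded above can be produced with plain Java from user.avsc before the upload. A minimal sketch, assuming the single-field user.avsc from the linked post (class name and local paths are illustrative):

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

import java.io.File;
import java.io.IOException;

public class UsersAvroWriter {
  public static void main(String[] args) throws IOException {
    Schema schema = new Schema.Parser().parse(new File("user.avsc")); // User record with a single string field: name
    GenericRecord user = new GenericData.Record(schema);
    user.put("name", "Alyssa");
    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
    dataFileWriter.create(schema, new File("users.avro"));
    dataFileWriter.append(user);
    dataFileWriter.close();
  }
}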

Connect to Hive via beeline, create an external table over /data/users backed by the users.avro data, register the partition via MSCK REPAIR TABLE, and display the table content:

[cloudera@quickstart ~]$ beeline
Beeline version 1.1.0-cdh5.10.0 by Apache Hive
beeline> !connect jdbc:hive2://localhost:10000
scan complete in 1ms
Connecting to jdbc:hive2://localhost:10000
Enter username for jdbc:hive2://localhost:10000: cloudera
Enter password for jdbc:hive2://localhost:10000: ********
Connected to: Apache Hive (version 1.1.0-cdh5.10.0)
Driver: Hive JDBC (version 1.1.0-cdh5.10.0)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://localhost:10000> CREATE EXTERNAL TABLE Users PARTITIONED BY (year String, month String, day String, hour String) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' LOCATION '/data/users' TBLPROPERTIES ('avro.schema.url'='hdfs:///schema/user.avsc');
INFO  : Compiling command(queryId=hive_20170530023737_04121f8c-8e27-4b6f-8816-6fa8ccf7d993): CREATE EXTERNAL TABLE Users PARTITIONED BY (year String, month String, day String, hour String) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' LOCATION '/data/users' TBLPROPERTIES ('avro.schema.url'='hdfs:///schema/user.avsc')
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
INFO  : Completed compiling command(queryId=hive_20170530023737_04121f8c-8e27-4b6f-8816-6fa8ccf7d993); Time taken: 0.028 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20170530023737_04121f8c-8e27-4b6f-8816-6fa8ccf7d993): CREATE EXTERNAL TABLE Users PARTITIONED BY (year String, month String, day String, hour String) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' LOCATION '/data/users' TBLPROPERTIES ('avro.schema.url'='hdfs:///schema/user.avsc')
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20170530023737_04121f8c-8e27-4b6f-8816-6fa8ccf7d993); Time taken: 0.073 seconds
INFO  : OK
No rows affected (0.117 seconds)
0: jdbc:hive2://localhost:10000> SELECT * FROM Users;
INFO  : Compiling command(queryId=hive_20170530023838_cc15d926-28e7-477d-8de3-948a1a7c00a3): SELECT * FROM Users
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:users.name, type:string, comment:null), FieldSchema(name:users.year, type:string, comment:null), FieldSchema(name:users.month, type:string, comment:null), FieldSchema(name:users.day, type:string, comment:null), FieldSchema(name:users.hour, type:string, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=hive_20170530023838_cc15d926-28e7-477d-8de3-948a1a7c00a3); Time taken: 0.087 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20170530023838_cc15d926-28e7-477d-8de3-948a1a7c00a3): SELECT * FROM Users
INFO  : Completed executing command(queryId=hive_20170530023838_cc15d926-28e7-477d-8de3-948a1a7c00a3); Time taken: 0.0 seconds
INFO  : OK
+-------------+-------------+--------------+------------+-------------+--+
| users.name  | users.year  | users.month  | users.day  | users.hour  |
+-------------+-------------+--------------+------------+-------------+--+
+-------------+-------------+--------------+------------+-------------+--+
No rows selected (0.128 seconds)
0: jdbc:hive2://localhost:10000> MSCK REPAIR TABLE Users;
INFO  : Compiling command(queryId=hive_20170530023939_b04b73a2-e03e-4b4b-826d-e00214ffbe50): MSCK REPAIR TABLE Users
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
INFO  : Completed compiling command(queryId=hive_20170530023939_b04b73a2-e03e-4b4b-826d-e00214ffbe50); Time taken: 0.004 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20170530023939_b04b73a2-e03e-4b4b-826d-e00214ffbe50): MSCK REPAIR TABLE Users
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20170530023939_b04b73a2-e03e-4b4b-826d-e00214ffbe50); Time taken: 0.155 seconds
INFO  : OK
No rows affected (0.175 seconds)
0: jdbc:hive2://localhost:10000> SELECT * FROM Users;
INFO  : Compiling command(queryId=hive_20170530023939_392a3a78-0f40-45b7-b227-fa6cfdc69fce): SELECT * FROM Users
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:users.name, type:string, comment:null), FieldSchema(name:users.year, type:string, comment:null), FieldSchema(name:users.month, type:string, comment:null), FieldSchema(name:users.day, type:string, comment:null), FieldSchema(name:users.hour, type:string, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=hive_20170530023939_392a3a78-0f40-45b7-b227-fa6cfdc69fce); Time taken: 0.064 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20170530023939_392a3a78-0f40-45b7-b227-fa6cfdc69fce): SELECT * FROM Users
INFO  : Completed executing command(queryId=hive_20170530023939_392a3a78-0f40-45b7-b227-fa6cfdc69fce); Time taken: 0.0 seconds
INFO  : OK
+-------------+-------------+--------------+------------+-------------+--+
| users.name  | users.year  | users.month  | users.day  | users.hour  |
+-------------+-------------+--------------+------------+-------------+--+
| Alyssa      | 2017        | 05           | 24         | 09          |
+-------------+-------------+--------------+------------+-------------+--+

Change the User schema to include an additional property favorite_color (a union of null and string with default null), generate a new users.avro file, and upload it to HDFS into a different partition (a different hour):

[cloudera@quickstart dev]$ hdfs dfs -mkdir -p /data/users/year=2017/month=05/day=24/hour=10
[cloudera@quickstart dev]$ hdfs dfs -copyFromLocal /media/sf_vbox-shared/users/year\=2017/month\=05/day\=24/hour\=10/users.avro /data/users/year=2017/month=05/day=24/hour=10/

Select, repair the Hive table (MSCK REPAIR TABLE), and select again: all rows are now returned, but the new column favorite_color is not yet visible:

0: jdbc:hive2://localhost:10000> SELECT * FROM Users;
INFO  : Compiling command(queryId=hive_20170530024040_5f1a6880-61b2-4c87-bb35-0c643cd511ba): SELECT * FROM Users
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:users.name, type:string, comment:null), FieldSchema(name:users.year, type:string, comment:null), FieldSchema(name:users.month, type:string, comment:null), FieldSchema(name:users.day, type:string, comment:null), FieldSchema(name:users.hour, type:string, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=hive_20170530024040_5f1a6880-61b2-4c87-bb35-0c643cd511ba); Time taken: 0.068 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20170530024040_5f1a6880-61b2-4c87-bb35-0c643cd511ba): SELECT * FROM Users
INFO  : Completed executing command(queryId=hive_20170530024040_5f1a6880-61b2-4c87-bb35-0c643cd511ba); Time taken: 0.0 seconds
INFO  : OK
+-------------+-------------+--------------+------------+-------------+--+
| users.name  | users.year  | users.month  | users.day  | users.hour  |
+-------------+-------------+--------------+------------+-------------+--+
| Alyssa      | 2017        | 05           | 24         | 09          |
+-------------+-------------+--------------+------------+-------------+--+
1 row selected (0.105 seconds)
0: jdbc:hive2://localhost:10000> MSCK REPAIR TABLE Users;
INFO  : Compiling command(queryId=hive_20170530024040_2db31c0d-7f15-4f72-934a-a7b0dfacc045): MSCK REPAIR TABLE Users
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
INFO  : Completed compiling command(queryId=hive_20170530024040_2db31c0d-7f15-4f72-934a-a7b0dfacc045); Time taken: 0.002 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20170530024040_2db31c0d-7f15-4f72-934a-a7b0dfacc045): MSCK REPAIR TABLE Users
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20170530024040_2db31c0d-7f15-4f72-934a-a7b0dfacc045); Time taken: 0.114 seconds
INFO  : OK
No rows affected (0.128 seconds)
0: jdbc:hive2://localhost:10000> SELECT * FROM Users;
INFO  : Compiling command(queryId=hive_20170530024040_4f0cbff8-4c54-4e72-ada0-c7ea2100be71): SELECT * FROM Users
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:users.name, type:string, comment:null), FieldSchema(name:users.year, type:string, comment:null), FieldSchema(name:users.month, type:string, comment:null), FieldSchema(name:users.day, type:string, comment:null), FieldSchema(name:users.hour, type:string, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=hive_20170530024040_4f0cbff8-4c54-4e72-ada0-c7ea2100be71); Time taken: 0.07 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20170530024040_4f0cbff8-4c54-4e72-ada0-c7ea2100be71): SELECT * FROM Users
INFO  : Completed executing command(queryId=hive_20170530024040_4f0cbff8-4c54-4e72-ada0-c7ea2100be71); Time taken: 0.0 seconds
INFO  : OK
+-------------+-------------+--------------+------------+-------------+--+
| users.name  | users.year  | users.month  | users.day  | users.hour  |
+-------------+-------------+--------------+------------+-------------+--+
| Alyssa      | 2017        | 05           | 24         | 09          |
| Alyssa      | 2017        | 05           | 24         | 10          |
+-------------+-------------+--------------+------------+-------------+--+
2 rows selected (0.119 seconds)
0: jdbc:hive2://localhost:10000> --delete old schema and upload new schema with favorite color to hdfs

Delete the old schema from HDFS and replace it with the new one containing the new column:

[cloudera@quickstart dev]$ hdfs dfs -rm -skipTrash /schema/user.avsc
Deleted /schema/user.avsc
[cloudera@quickstart dev]$ hdfs dfs -copyFromLocal /media/sf_vbox-shared/schema-with-favorite-color/user.avsc /schema/

Repair the Hive table and display all rows, now with the new column:

0: jdbc:hive2://localhost:10000> MSCK REPAIR TABLE Users;
INFO  : Compiling command(queryId=hive_20170530024444_0bb2c55f-7e12-4cde-8154-e77ce491e035): MSCK REPAIR TABLE Users
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
INFO  : Completed compiling command(queryId=hive_20170530024444_0bb2c55f-7e12-4cde-8154-e77ce491e035); Time taken: 0.001 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20170530024444_0bb2c55f-7e12-4cde-8154-e77ce491e035): MSCK REPAIR TABLE Users
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20170530024444_0bb2c55f-7e12-4cde-8154-e77ce491e035); Time taken: 0.047 seconds
INFO  : OK
No rows affected (0.061 seconds)
0: jdbc:hive2://localhost:10000> SELECT * FROM Users;
INFO  : Compiling command(queryId=hive_20170530024545_e9142a48-92c5-41c9-a6c6-f2a611fa4e4f): SELECT * FROM Users
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:users.name, type:string, comment:null), FieldSchema(name:users.favorite_number, type:int, comment:null), FieldSchema(name:users.year, type:string, comment:null), FieldSchema(name:users.month, type:string, comment:null), FieldSchema(name:users.day, type:string, comment:null), FieldSchema(name:users.hour, type:string, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=hive_20170530024545_e9142a48-92c5-41c9-a6c6-f2a611fa4e4f); Time taken: 0.059 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20170530024545_e9142a48-92c5-41c9-a6c6-f2a611fa4e4f): SELECT * FROM Users
INFO  : Completed executing command(queryId=hive_20170530024545_e9142a48-92c5-41c9-a6c6-f2a611fa4e4f); Time taken: 0.0 seconds
INFO  : OK
+-------------+------------------------+-------------+--------------+------------+-------------+--+
| users.name  | users.favorite_number  | users.year  | users.month  | users.day  | users.hour  |
+-------------+------------------------+-------------+--------------+------------+-------------+--+
| Alyssa      | NULL                   | 2017        | 05           | 24         | 09          |
| Alyssa      | 256                    | 2017        | 05           | 24         | 10          |
+-------------+------------------------+-------------+--------------+------------+-------------+--+
2 rows selected (0.118 seconds)
0: jdbc:hive2://localhost:10000>

Unions and default value in apache avro serialization and deserialization

The initial Avro schema (user.avsc) defines a User record with a name field only:

{
  "namespace": "com.bawi.avro.model",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string"
    }
  ]
}

The Maven pom.xml declares the Avro dependency:

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.1</version>
        </dependency>

so that we can serialize User data in Java to a user.avro file on disk:

        Schema schema = new Schema.Parser().parse(new File("user.avsc"));
        File avroFile = new File("target/user.avro");
        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "Alyssa");
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
        dataFileWriter.create(schema, avroFile);
        dataFileWriter.append(user);
        dataFileWriter.close();

We can read (deserialize) the User from disk using the same schema, either in Java:

        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(avroFile, datumReader);
        GenericRecord user2 = null;
        while (dataFileReader.hasNext()) {
            user2 = dataFileReader.next(user2);
            System.out.println(user2);
        }

or by using the avro-tools jar, which Maven can download when declared as a test dependency:

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-tools</artifactId>
            <version>1.8.1</version>
            <scope>test</scope>
        </dependency>

and running it with the 'tojson' argument:

me@MacBook:~/dev/my-projects/my-avro$ java -jar /Users/me/.m2/repository/org/apache/avro/avro-tools/1.8.1/avro-tools-1.8.1.jar tojson users.avro 
{"name":"Alyssa"}

Then we add a new favorite_number field to the schema:

{
  "namespace": "com.bawi.avro.model",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": "int"
    }
  ]
}

but do not yet write favorite_number in the Java code.

When trying to write, we get:

org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: null of int in field favorite_number of com.bawi.avro.model.User

since the favorite_number field is required by the Avro schema but was not set by the writer.

Adding a union of null and int fixes the writing problem (a union of int and null also works):

    {
      "name": "favorite_number",
      "type": [
        "null",
        "int"
      ]
    }

or

    {
      "name": "favorite_number",
      "type": [
        "int",
        "null"
      ]
    }
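
With either of these unions in place, the write from the earlier snippet succeeds even when favorite_number is left unset; a minimal sketch reusing the same generic API and the same paths as above:

        Schema schema = new Schema.Parser().parse(new File("user.avsc")); // schema now contains the favorite_number union
        File avroFile = new File("target/user.avro");
        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "Alyssa");
        // favorite_number is intentionally not set: the union allows null, so serialization no longer throws
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
        dataFileWriter.create(schema, avroFile);
        dataFileWriter.append(user);
        dataFileWriter.close();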

If the written Avro file has a schema containing favorite_number and the value was written as null, it will always be read as null, regardless of what the read schema looks like. A default value only applies when reading a field that was not defined in the schema used for writing (so no value was written at all): only the schema used for reading should define that field (including its default); the schema used for writing should not define it at all.

Let's assume a different scenario, where the write schema has only the name field (without favorite_number):

{
  "namespace": "com.bawi.avro.model",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string"
    }
  ]
}

and we write only the name field into the Avro file.

Let's assume we want favorite_number to be read as -1 (say there is a new requirement to always have favorite_number populated, because we do not want to check for nulls when reading the Avro files or a Hive table on top of them). Then let's modify the read schema to include a default of -1 (user_with_default_favourite_number.avsc):

{
  "namespace": "com.bawi.avro.model",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": [
        "int",
        "null"
      ],
      "default": -1
    }
  ]
}

and read it in Java with:

File file2 = new File("user_with_default_favourite_number.avsc");
Schema schema2 = new Schema.Parser().parse(file2);
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema2); // schema2 is the expected (reader) schema
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(avroFile, datumReader);
dataFileReader.forEach(System.out::println);

and the output is:

{"name": "Alyssa", "favorite_number": -1}

If we change the read schema for favorite_number to an invalid form:

    {
      "name": "favorite_number",
      "type": [
        "null",
        "int"
      ],
      "default": -1
    }

then we get:

org.apache.avro.AvroTypeException: Non-null default value for null type: -1

so if a non-null default value is given, then null needs to be in the second position in the union.

If we want "default": null, then null needs to be in the first position in the union:

    {
      "name": "favorite_number",
      "type": [
        "null",
        "int"
      ],
      "default": null
    }

since for the invalid form:

    {
      "name": "favorite_number",
      "type": [
        "int",
        "null"
      ],
      "default": null
    }

we will get

org.apache.avro.AvroTypeException: Non-numeric default value for int: null

as described in https://avro.apache.org/docs/1.7.7/spec.html#Unions