Hive external table creation, data partitioning and avro schema evolution

Read first: https://bmwieczorek.wordpress.com/2017/05/29/unions-and-default-value-in-apache-avro-serialization-and-deserialization/

Create an Avro schema file user.avsc for a User record with a single string property name, serialize a record to users.avro, and upload both files to HDFS:

[cloudera@quickstart ~]$ hdfs dfs -copyFromLocal /media/sf_vbox-shared/user.avsc /schema/
[cloudera@quickstart ~]$ hdfs dfs -mkdir -p /data/users/year=2017/month=05/day=24/hour=09
[cloudera@quickstart ~]$ hdfs dfs -copyFromLocal /media/sf_vbox-shared/users/year\=2017/month\=05/day\=24/hour\=09/users.avro /data/users/year=2017/month=05/day=24/hour=09/
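
The directory names above follow Hive's key=value partition layout. As a minimal sketch, assuming one partition per hour, such a path could be derived from a timestamp as shown below. The class and method names are hypothetical and not part of the original setup.

```java
import java.time.LocalDateTime;
import java.util.Locale;

// Hypothetical helper (an assumption for illustration): builds the Hive-style
// partition directory for a table partitioned by year, month, day and hour.
class HivePartitionPaths {

    static String hourlyPartitionDir(String tableLocation, LocalDateTime ts) {
        // Zero-pad month, day and hour so the directory names match the
        // string partition values used in the post, such as month=05 and hour=09.
        return String.format(Locale.ROOT, "%s/year=%04d/month=%02d/day=%02d/hour=%02d",
                tableLocation, ts.getYear(), ts.getMonthValue(),
                ts.getDayOfMonth(), ts.getHour());
    }

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2017, 5, 24, 9, 0);
        // prints /data/users/year=2017/month=05/day=24/hour=09
        System.out.println(hourlyPartitionDir("/data/users", ts));
    }
}
```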

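Connect to Hive via beeline, create an external table located at /data/users, run MSCK REPAIR TABLE so that Hive registers the partition directory, and display the table content. Note that the first SELECT returns no rows, because the partition is not yet known to the metastore: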

[cloudera@quickstart ~]$ beeline
Beeline version 1.1.0-cdh5.10.0 by Apache Hive
beeline> !connect jdbc:hive2://localhost:10000
scan complete in 1ms
Connecting to jdbc:hive2://localhost:10000
Enter username for jdbc:hive2://localhost:10000: cloudera
Enter password for jdbc:hive2://localhost:10000: ********
Connected to: Apache Hive (version 1.1.0-cdh5.10.0)
Driver: Hive JDBC (version 1.1.0-cdh5.10.0)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://localhost:10000> CREATE EXTERNAL TABLE Users PARTITIONED BY (year String, month String, day String, hour String) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' LOCATION '/data/users' TBLPROPERTIES ('avro.schema.url'='hdfs:///schema/user.avsc');
INFO  : Compiling command(queryId=hive_20170530023737_04121f8c-8e27-4b6f-8816-6fa8ccf7d993): CREATE EXTERNAL TABLE Users PARTITIONED BY (year String, month String, day String, hour String) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' LOCATION '/data/users' TBLPROPERTIES ('avro.schema.url'='hdfs:///schema/user.avsc')
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
INFO  : Completed compiling command(queryId=hive_20170530023737_04121f8c-8e27-4b6f-8816-6fa8ccf7d993); Time taken: 0.028 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20170530023737_04121f8c-8e27-4b6f-8816-6fa8ccf7d993): CREATE EXTERNAL TABLE Users PARTITIONED BY (year String, month String, day String, hour String) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' LOCATION '/data/users' TBLPROPERTIES ('avro.schema.url'='hdfs:///schema/user.avsc')
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20170530023737_04121f8c-8e27-4b6f-8816-6fa8ccf7d993); Time taken: 0.073 seconds
INFO  : OK
No rows affected (0.117 seconds)
0: jdbc:hive2://localhost:10000> SELECT * FROM Users;
INFO  : Compiling command(queryId=hive_20170530023838_cc15d926-28e7-477d-8de3-948a1a7c00a3): SELECT * FROM Users
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:users.name, type:string, comment:null), FieldSchema(name:users.year, type:string, comment:null), FieldSchema(name:users.month, type:string, comment:null), FieldSchema(name:users.day, type:string, comment:null), FieldSchema(name:users.hour, type:string, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=hive_20170530023838_cc15d926-28e7-477d-8de3-948a1a7c00a3); Time taken: 0.087 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20170530023838_cc15d926-28e7-477d-8de3-948a1a7c00a3): SELECT * FROM Users
INFO  : Completed executing command(queryId=hive_20170530023838_cc15d926-28e7-477d-8de3-948a1a7c00a3); Time taken: 0.0 seconds
INFO  : OK
+-------------+-------------+--------------+------------+-------------+--+
| users.name  | users.year  | users.month  | users.day  | users.hour  |
+-------------+-------------+--------------+------------+-------------+--+
+-------------+-------------+--------------+------------+-------------+--+
No rows selected (0.128 seconds)
0: jdbc:hive2://localhost:10000> MSCK REPAIR TABLE Users;
INFO  : Compiling command(queryId=hive_20170530023939_b04b73a2-e03e-4b4b-826d-e00214ffbe50): MSCK REPAIR TABLE Users
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
INFO  : Completed compiling command(queryId=hive_20170530023939_b04b73a2-e03e-4b4b-826d-e00214ffbe50); Time taken: 0.004 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20170530023939_b04b73a2-e03e-4b4b-826d-e00214ffbe50): MSCK REPAIR TABLE Users
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20170530023939_b04b73a2-e03e-4b4b-826d-e00214ffbe50); Time taken: 0.155 seconds
INFO  : OK
No rows affected (0.175 seconds)
0: jdbc:hive2://localhost:10000> SELECT * FROM Users;
INFO  : Compiling command(queryId=hive_20170530023939_392a3a78-0f40-45b7-b227-fa6cfdc69fce): SELECT * FROM Users
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:users.name, type:string, comment:null), FieldSchema(name:users.year, type:string, comment:null), FieldSchema(name:users.month, type:string, comment:null), FieldSchema(name:users.day, type:string, comment:null), FieldSchema(name:users.hour, type:string, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=hive_20170530023939_392a3a78-0f40-45b7-b227-fa6cfdc69fce); Time taken: 0.064 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20170530023939_392a3a78-0f40-45b7-b227-fa6cfdc69fce): SELECT * FROM Users
INFO  : Completed executing command(queryId=hive_20170530023939_392a3a78-0f40-45b7-b227-fa6cfdc69fce); Time taken: 0.0 seconds
INFO  : OK
+-------------+-------------+--------------+------------+-------------+--+
| users.name  | users.year  | users.month  | users.day  | users.hour  |
+-------------+-------------+--------------+------------+-------------+--+
| Alyssa      | 2017        | 05           | 24         | 09          |
+-------------+-------------+--------------+------------+-------------+--+
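
MSCK REPAIR TABLE scans the table location for partition directories that are missing from the metastore. When the new partition is already known, an alternative is to register it explicitly with Hive's ALTER TABLE ... ADD PARTITION statement. The sketch below only builds that statement as a string; the helper name is hypothetical, and the statement would still have to be executed, for example from beeline.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper (an assumption for illustration): renders an
// ALTER TABLE ... ADD PARTITION statement for one partition directory.
class AddPartitionStatement {

    static String build(String table, Map<String, String> partition, String location) {
        // Partition values are assumed to be trusted; no escaping is done here.
        String spec = partition.entrySet().stream()
                .map(e -> e.getKey() + "='" + e.getValue() + "'")
                .collect(Collectors.joining(", "));
        return "ALTER TABLE " + table + " ADD IF NOT EXISTS PARTITION (" + spec + ")"
                + " LOCATION '" + location + "'";
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the partition columns in declaration order.
        Map<String, String> partition = new LinkedHashMap<>();
        partition.put("year", "2017");
        partition.put("month", "05");
        partition.put("day", "24");
        partition.put("hour", "09");
        System.out.println(build("Users", partition,
                "/data/users/year=2017/month=05/day=24/hour=09"));
    }
}
```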

Change the User schema to include an additional property favorite_number (a union of null and int with default null), generate a new users.avro file and upload it to HDFS into a different partition (a different hour):

[cloudera@quickstart dev]$ hdfs dfs -mkdir -p /data/users/year=2017/month=05/day=24/hour=10
[cloudera@quickstart dev]$ hdfs dfs -copyFromLocal /media/sf_vbox-shared/users/year\=2017/month\=05/day\=24/hour\=10/users.avro /data/users/year=2017/month=05/day=24/hour=10/

Query the table, repair it and query it again. After the repair all rows are returned, but still without the new favorite_number column, because the table still uses the old schema file:

0: jdbc:hive2://localhost:10000> SELECT * FROM Users;
INFO  : Compiling command(queryId=hive_20170530024040_5f1a6880-61b2-4c87-bb35-0c643cd511ba): SELECT * FROM Users
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:users.name, type:string, comment:null), FieldSchema(name:users.year, type:string, comment:null), FieldSchema(name:users.month, type:string, comment:null), FieldSchema(name:users.day, type:string, comment:null), FieldSchema(name:users.hour, type:string, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=hive_20170530024040_5f1a6880-61b2-4c87-bb35-0c643cd511ba); Time taken: 0.068 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20170530024040_5f1a6880-61b2-4c87-bb35-0c643cd511ba): SELECT * FROM Users
INFO  : Completed executing command(queryId=hive_20170530024040_5f1a6880-61b2-4c87-bb35-0c643cd511ba); Time taken: 0.0 seconds
INFO  : OK
+-------------+-------------+--------------+------------+-------------+--+
| users.name  | users.year  | users.month  | users.day  | users.hour  |
+-------------+-------------+--------------+------------+-------------+--+
| Alyssa      | 2017        | 05           | 24         | 09          |
+-------------+-------------+--------------+------------+-------------+--+
1 row selected (0.105 seconds)
0: jdbc:hive2://localhost:10000> MSCK REPAIR TABLE Users;
INFO  : Compiling command(queryId=hive_20170530024040_2db31c0d-7f15-4f72-934a-a7b0dfacc045): MSCK REPAIR TABLE Users
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
INFO  : Completed compiling command(queryId=hive_20170530024040_2db31c0d-7f15-4f72-934a-a7b0dfacc045); Time taken: 0.002 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20170530024040_2db31c0d-7f15-4f72-934a-a7b0dfacc045): MSCK REPAIR TABLE Users
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20170530024040_2db31c0d-7f15-4f72-934a-a7b0dfacc045); Time taken: 0.114 seconds
INFO  : OK
No rows affected (0.128 seconds)
0: jdbc:hive2://localhost:10000> SELECT * FROM Users;
INFO  : Compiling command(queryId=hive_20170530024040_4f0cbff8-4c54-4e72-ada0-c7ea2100be71): SELECT * FROM Users
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:users.name, type:string, comment:null), FieldSchema(name:users.year, type:string, comment:null), FieldSchema(name:users.month, type:string, comment:null), FieldSchema(name:users.day, type:string, comment:null), FieldSchema(name:users.hour, type:string, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=hive_20170530024040_4f0cbff8-4c54-4e72-ada0-c7ea2100be71); Time taken: 0.07 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20170530024040_4f0cbff8-4c54-4e72-ada0-c7ea2100be71): SELECT * FROM Users
INFO  : Completed executing command(queryId=hive_20170530024040_4f0cbff8-4c54-4e72-ada0-c7ea2100be71); Time taken: 0.0 seconds
INFO  : OK
+-------------+-------------+--------------+------------+-------------+--+
| users.name  | users.year  | users.month  | users.day  | users.hour  |
+-------------+-------------+--------------+------------+-------------+--+
| Alyssa      | 2017        | 05           | 24         | 09          |
| Alyssa      | 2017        | 05           | 24         | 10          |
+-------------+-------------+--------------+------------+-------------+--+
2 rows selected (0.119 seconds)
0: jdbc:hive2://localhost:10000> --delete old schema and upload new schema with favorite color to hdfs

Delete the old schema from HDFS and replace it with the new one containing the new column:

[cloudera@quickstart dev]$ hdfs dfs -rm -skipTrash /schema/user.avsc
Deleted /schema/user.avsc
[cloudera@quickstart dev]$ hdfs dfs -copyFromLocal /media/sf_vbox-shared/schema-with-favorite-color/user.avsc /schema/

Repair the Hive table and display all rows with the new column. The row written with the old schema shows NULL, the declared default:

0: jdbc:hive2://localhost:10000> MSCK REPAIR TABLE Users;
INFO  : Compiling command(queryId=hive_20170530024444_0bb2c55f-7e12-4cde-8154-e77ce491e035): MSCK REPAIR TABLE Users
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
INFO  : Completed compiling command(queryId=hive_20170530024444_0bb2c55f-7e12-4cde-8154-e77ce491e035); Time taken: 0.001 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20170530024444_0bb2c55f-7e12-4cde-8154-e77ce491e035): MSCK REPAIR TABLE Users
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20170530024444_0bb2c55f-7e12-4cde-8154-e77ce491e035); Time taken: 0.047 seconds
INFO  : OK
No rows affected (0.061 seconds)
0: jdbc:hive2://localhost:10000> SELECT * FROM Users;
INFO  : Compiling command(queryId=hive_20170530024545_e9142a48-92c5-41c9-a6c6-f2a611fa4e4f): SELECT * FROM Users
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:users.name, type:string, comment:null), FieldSchema(name:users.favorite_number, type:int, comment:null), FieldSchema(name:users.year, type:string, comment:null), FieldSchema(name:users.month, type:string, comment:null), FieldSchema(name:users.day, type:string, comment:null), FieldSchema(name:users.hour, type:string, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=hive_20170530024545_e9142a48-92c5-41c9-a6c6-f2a611fa4e4f); Time taken: 0.059 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20170530024545_e9142a48-92c5-41c9-a6c6-f2a611fa4e4f): SELECT * FROM Users
INFO  : Completed executing command(queryId=hive_20170530024545_e9142a48-92c5-41c9-a6c6-f2a611fa4e4f); Time taken: 0.0 seconds
INFO  : OK
+-------------+------------------------+-------------+--------------+------------+-------------+--+
| users.name  | users.favorite_number  | users.year  | users.month  | users.day  | users.hour  |
+-------------+------------------------+-------------+--------------+------------+-------------+--+
| Alyssa      | NULL                   | 2017        | 05           | 24         | 09          |
| Alyssa      | 256                    | 2017        | 05           | 24         | 10          |
+-------------+------------------------+-------------+--------------+------------+-------------+--+
2 rows selected (0.118 seconds)
0: jdbc:hive2://localhost:10000>

Unions and default value in apache avro serialization and deserialization

The initial Avro schema (schema/user.avsc) defines a User record with only a name field:

{
  "namespace": "com.bawi.avro.model",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string"
    }
  ]
}

The Maven pom.xml declares the Avro dependency:

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.1</version>
        </dependency>

so that we can serialize the User data in Java to a user.avro file on disk:

        // Parse the schema and build a record that conforms to it
        Schema schema = new Schema.Parser().parse(new File("schema/user.avsc"));
        File avroFile = new File("target/user.avro");
        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "Alyssa");
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        // try-with-resources closes the file even if append fails
        try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
            dataFileWriter.create(schema, avroFile); // writes the file header, which embeds the schema
            dataFileWriter.append(user);
        }

We can read (deserialize) the User from disk either in Java:

        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
        // try-with-resources closes the reader when iteration is done
        try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(avroFile, datumReader)) {
            GenericRecord record = null;
            while (dataFileReader.hasNext()) {
                record = dataFileReader.next(record); // reuse the record instance between iterations
                System.out.println(record);
            }
        }

or by using the avro-tools jar, which Maven downloads when it is declared as a test dependency:

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-tools</artifactId>
            <version>1.8.1</version>
            <scope>test</scope>
        </dependency>

and running it with the tojson argument:

me@MacBook:~/dev/my-projects/my-avro$ java -jar /Users/me/.m2/repository/org/apache/avro/avro-tools/1.8.1/avro-tools-1.8.1.jar tojson users.avro 
{"name":"Alyssa"}

Next we add a new favorite_number field to the schema:

{
  "namespace": "com.bawi.avro.model",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": "int"
    }
  ]
}

If we then run the Java deserialization code on the existing data in user.avro, but against the new schema, we get:

Exception in thread "main" org.apache.avro.AvroTypeException: Found com.bawi.avro.model.User, expecting com.bawi.avro.model.User, missing required field favorite_number

because favorite_number does not exist in the Avro file and the new schema declares no default value for it.

Changing the type to a union of int and null alone does not get rid of the error above.

The solution is to declare a default value for favorite_number, for example together with a union:

    {
      "name": "favorite_number",
      "type": [
        "null",
        "int"
      ],
      "default": null
    }

to get: {"name": "Alyssa", "favorite_number": null}
or add

    {
      "name": "favorite_number",
      "type": "int",
      "default": 0
    }

to get: {"name": "Alyssa", "favorite_number": 0}
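
The rule behind both fixes can be sketched in plain Java: when the reader schema has a field that the written data lacks, the reader falls back to the declared default, and fails if there is none. This is only an illustration of the rule under simplified assumptions (records modeled as maps, hypothetical class names); it is not Avro's actual implementation.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of reader-side field resolution (illustration only).
class FieldResolutionSketch {

    static final class ReaderField {
        final String name;
        final boolean hasDefault;   // distinguishes "no default" from "default is null"
        final Object defaultValue;  // may legitimately be null

        ReaderField(String name, boolean hasDefault, Object defaultValue) {
            this.name = name;
            this.hasDefault = hasDefault;
            this.defaultValue = defaultValue;
        }
    }

    static Map<String, Object> resolve(Map<String, Object> written, List<ReaderField> readerFields) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (ReaderField field : readerFields) {
            if (written.containsKey(field.name)) {
                result.put(field.name, written.get(field.name)); // value present in the file
            } else if (field.hasDefault) {
                result.put(field.name, field.defaultValue);      // fall back to the default
            } else {
                throw new IllegalStateException("missing required field " + field.name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> written = Collections.<String, Object>singletonMap("name", "Alyssa");
        List<ReaderField> withDefault = Arrays.asList(
                new ReaderField("name", false, null),
                new ReaderField("favorite_number", true, null));
        // prints {name=Alyssa, favorite_number=null}
        System.out.println(resolve(written, withDefault));
    }
}
```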

Note that the default value must match the first type listed in the union. Placing int first while using null as the default value, such as:

    {
      "name": "favorite_number",
      "type": [
        "int",
        "null"
      ],
      "default": null
    }

gives the error:

Exception in thread "main" org.apache.avro.AvroTypeException: Non-numeric default value for int: null

Similarly, we get

Exception in thread "main" org.apache.avro.AvroTypeException: Non-null default value for null type: 0

when null comes first but the default value is a number:

    {
      "name": "favorite_number",
      "type": [
        "null",
        "int"
      ],
      "default": 0
    }

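Both errors follow from the rule described in https://avro.apache.org/docs/1.7.7/spec.html#Unions: the default value of a union field must correspond to the first schema in the union.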
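
The four favorite_number variants shown above can be checked against that rule with a small sketch. It covers only the null, int and string types, uses hypothetical names, and models the rule from the cited 1.7.7 specification rather than reproducing Avro's own validation code.

```java
import java.util.Arrays;
import java.util.List;

// Illustration of the "default must match the first union branch" rule.
class UnionDefaultSketch {

    static boolean defaultMatchesFirstBranch(List<String> unionBranches, Object defaultValue) {
        String first = unionBranches.get(0); // only the first branch matters for the default
        switch (first) {
            case "null":
                return defaultValue == null;
            case "int":
                return defaultValue instanceof Integer;
            case "string":
                return defaultValue instanceof String;
            default:
                throw new IllegalArgumentException("type not covered by this sketch: " + first);
        }
    }

    public static void main(String[] args) {
        // ["null", "int"] with default null is accepted
        System.out.println(defaultMatchesFirstBranch(Arrays.asList("null", "int"), null)); // true
        // ["int", "null"] with default null is rejected
        System.out.println(defaultMatchesFirstBranch(Arrays.asList("int", "null"), null)); // false
        // ["null", "int"] with default 0 is rejected
        System.out.println(defaultMatchesFirstBranch(Arrays.asList("null", "int"), 0));    // false
    }
}
```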