Big Data Ingestion with Apache Kafka and Flume, and Custom Avro Data Serialization

1. Start ZooKeeper

me@Mac:~/kafka_2.11-1.1.1$ rm -rf /tmp/zookeeper/ && bin/zookeeper-server-start.sh config/zookeeper.properties

2. Start Kafka

me@Mac:~/kafka_2.11-1.1.1$ rm -rf /tmp/kafka-logs && bin/kafka-server-start.sh config/server.properties

3. Create topic

me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic my-topic

4. Create a Maven project with:

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scope>compile</scope>
        <flume.version>1.9.0</flume.version>
        <avro.version>1.7.5</avro.version> <!-- org.apache.avro.SchemaBuilder available since 1.7.5 -->
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
                <version>${avro.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>${flume.version}</version>
            <scope>${scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flume.flume-ng-channels</groupId>
            <artifactId>flume-kafka-channel</artifactId>
            <version>${flume.version}</version>
            <scope>${scope}</scope>
        </dependency>
    </dependencies>

5. Create KafkaChannelWithHeaders based on KafkaChannel. Download the sources jar, extract KafkaChannel.java, and diff it against the modified class:

me@Mac:/tmp$ curl https://repo1.maven.org/maven2/org/apache/flume/flume-ng-channels/flume-kafka-channel/1.9.0/flume-kafka-channel-1.9.0-sources.jar -O && \
  unzip -p flume-kafka-channel-1.9.0-sources.jar \
    org/apache/flume/channel/kafka/KafkaChannel.java > KafkaChannel.java && \
  diff --ignore-space-change KafkaChannel.java \
    ~/dev/my-apache-kafka/src/main/java/org/apache/flume/channel/kafka/KafkaChannelWithHeaders.java 
55a56
> import org.apache.kafka.common.header.Header;
61a63
> 
66c67,68
< import java.util.function.Function;
---
> import java.util.stream.Collectors;
> import java.util.stream.Stream;
83c87
< public class KafkaChannel extends BasicChannelSemantics {
---
> public class KafkaChannelWithHeaders extends BasicChannelSemantics {
502c506
<             e = deserializeValue(record.value(), parseAsFlumeEvent);
---
>             e = deserializeValue(record, parseAsFlumeEvent);
631c635,636
<         private Event deserializeValue(byte[] value, boolean parseAsFlumeEvent) throws IOException {
---
>         private Event deserializeValue(ConsumerRecord record, boolean parseAsFlumeEvent) throws IOException {
>             byte[] value = record.value();
645c650,654
<             e = EventBuilder.withBody(value, Collections.emptyMap());
---
>             Header[] headers = record.headers().toArray();
>             Map headerKeyValues = 
>                         headers == null ?
>                         Collections.emptyMap() :
>                         Stream.of(headers).collect(Collectors.toMap(Header::key, header -> new String(header.value())));
>             e = EventBuilder.withBody(value, headerKeyValues);
694c703
>             records = consumer.poll(pollTimeout);
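
The interesting part is the big hunk in the middle: the Kafka record headers are turned into the Flume event header map. That transformation can be tried standalone; a minimal sketch (the class name is made up, and RecordHeader is an internal kafka-clients class used here only to construct a test header):

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class HeaderMappingDemo {

    // Same transformation as in KafkaChannelWithHeaders.deserializeValue:
    // each Kafka record header becomes a String key/value pair.
    // Note: toMap throws if the record carries duplicate header keys.
    static Map<String, String> toFlumeHeaders(Header[] headers) {
        return headers == null
                ? Collections.emptyMap()
                : Stream.of(headers).collect(
                        Collectors.toMap(Header::key, header -> new String(header.value())));
    }

    public static void main(String[] args) {
        Header[] headers = {new RecordHeader("my-header-key", "my-header-value".getBytes())};
        System.out.println(toFlumeHeaders(headers)); // prints {my-header-key=my-header-value}
    }
}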

6. Create MySerializer

package org.apache.flume.channel.kafka;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.serialization.AbstractAvroEventSerializer;
import org.apache.flume.serialization.EventSerializer;

import java.io.OutputStream;

public class MySerializer extends AbstractAvroEventSerializer<MySerializer.MyRecord> {

/*
    private static final Schema SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\", \"name\": \"MyRecord\", \"fields\": [" +
                    "{\"name\": \"myHeader\", \"type\": \"string\"}, " +
                    "{\"name\": \"body\", \"type\": \"bytes\"}" +
               "]}");
*/
    private static final Schema SCHEMA =
            SchemaBuilder.record("MyRecord")
                    .fields()
                    .optionalString("myHeader")
                    .requiredBytes("body")
                .endRecord();

    public static class MyRecord {
        private String myHeader;
        private byte[] body;

        public MyRecord(String myHeader, byte[] body) {
            this.myHeader = myHeader;
            this.body = body;
        }

        public String getMyHeader() {
            return myHeader;
        }

        public byte[] getBody() {
            return body;
        }
    }

    private final OutputStream out;

    public MySerializer(OutputStream out) {
        this.out = out;
    }

    @Override
    protected OutputStream getOutputStream() {
        return out;
    }

    @Override
    protected Schema getSchema() {
        return SCHEMA;
    }

    @Override
    protected MySerializer.MyRecord convert(Event event) {
        return new MySerializer.MyRecord(event.getHeaders().get("my-header-key"), event.getBody());
    }

    @SuppressWarnings("unused") // used in flume agent properties: a1.sinks.k1.sink.serializer = org.apache.flume.channel.kafka.MySerializer$Builder
    public static class Builder implements EventSerializer.Builder {

        @Override
        public EventSerializer build(Context context, OutputStream out) {
            MySerializer mySerializer = new MySerializer(out);
            mySerializer.configure(context);
            return mySerializer;
        }
    }
}
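
The serializer can be smoke-tested outside Flume. A minimal sketch of the EventSerializer lifecycle that the file_roll sink drives (the class name and output path are made up):

package org.apache.flume.channel.kafka;

import org.apache.flume.Context;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.serialization.EventSerializer;

import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Collections;

public class MySerializerSmokeTest {
    public static void main(String[] args) throws Exception {
        try (OutputStream out = new FileOutputStream("/tmp/my-record.avro")) {
            // Builder.build() constructs and configures the serializer
            EventSerializer serializer = new MySerializer.Builder().build(new Context(), out);
            // standard EventSerializer lifecycle: afterCreate -> write -> flush -> beforeClose
            serializer.afterCreate();
            serializer.write(EventBuilder.withBody("hello kafka123".getBytes(),
                    Collections.singletonMap("my-header-key", "my-header-value")));
            serializer.flush();
            serializer.beforeClose();
        }
    }
}

The resulting /tmp/my-record.avro can then be inspected with avro-tools exactly as in step 11.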

7. Build the jar containing the custom channel and serializer classes and copy it to apache-flume-1.9.0-bin/lib. If you want to build the Avro schema programmatically with SchemaBuilder, you need to upgrade the Avro version to at least 1.7.5 both in the Maven project and in the Flume lib directory. You do not need to upgrade Avro if you define the schema as a String and parse it (the commented-out approach above).

8. Create the Flume agent configuration

me@Mac:~/apache-flume-1.9.0-bin$ cat conf/example-kafka-channel-roll-file-my-serializer-sink.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# no source

# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /tmp/flume
a1.sinks.k1.sink.serializer = org.apache.flume.channel.kafka.MySerializer$Builder

# Use the custom Kafka channel, which buffers events in a Kafka topic
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannelWithHeaders
a1.channels.c1.kafka.bootstrap.servers = localhost:9092
a1.channels.c1.kafka.topic = my-topic
a1.channels.c1.parseAsFlumeEvent = false

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

9. Start Flume

me@Mac:~/apache-flume-1.9.0-bin$ bin/flume-ng agent --conf conf \
  --conf-file conf/example-kafka-channel-roll-file-my-serializer-sink.conf --name a1 \
  -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true \
  -Dorg.apache.flume.log.rawdata=true

or, to debug, start the JVM directly (suspend=y makes it wait for a debugger to attach on port 5005):

me@Mac:~/apache-flume-1.9.0-bin$ /Library/Java/JavaVirtualMachines/jdk1.8.0_241.jdk/Contents/Home/bin/java -Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=5005,suspend=y -Xmx20m -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true -cp '/Users/me/apache-flume-1.9.0-bin/conf:/Users/me/apache-flume-1.9.0-bin/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file conf/example-kafka-channel-roll-file-my-serializer-sink.conf --name a1

See IntelliJ debugging local Apache Flume with netcat source and logger sink for details.

10. Send data using a Java Kafka producer; see Apache Kafka – producers and consumers – command line and java for details. A minimal example is sketched below.
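
This sketch sets the my-header-key header that MySerializer.convert reads (the class name is made up; the topic and bootstrap server match the configuration above, and kafka-clients 2.x is assumed on the classpath):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class MyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "hello kafka123");
            // Kafka record headers become Flume event headers in KafkaChannelWithHeaders
            record.headers().add("my-header-key", "my-header-value".getBytes());
            producer.send(record);
        } // close() flushes any buffered records
    }
}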

11. View the ingested Avro file content and schema:

me@Mac:/tmp/flume$ java -jar ~/Downloads/avro-tools-1.8.1.jar tojson 1602059778485-4
{"myHeader":{"string":"my-header-value"},"body":"hello kafka123"}
me@Mac:/tmp/flume$ java -jar ~/Downloads/avro-tools-1.8.1.jar getschema 1602059778485-4
{
  "type" : "record",
  "name" : "MyRecord",
  "fields" : [ {
    "name" : "myHeader",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "body",
    "type" : "bytes"
  } ]
}
me@Mac:/tmp/flume$ java -jar ~/Downloads/avro-tools-1.8.1.jar cat 1602005189205-1
Objavro.schema?{"type":"record","name":"MyRecord","fields":[{"name":"myHeader","type":["null","string"],"default":null},{"name":"body","type":"bytes"}]}avro.c@b ?ֵG?.? t
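
The cat command dumps the raw container bytes (the embedded schema followed by binary-encoded records), hence the garbage characters. The same file can also be read programmatically; a minimal sketch using Avro's DataFileReader (the class name is made up, the file name is the rolled file from above):

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

import java.io.File;

public class AvroDump {
    public static void main(String[] args) throws Exception {
        File file = new File("/tmp/flume/1602059778485-4");
        try (DataFileReader<GenericRecord> reader =
                     new DataFileReader<>(file, new GenericDatumReader<>())) {
            System.out.println(reader.getSchema()); // the writer schema embedded in the container
            for (GenericRecord record : reader) {
                System.out.println(record);
            }
        }
    }
}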
