1. Start zookeeper
me@Mac:~/kafka_2.11-1.1.1$ rm -rf /tmp/zookeeper/ && bin/zookeeper-server-start.sh config/zookeeper.properties
2. Start kafka
me@Mac:~/kafka_2.11-1.1.1$ rm -rf /tmp/kafka-logs && bin/kafka-server-start.sh config/server.properties
3. Create topic
me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-topics.sh --create --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 --topic my-topic
4. Create maven project with:
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <scope>compile</scope> <flume.version>1.9.0</flume.version> <avro.version>1.7.5</avro.version> <!-- org.apache.avro.SchemaBuilder available since 1.7.5 --> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>${avro.version}</version> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>${flume.version}</version> <scope>${scope}</scope> </dependency> <dependency> <groupId>org.apache.flume.flume-ng-channels</groupId> <artifactId>flume-kafka-channel</artifactId> <version>${flume.version}</version> <scope>${scope}</scope> </dependency> </dependencies>
5. Create KafkaChannelWithHeaders based on KafkaChannel using diff:
me@Mac:/tmp$ curl https://repo1.maven.org/maven2/org/apache/flume/flume-ng-channels/flume-kafka-channel/1.9.0/flume-kafka-channel-1.9.0-sources.jar -O && \ unzip -p flume-kafka-channel-1.9.0-sources.jar \ org/apache/flume/channel/kafka/KafkaChannel.java > KafkaChannel.java && \ diff --ignore-space-change KafkaChannel.java \ ~/dev/my-apache-kafka/src/main/java/org/apache/flume/channel/kafka/KafkaChannelWithHeaders.java
55a56 > import org.apache.kafka.common.header.Header; 61a63 > 66d67 import java.util.function.Function; > import java.util.stream.Collectors; > import java.util.stream.Stream; 83c87 public class KafkaChannelWithHeaders extends BasicChannelSemantics { 502c506 e = deserializeValue(record, parseAsFlumeEvent); 631c635,636 private Event deserializeValue(ConsumerRecord record, boolean parseAsFlumeEvent) throws IOException { > byte[] value = record.value(); 645c650,654 Header[] headers = record.headers().toArray(); > Map headerKeyValues = > headers == null ? > Collections.emptyMap() : > Stream.of(headers).collect(Collectors.toMap(Header::key, header -> new String(header.value()))); > e = EventBuilder.withBody(value, headerKeyValues); 694c703 records = consumer.poll(pollTimeout);
6. Create MySerializer
package org.apache.flume.channel.kafka; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.serialization.AbstractAvroEventSerializer; import org.apache.flume.serialization.EventSerializer; import java.io.OutputStream; public class MySerializer extends AbstractAvroEventSerializer<MySerializer.MyRecord> { /* private static final Schema SCHEMA = new Schema.Parser().parse( "{\"type\":\"record\", \"name\": \"MyRecord\", \"fields\": [" + "{\"name\": \"myHeader\", \"type\": \"string\"}, " + "{\"name\": \"body\", \"type\": \"bytes\"}" + "]}"); */ private static final Schema SCHEMA = SchemaBuilder.record("MyRecord") .fields() .optionalString("myHeader") .requiredBytes("body") .endRecord(); public static class MyRecord { private String myHeader; private byte[] body; public MyRecord(String myHeader, byte[] body) { this.myHeader = myHeader; this.body = body; } public String getMyHeader() { return myHeader; } public byte[] getBody() { return body; } } private final OutputStream out; public MySerializer(OutputStream out) { this.out = out; } @Override protected OutputStream getOutputStream() { return out; } @Override protected Schema getSchema() { SchemaBuilder.record("MyRecord") .fields() .nullableString("myHeader", null) .requiredBytes("body") .endRecord(); return SCHEMA; } @Override protected MySerializer.MyRecord convert(Event event) { return new MySerializer.MyRecord(event.getHeaders().get("my-header-key"), event.getBody()); } @SuppressWarnings("unused") // used in flume agent properties: a1.sinks.k1.sink.serializer = org.apache.flume.channel.kafka.MySerializer$Builder public static class Builder implements EventSerializer.Builder{ @Override public EventSerializer build(Context context, OutputStream out) { MySerializer mySerializer = new MySerializer(out); mySerializer.configure(context); return mySerializer; } } }
7. Build the jar with the custom channel and serializer classes and copy the jar to apache-flume-1.9.0-bin/lib
Note: If you want to build the avro schema programmatically with SchemaBuilder
then you need to upgrade the avro version to at least 1.7.5 in the maven project and also in the flume lib
directory. You do not need to upgrade avro if you define the schema as a String and parse it (the commented-out approach).
8. Create flume agent configuration
me@Mac:~/apache-flume-1.9.0-bin$ cat conf/example-kafka-channel-roll-file-my-serializer-sink.conf
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source # no source # Describe the sink a1.sinks.k1.type = file_roll a1.sinks.k1.sink.directory = /tmp/flume a1.sinks.k1.sink.serializer = org.apache.flume.channel.kafka.MySerializer$Builder # Use a channel which buffers events in memory a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannelWithHeaders a1.channels.c1.kafka.bootstrap.servers = localhost:9092 a1.channels.c1.kafka.topic = my-topic a1.channels.c1.parseAsFlumeEvent = false # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
9. Start flume
me@Mac:~/apache-flume-1.9.0-bin$ bin/flume-ng agent --conf conf \ --conf-file conf/example-kafka-channel-roll-file-my-serializer-sink.conf --name a1 \ -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true \ -Dorg.apache.flume.log.rawdata=true
or with debug
me@Mac:~/apache-flume-1.9.0-bin$ /Library/Java/JavaVirtualMachines/jdk1.8.0_241.jdk/Contents/Home/bin/java -Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=5005,suspend=y -Xmx20m -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true -cp '/Users/me/apache-flume-1.9.0-bin/conf:/Users/me/apache-flume-1.9.0-bin/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file conf/example-kafka-channel-roll-file-my-serializer-sink.conf --name a1
see Intellij debugging local apache flume with netcat source and logger sink
10. Send data using a java kafka producer — see Apache Kafka – producers and consumers – command line and java for details
11. View the ingested avro file content and schema:
me@Mac:/tmp/flume$ java -jar ~/Downloads/avro-tools-1.8.1.jar tojson 1602059778485-4
{"myHeader":{"string":"my-header-value"},"body":"hello kafka123"}
me@Mac:/tmp/flume$ java -jar ~/Downloads/avro-tools-1.8.1.jar getschema 1602059778485-4
{ "type" : "record", "name" : "MyRecord", "fields" : [ { "name" : "myHeader", "type" : [ "null", "string" ], "default" : null }, { "name" : "body", "type" : "bytes" } ] }
me@Mac:/tmp/flume$ java -jar ~/Downloads/avro-tools-1.8.1.jar cat 1602005189205-1
Objavro.schema?{"type":"record","name":"MyRecord","fields":[{"name":"myHeader","type":["null","string"],"default":null},{"name":"body","type":"bytes"}]}avro.c@b ?ֵG?.? t