Category: kafka

Kafka console producer and consumer with Kerberos, Sentry and IDM

1. IDM – create user login myuser
2. IDM – create user groups kafka_dev_myuser_rw_grp and kafka_dev_myuser_ro_grp and add the user login myuser to those groups, so that:

bash-4.2$ id myuser
uid=12345678(myuser) gid=12345678(myuser) groups=12345678(myuser),87654321(kafka_dev_myuser_rw_grp),87654320(kafka_dev_myuser_ro_grp)

3. Hue/Hive SQL – create the roles:

0: jdbc:hive2://host> CREATE ROLE kafka_dev_myuser_rw_role; 
0: jdbc:hive2://host> GRANT  ROLE kafka_dev_myuser_rw_role TO GROUP kafka_dev_myuser_rw_grp;
0: jdbc:hive2://host> SHOW ROLE GRANT GROUP kafka_dev_myuser_rw_grp;
+---------------------------+---------------+-------------+----------+--+
|           role            | grant_option  | grant_time  | grantor  |
+---------------------------+---------------+-------------+----------+--+
| kafka_dev_myuser_rw_role  | false         | NULL        | --       |
+---------------------------+---------------+-------------+----------+--+
0: jdbc:hive2://host> CREATE ROLE kafka_dev_myuser_ro_role; 
0: jdbc:hive2://host> GRANT  ROLE kafka_dev_myuser_ro_role TO GROUP kafka_dev_myuser_ro_grp;
0: jdbc:hive2://host> SHOW ROLE GRANT GROUP kafka_dev_myuser_ro_grp;
+---------------------------+---------------+-------------+----------+--+
|           role            | grant_option  | grant_time  | grantor  |
+---------------------------+---------------+-------------+----------+--+
| kafka_dev_myuser_ro_role  | false         | NULL        | --       |
+---------------------------+---------------+-------------+----------+--+
1 row selected (0.106 seconds)

4. Set up topic, producer and consumer group access for the roles (the grant commands are sketched below) and verify the privileges with kafka-sentry -lp:

cd /run/cloudera-scm-agent/process/1234-kafka-KAFKA_BROKER/
kinit -kt kafka.keytab kafka/host.hadoop.com@HADOOP.COM
kafka-sentry -lp -r kafka_dev_myuser_rw_role
HOST=*->TOPIC=my-topic->action=write
HOST=*->TOPIC=my-topic->action=describe

or set the action to all

kafka-sentry -lp -r kafka_dev_myuser_ro_role
HOST=*->TOPIC=my-topic->action=describe
HOST=*->TOPIC=my-topic->action=read
HOST=*->CONSUMERGROUP=*->action=describe
HOST=*->CONSUMERGROUP=*->action=read

Optionally, the host and consumer group can be restricted to specific values:

HOST=127.0.0.1->TOPIC=my-topic->action=read
HOST=10.0.0.123->CONSUMERGROUP=my-flume-group->action=read
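
The privileges above are listed with kafka-sentry -lp; assuming the standard kafka-sentry CLI, they can be granted with -gpr (grant privilege to role), roughly like this (the privilege strings mirror the listings above):

kafka-sentry -gpr -r kafka_dev_myuser_rw_role -p "HOST=*->TOPIC=my-topic->action=write"
kafka-sentry -gpr -r kafka_dev_myuser_rw_role -p "HOST=*->TOPIC=my-topic->action=describe"
kafka-sentry -gpr -r kafka_dev_myuser_ro_role -p "HOST=*->TOPIC=my-topic->action=describe"
kafka-sentry -gpr -r kafka_dev_myuser_ro_role -p "HOST=*->TOPIC=my-topic->action=read"
kafka-sentry -gpr -r kafka_dev_myuser_ro_role -p "HOST=*->CONSUMERGROUP=*->action=describe"
kafka-sentry -gpr -r kafka_dev_myuser_ro_role -p "HOST=*->CONSUMERGROUP=*->action=read"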

5. Send data using the Kafka console producer.
Initialize a Kerberos session from the keytab:
kinit -kt /path/to/me.keytab me@HADOOP.COM
If that fails due to an expired password, change the password and re-create the Kerberos keytab:
ipa-getkeytab -s my-kdc-server.hadoop.com -p me@HADOOP.COM -P -k me.keytab
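
To verify which principals and key versions the generated keytab contains, the standard MIT Kerberos klist can be used:

klist -kt /path/to/me.keytab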

Create a JKS truststore (my.jks) based on the certificate (.crt) file:

keytool -importcert -file /path/to/gsnonpublicroot2.crt -keystore /path/to/my.jks 
Enter keystore password:  mypass1234
 
Trust this certificate? [no]:  yes
Certificate was added to keystore
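
Optionally verify that the certificate was imported (it should show up as a trustedCertEntry):

keytool -list -keystore /path/to/my.jks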

producer.properties:

security.protocol=SASL_SSL
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
ssl.truststore.location=/path/to/my.jks
ssl.truststore.password=mypass1234

Export the JAAS config (the jaas.conf content is the same as shown for the consumer in step 6 below) and run the console producer:

export KAFKA_OPTS="-Djava.security.auth.login.config=jaas.conf"
kafka-console-producer --broker-list host1:9093,host2:9093 --topic my-topic --producer.config producer.properties
20/12/14 03:49:05 INFO producer.ProducerConfig: ProducerConfig values: 
...
Debug is  true storeKey true useTicketCache false useKeyTab true doNotPrompt false ticketCache is null isInitiator true KeyTab is /path/to/me.keytab refreshKrb5Config is false principal is me@HADOOP.COM tryFirstPass is false useFirstPass is false storePass is false clearPass is false
principal is me@HADOOP.COM
Will use keytab
Commit Succeeded 
...
20/12/14 03:49:06 INFO authenticator.AbstractLogin: Successfully logged in.
20/12/14 03:49:06 INFO utils.AppInfoParser: Kafka version : 1.0.1-kafka-3.1.1
20/12/14 03:49:06 INFO utils.AppInfoParser: Kafka commitId : unknown
>hello from bartek
>^C

6. Read 2 messages using the Kafka console consumer.
A Kerberos ticket cache session is not required (klist: No credentials cache found), since the consumer authenticates with the keytab configured in jaas.conf.

consumer.properties:

security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka
sasl.mechanism=GSSAPI
#group.id=my-consumer-group
#auto.offset.reset=earliest
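
Note: depending on how the broker certificate is trusted by the JVM, the consumer will typically also need the same truststore settings as the producer:

ssl.truststore.location=/path/to/my.jks
ssl.truststore.password=mypass1234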

jaas.conf:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  debug=true
  storeKey=true
  useTicketCache=false
  renewTicket=true
  keyTab="/path/to/me.keytab"
  principal="me@HADOOP.COM";
};
export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/jaas.conf"
kafka-console-consumer --bootstrap-server host1:9093,host2:9093 --consumer.config /path/to/consumer.properties --topic my-topic --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --max-messages 2
Debug is  true storeKey true useTicketCache false useKeyTab true doNotPrompt false ticketCache is null isInitiator true KeyTab is /path/to/me.keytab refreshKrb5Config is false principal is me@HADOOP.COM tryFirstPass is false useFirstPass is false storePass is false clearPass is false
principal is me@HADOOP.COM
Will use keytab
Commit Succeeded
followed by the consumed messages (key and value), limited by --max-messages 2

Big Data ingestion with Apache Kafka and Flume and custom Avro data serialization

1. Start zookeeper

me@Mac:~/kafka_2.11-1.1.1$ rm -rf /tmp/zookeeper/ && bin/zookeeper-server-start.sh config/zookeeper.properties

2. Start kafka

me@Mac:~/kafka_2.11-1.1.1$ rm -rf /tmp/kafka-logs && bin/kafka-server-start.sh config/server.properties

3. Create topic

me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic my-topic
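
Optionally verify the topic (partitions, replication, leader):

me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-topic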

4. Create maven project with:

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scope>compile</scope>
        <flume.version>1.9.0</flume.version>
        <avro.version>1.7.5</avro.version> <!-- org.apache.avro.SchemaBuilder available since 1.7.5 -->
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
                <version>${avro.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>${flume.version}</version>
            <scope>${scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flume.flume-ng-channels</groupId>
            <artifactId>flume-kafka-channel</artifactId>
            <version>${flume.version}</version>
            <scope>${scope}</scope>
        </dependency>
    </dependencies>

5. Create KafkaChannelWithHeaders based on KafkaChannel using diff:

me@Mac:/tmp$ curl https://repo1.maven.org/maven2/org/apache/flume/flume-ng-channels/flume-kafka-channel/1.9.0/flume-kafka-channel-1.9.0-sources.jar -O && \
  unzip -p flume-kafka-channel-1.9.0-sources.jar \
    org/apache/flume/channel/kafka/KafkaChannel.java > KafkaChannel.java && \
  diff --ignore-space-change KafkaChannel.java \
    ~/dev/my-apache-kafka/src/main/java/org/apache/flume/channel/kafka/KafkaChannelWithHeaders.java
55a56
> import org.apache.kafka.common.header.Header;
61a63
> 
66d67
< import java.util.function.Function;
> import java.util.stream.Collectors;
> import java.util.stream.Stream;
83c87
> public class KafkaChannelWithHeaders extends BasicChannelSemantics {
502c506
>             e = deserializeValue(record, parseAsFlumeEvent);
631c635,636
>         private Event deserializeValue(ConsumerRecord record, boolean parseAsFlumeEvent) throws IOException {
>             byte[] value = record.value();
645c650,654
>             Header[] headers = record.headers().toArray();
>             Map headerKeyValues = 
>                         headers == null ?
>                         Collections.emptyMap() :
>                         Stream.of(headers).collect(Collectors.toMap(Header::key, header -> new String(header.value())));
>             e = EventBuilder.withBody(value, headerKeyValues);
694c703
>             records = consumer.poll(pollTimeout);

6. Create MySerializer

package org.apache.flume.channel.kafka;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.serialization.AbstractAvroEventSerializer;
import org.apache.flume.serialization.EventSerializer;

import java.io.OutputStream;

public class MySerializer extends AbstractAvroEventSerializer<MySerializer.MyRecord> {

/*
    private static final Schema SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\", \"name\": \"MyRecord\", \"fields\": [" +
                    "{\"name\": \"myHeader\", \"type\": \"string\"}, " +
                    "{\"name\": \"body\", \"type\": \"bytes\"}" +
               "]}");
*/
    private static final Schema SCHEMA =
            SchemaBuilder.record("MyRecord")
                    .fields()
                    .optionalString("myHeader")
                    .requiredBytes("body")
                .endRecord();

    public static class MyRecord {
        private String myHeader;
        private byte[] body;

        public MyRecord(String myHeader, byte[] body) {
            this.myHeader = myHeader;
            this.body = body;
        }

        public String getMyHeader() {
            return myHeader;
        }

        public byte[] getBody() {
            return body;
        }
    }

    private final OutputStream out;

    public MySerializer(OutputStream out) {
        this.out = out;
    }

    @Override
    protected OutputStream getOutputStream() {
        return out;
    }

    @Override
    protected Schema getSchema() {
        // alternatively the schema could be built here with
        // SchemaBuilder.record("MyRecord").fields().nullableString("myHeader", null).requiredBytes("body").endRecord();
        return SCHEMA;
    }

    @Override
    protected MySerializer.MyRecord convert(Event event) {
        return new MySerializer.MyRecord(event.getHeaders().get("my-header-key"), event.getBody());
    }

    @SuppressWarnings("unused") // used in flume agent properties: a1.sinks.k1.sink.serializer = org.apache.flume.channel.kafka.MySerializer$Builder
    public static class Builder implements EventSerializer.Builder{

        @Override
        public EventSerializer build(Context context, OutputStream out) {
            MySerializer mySerializer = new MySerializer(out);
            mySerializer.configure(context);
            return mySerializer;
        }
    }
}

7. Build the jar with the custom channel and serializer classes and copy the jar to apache-flume-1.9.0-bin/lib (a sketch of the commands is shown below). If you want to build the Avro schema programmatically with SchemaBuilder, you need to upgrade the Avro version to at least 1.7.5 in the Maven project and also in the Flume lib directory. You do not need to upgrade Avro if you define the schema as a String and parse it (the commented-out approach).
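
A sketch of the build-and-deploy commands, assuming the Maven module is named my-apache-kafka (artifact name, version and paths are examples):

me@Mac:~/dev/my-apache-kafka$ mvn clean package
me@Mac:~/dev/my-apache-kafka$ cp target/my-apache-kafka-1.0-SNAPSHOT.jar ~/apache-flume-1.9.0-bin/lib/
# only when building the schema with SchemaBuilder - replace the avro jar shipped with Flume (the exact old version may differ):
me@Mac:~/dev/my-apache-kafka$ rm ~/apache-flume-1.9.0-bin/lib/avro-1.7.4.jar
me@Mac:~/dev/my-apache-kafka$ cp ~/.m2/repository/org/apache/avro/avro/1.7.5/avro-1.7.5.jar ~/apache-flume-1.9.0-bin/lib/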

8. Create flume agent configuration

me@Mac:~/apache-flume-1.9.0-bin$ cat conf/example-kafka-channel-roll-file-my-serializer-sink.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# no source

# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /tmp/flume
a1.sinks.k1.sink.serializer = org.apache.flume.channel.kafka.MySerializer$Builder

# Use a channel which buffers events in memory
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannelWithHeaders
a1.channels.c1.kafka.bootstrap.servers = localhost:9092
a1.channels.c1.kafka.topic = my-topic
a1.channels.c1.parseAsFlumeEvent = false

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
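
Make sure the file_roll sink directory exists before starting the agent (as far as I know the sink does not create it):

me@Mac:~/apache-flume-1.9.0-bin$ mkdir -p /tmp/flume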

9. Start flume

me@Mac:~/apache-flume-1.9.0-bin$ bin/flume-ng agent --conf conf \
  --conf-file conf/example-kafka-channel-roll-file-my-serializer-sink.conf --name a1 \
  -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true \
  -Dorg.apache.flume.log.rawdata=true

or with a debugger attached (the JVM waits on port 5005 until the debugger connects, due to suspend=y):

me@Mac:~/apache-flume-1.9.0-bin$ /Library/Java/JavaVirtualMachines/jdk1.8.0_241.jdk/Contents/Home/bin/java -Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=5005,suspend=y -Xmx20m -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true -cp '/Users/me/apache-flume-1.9.0-bin/conf:/Users/me/apache-flume-1.9.0-bin/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file conf/example-kafka-channel-roll-file-my-serializer-sink.conf --name a1

see the post "Intellij debugging local apache flume with netcat source and logger sink" for attaching the IDE debugger

10. Send data using the Java Kafka producer – see "Apache Kafka – producers and consumers – command line and java" below for details.

11. View the ingested avro file content and schema:

me@Mac:/tmp/flume$ java -jar ~/Downloads/avro-tools-1.8.1.jar tojson 1602059778485-4
{"myHeader":{"string":"my-header-value"},"body":"hello kafka123"}
me@Mac:/tmp/flume$ java -jar ~/Downloads/avro-tools-1.8.1.jar getschema 1602059778485-4
{
  "type" : "record",
  "name" : "MyRecord",
  "fields" : [ {
    "name" : "myHeader",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "body",
    "type" : "bytes"
  } ]
}
me@Mac:/tmp/flume$ java -jar ~/Downloads/avro-tools-1.8.1.jar cat 1602005189205-1
Objavro.schema?{"type":"record","name":"MyRecord","fields":[{"name":"myHeader","type":["null","string"],"default":null},{"name":"body","type":"bytes"}]}avro.c@b ?ֵG?.? t

Apache Kafka – producers and consumers – command line and java – Kerberos auth

1. Download and unpack kafka_2.11-1.1.1.tgz, then run ZooKeeper and the Kafka server from the command line:

me@Mac:~/kafka_2.11-1.1.1$ bin/zookeeper-server-start.sh config/zookeeper.properties

and

me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-server-start.sh config/server.properties

and create my-topic topic

me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic

2a. To send data to Kafka, use the command-line Kafka console producer:

 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic
>hello from console producer

2b. To send data to Kafka using Java, create a Maven project with the dependency:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>

and add Java code:

package com.bawi.kafka;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;

public class MyKafkaProducer {
  public static void main(String[] args) {

    String topic = "my-topic";

    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host1:9092,host2:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

    // Kerberos
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    props.put(SaslConfigs.SASL_MECHANISM, "GSSAPI");
    props.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");

    System.setProperty("java.security.krb5.conf", "/path/to/krb5.conf");

    // use SaslConfigs.SASL_JAAS_CONFIG or
    // System.setProperty("java.security.auth.login.config", "/path/to/jaas.conf")
    String jaasConfigValue = "com.sun.security.auth.module.Krb5LoginModule required " +
            "useKeyTab=true " +
            "debug=true " +
            "storeKey=true " +
            "renewTicket=true " +
            "keyTab=\"/path/to/me.keytab\" " +
            "principal=\"me@HADOOP.HOST.COM\";";
    props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfigValue);

    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "mypass1234");
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/my.jks");

    KafkaProducer<String, byte[]> kafkaProducer = new KafkaProducer<>(props);


    byte[] data = "hello kafka".getBytes();
    ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, data);
    producerRecord.headers().add(new RecordHeader("my-header-key", "my-header-value".getBytes()));

    // asynchronously send data bytes to topic
    // optionally wait (blocking) on future get method to get metadata
    Future<RecordMetadata> metadataFuture = kafkaProducer.send(producerRecord);  
    try {
      RecordMetadata metadata = metadataFuture.get();
      System.out.println(String.format("RecordMetadata topic: %s, offset: %d",
                    metadata.topic(), metadata.offset()));
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        // close() flushes any pending records and releases producer resources
        kafkaProducer.close();
    }
  }
}

output from java program:

RecordMetadata topic: my-topic, offset: 0

3a. To read data from Kafka, run the Kafka console consumer:

me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-console-consumer.sh --topic my-topic \
  --bootstrap-server localhost:9092

Alternatively, to read from the beginning instead of the latest offset, add the --from-beginning flag.
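
For example:

me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-console-consumer.sh --topic my-topic \
  --bootstrap-server localhost:9092 --from-beginning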

3b. To read data from Kafka using Java:

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.stream.Collectors.joining;

public class MyKafkaConsumer {
  public static void main(String[] args) {

    String topic = "my-topic";

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "host1:9092,host2:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

    // consumer specific props
    int numRecords = 10; // example value: max records returned per poll
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, numRecords);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 3000);

    props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // or specific 
    //props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");

    // Kerberos
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    props.put(SaslConfigs.SASL_MECHANISM, "GSSAPI");
    props.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");

    // use SaslConfigs.SASL_JAAS_CONFIG or
    // System.setProperty("java.security.auth.login.config", "/path/to/jaas.conf")

    String jaasConfigValue = "com.sun.security.auth.module.Krb5LoginModule required " +
            "useKeyTab=true " +
            "debug=true " +
            "storeKey=true " +
            "renewTicket=true " +
            "keyTab=\"/path/to/me.keytab\" " +
            "principal=\"me@HADOOP.HOST.COM\";";

    System.setProperty("java.security.krb5.conf", "/path/to/krb5.conf");

    props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfigValue);

    Consumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(props);
    kafkaConsumer.subscribe(Collections.singletonList(topic));

    // or, instead of subscribe, assign the consumer explicitly to all partitions
    // (do not call both subscribe and assign - that throws IllegalStateException):
    // List<TopicPartition> topicPartitions = kafkaConsumer.partitionsFor(topic)
    //         .stream()
    //         .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
    //         .collect(Collectors.toList());
    // kafkaConsumer.assign(topicPartitions);

    // for testing you can always move the offset to the beginning or end (when using assign):
    // kafkaConsumer.seekToEnd(topicPartitions);
    // kafkaConsumer.seekToBeginning(topicPartitions);

    try {
      while (true) {
        ConsumerRecords<String, byte[]> consumerRecords = kafkaConsumer.poll(1000);
        consumerRecords.forEach(record
          -> System.out.println(String.format("Received record with headers: %s, offset: %d, value: %s",
               Stream.of(record.headers().toArray())
                     .map(h -> h.key() + "=" + new String(h.value()))
                     .collect(joining(",")),
               record.offset(),
               new String(record.value()))));
      }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        kafkaConsumer.close();
    }
  }
}

output:

Received record with headers: my-header-key=my-header-value, offset: 0, value: hello kafka

4. Removing items from Kafka can be done by decreasing the retention to e.g. 60 seconds:

me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-topic --add-config retention.ms=60000
Completed Updating config for entity: topic 'my-topic'.

and check the change

me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-configs.sh --zookeeper localhost:2181 --describe \
  --entity-type topics --entity-name  my-topic
Configs for topic 'my-topic' are retention.ms=60000
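
Once the old messages have been deleted, the retention override can be removed again (falling back to the broker default) with --delete-config:

me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-topic --delete-config retention.ms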