Apache Kafka – producers and consumers – command line and Java – Kerberos auth

1. Download and unpack kafka_2.11-1.1.1.tgz, then run ZooKeeper and the Kafka server from the command line:

me@Mac:~/kafka_2.11-1.1.1$ bin/zookeeper-server-start.sh config/zookeeper.properties

and

me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-server-start.sh config/server.properties

and create the my-topic topic:

me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic
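
Optionally, verify the topic was created by describing it:

me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-topics.sh --describe \
  --zookeeper localhost:2181 --topic my-topic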

2a. To send data to Kafka, use the command line Kafka console producer:

 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic
>hello from console producer
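
The console producer can also send keyed messages by enabling key parsing, e.g. with ':' as the key separator:

me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-console-producer.sh --broker-list localhost:9092 \
  --topic my-topic --property parse.key=true --property key.separator=:
>my-key:hello keyed message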

2b. To send data to Kafka using Java, create a Maven project with the dependency:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.1</version>
        </dependency>

and add the following Java code:

package com.bawi.kafka;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;

public class MyKafkaProducer {
  public static void main(String[] args) {

    String topic = "my-topic";

    Map<String, Object> properties = new HashMap<>();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host1:9092,host2:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

    // Kerberos
    properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    properties.put(SaslConfigs.SASL_MECHANISM, "GSSAPI");
    properties.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");

    System.setProperty("java.security.krb5.conf", "/path/to/krb5.conf");

    // use SaslConfigs.SASL_JAAS_CONFIG or
    // System.setProperty("java.security.auth.login.config", "/path/to/jaas.conf")
    String jaasConfigValue = "com.sun.security.auth.module.Krb5LoginModule required " +
            "useKeyTab=true " +
            "debug=true " +
            "storeKey=true " +
            "renewTicket=true " +
            "keyTab=\"/path/to/me.keytab\" " +
            "principal=\"me@HADOOP.HOST.COM\";";
    properties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfigValue);

    properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "mypass1234");
    properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/my.jks");

    KafkaProducer<String, byte[]> kafkaProducer = new KafkaProducer<>(properties);


    byte[] data = "hello kafka".getBytes();
    ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, data);
    producerRecord.headers().add(new RecordHeader("my-header-key", "my-header-value".getBytes()));

    // asynchronously send the record to the topic;
    // optionally block on the future's get() to obtain the record metadata
    Future<RecordMetadata> metadataFuture = kafkaProducer.send(producerRecord);
    try {
      RecordMetadata metadata = metadataFuture.get();
      System.out.println(String.format("RecordMetadata topic: %s, offset: %d",
                    metadata.topic(), metadata.offset()));
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        kafkaProducer.close();
    }
  }
}

output from the Java program:

RecordMetadata topic: my-topic, offset: 0
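
Instead of blocking on the future's get(), send() also accepts a Callback that is invoked once the send completes; a minimal sketch, reusing kafkaProducer and producerRecord from the example above:

    // asynchronous send with a completion callback (no blocking on the future)
    kafkaProducer.send(producerRecord, (metadata, exception) -> {
        if (exception != null) {
            exception.printStackTrace(); // e.g. network or authorization failure
        } else {
            System.out.println(String.format("Sent to topic: %s, partition: %d, offset: %d",
                    metadata.topic(), metadata.partition(), metadata.offset()));
        }
    });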

3a. To read data from Kafka, run the Kafka console consumer:

me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-console-consumer.sh --topic my-topic \
  --bootstrap-server localhost:9092

alternatively, to read from the beginning instead of the latest offset, add the --from-beginning flag.
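For example:

me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-console-consumer.sh --topic my-topic \
  --bootstrap-server localhost:9092 --from-beginning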

3b. To read data from Kafka using Java:

package com.bawi.kafka;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.stream.Collectors.joining;

public class MyKafkaConsumer {
  public static void main(String[] args) {

    String topic = "my-topic";

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "host1:9092,host2:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

    // consumer specific props
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);     // max records returned in a single poll
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 3000); // max delay between polls before the consumer leaves the group

    props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // or a fixed group id to keep committed offsets across restarts
    //props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");

    // Kerberos
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    props.put(SaslConfigs.SASL_MECHANISM, "GSSAPI");
    props.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");

    // use SaslConfigs.SASL_JAAS_CONFIG or
    // System.setProperty("java.security.auth.login.config", "/path/to/jaas.conf")

    String jaasConfigValue = "com.sun.security.auth.module.Krb5LoginModule required " +
            "useKeyTab=true " +
            "debug=true " +
            "storeKey=true " +
            "renewTicket=true " +
            "keyTab=\"/path/to/me.keytab\" " +
            "principal=\"me@HADOOP.HOST.COM\";";

    System.setProperty("java.security.krb5.conf", "/path/to/krb5.conf");

    props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfigValue);

    // SASL_SSL also needs the truststore, as in the producer
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "mypass1234");
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/my.jks");

    Consumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(props);
    kafkaConsumer.subscribe(Collections.singletonList(topic));

    // alternatively, instead of subscribe, assign the consumer explicitly to all partitions
    // (subscribe and assign are mutually exclusive - calling both throws IllegalStateException):
    //List<TopicPartition> topicPartitions = kafkaConsumer.partitionsFor(topic)
    //        .stream()
    //        .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
    //        .collect(Collectors.toList());
    //kafkaConsumer.assign(topicPartitions);

    // for testing in assign mode you can always move the offset to the beginning or end:
    //kafkaConsumer.seekToEnd(topicPartitions);
    //kafkaConsumer.seekToBeginning(topicPartitions);

    try {
      while (true) {
        ConsumerRecords<String, byte[]> consumerRecords = kafkaConsumer.poll(1000);
        consumerRecords.forEach(record
          -> System.out.println(String.format("Received record with headers: %s, offset: %d, value: %s",
               Stream.of(record.headers().toArray())
                     .map(h -> h.key() + "=" + new String(h.value()))
                     .collect(joining(",")),
               record.offset(),
               new String(record.value()))));
      }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        kafkaConsumer.close();
    }
  }
}

output:

Received record with headers: my-header-key=my-header-value, offset: 0, value: hello kafka
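
Both the producer and consumer above pass the JAAS configuration inline via SaslConfigs.SASL_JAAS_CONFIG. The commented-out alternative is an external file referenced by the java.security.auth.login.config system property; for Kafka clients such a jaas.conf would look roughly like this (the KafkaClient section name is what the client library looks up; keytab path and principal are the same placeholders as above):

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  debug=true
  keyTab="/path/to/me.keytab"
  principal="me@HADOOP.HOST.COM";
};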

4. Removing items from Kafka can be done by decreasing the topic retention to e.g. 60s:

me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-topic --add-config retention.ms=60000
Completed Updating config for entity: topic 'my-topic'.

and check the change

me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-configs.sh --zookeeper localhost:2181 --describe \
  --entity-type topics --entity-name my-topic
Configs for topic 'my-topic' are retention.ms=60000
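
To restore the default retention afterwards, remove the override:

me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-topic --delete-config retention.ms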
