1. Download and unpack kafka_2.11-1.1.1.tgz, then start ZooKeeper and the Kafka server from the command line:
me@Mac:~/kafka_2.11-1.1.1$ bin/zookeeper-server-start.sh config/zookeeper.properties
and
me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-server-start.sh config/server.properties
and create the my-topic topic:
me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic
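Optionally verify the topic was created by listing all topics:
me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-topics.sh --list --zookeeper localhost:2181
my-topic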
2a. To send data to Kafka, use the command-line Kafka console producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic
>hello from console producer
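The console producer can also send keyed messages via its parse.key and key.separator properties; a sketch (the ':' separator is an arbitrary choice here):
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic \
  --property parse.key=true --property key.separator=:
>my-key:hello with a key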
2b. To send data to Kafka using Java, create a Maven project with the dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
and add the Java code:
package com.bawi.kafka;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class MyKafkaProducer {
    public static void main(String[] args) {
        String topic = "my-topic";
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host1:9092,host2:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        // Kerberos
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "GSSAPI");
        props.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");
        System.setProperty("java.security.krb5.conf", "/path/to/krb5.conf");

        // use SaslConfigs.SASL_JAAS_CONFIG or
        // System.setProperty("java.security.auth.login.config", "/path/to/jaas.conf")
        String jaasConfigValue = "com.sun.security.auth.module.Krb5LoginModule required "
                + "useKeyTab=true "
                + "debug=true "
                + "storeKey=true "
                + "renewTicket=true "
                + "keyTab=\"/path/to/me.keytab\" "
                + "principal=\"me@HADOOP.HOST.COM\";";
        props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfigValue);
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "mypass1234");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/my.jks");

        KafkaProducer<String, byte[]> kafkaProducer = new KafkaProducer<>(props);
        byte[] data = "hello kafka".getBytes();
        ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, data);
        producerRecord.headers().add(new RecordHeader("my-header-key", "my-header-value".getBytes()));

        // asynchronously send data bytes to the topic;
        // optionally wait (blocking) on the future's get method to get the record metadata
        Future<RecordMetadata> metadataFuture = kafkaProducer.send(producerRecord);
        try {
            RecordMetadata metadata = metadataFuture.get();
            System.out.println(String.format("RecordMetadata topic: %s, offset: %d",
                    metadata.topic(), metadata.offset()));
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            kafkaProducer.close();
        }
    }
}
output from the Java program:
RecordMetadata topic: my-topic, offset: 0
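Since send() is asynchronous, instead of blocking on the future you can pass a Callback that is invoked once the broker acknowledges the record. A minimal sketch, reusing kafkaProducer and producerRecord from the code above:

// non-blocking alternative to metadataFuture.get(): the callback runs on the producer's I/O thread
kafkaProducer.send(producerRecord, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace(); // send failed, e.g. timeout or serialization error
    } else {
        System.out.println(String.format("Acked topic: %s, partition: %d, offset: %d",
                metadata.topic(), metadata.partition(), metadata.offset()));
    }
});
kafkaProducer.flush(); // optionally block until all buffered records have been sent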
3a. To read data from Kafka, run the Kafka console consumer:
me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-console-consumer.sh --topic my-topic \
  --bootstrap-server localhost:9092
Alternatively, to read from the beginning instead of from the last offset, add the --from-beginning flag.
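For example, replaying the messages produced earlier in this walkthrough:
me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-console-consumer.sh --topic my-topic \
  --bootstrap-server localhost:9092 --from-beginning
hello from console producer
hello kafka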
3b. To read data from Kafka using Java:
package com.bawi.kafka;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.stream.Collectors.joining;

public class MyKafkaConsumer {
    public static void main(String[] args) {
        String topic = "my-topic";
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "host1:9092,host2:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        // consumer specific props
        int numRecords = 100; // max records returned in a single poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, numRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 3000);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // or use a specific, stable group id:
        //props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");

        // Kerberos
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "GSSAPI");
        props.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");
        // use SaslConfigs.SASL_JAAS_CONFIG or
        // System.setProperty("java.security.auth.login.config", "/path/to/jaas.conf")
        String jaasConfigValue = "com.sun.security.auth.module.Krb5LoginModule required "
                + "useKeyTab=true "
                + "debug=true "
                + "storeKey=true "
                + "renewTicket=true "
                + "keyTab=\"/path/to/me.keytab\" "
                + "principal=\"me@HADOOP.HOST.COM\";";
        System.setProperty("java.security.krb5.conf", "/path/to/krb5.conf");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfigValue);

        Consumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(Collections.singletonList(topic));

        // or alternatively (instead of subscribe) assign to all partitions explicitly:
        // List<TopicPartition> topicPartitions = kafkaConsumer.partitionsFor(topic)
        //         .stream()
        //         .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
        //         .collect(Collectors.toList());
        // kafkaConsumer.assign(topicPartitions);

        // for testing you can always move the offset to the beginning or the end (with assign):
        // kafkaConsumer.seekToEnd(topicPartitions);
        // kafkaConsumer.seekToBeginning(topicPartitions);

        try {
            while (true) {
                ConsumerRecords<String, byte[]> consumerRecords = kafkaConsumer.poll(1000);
                consumerRecords.forEach(record ->
                        System.out.println(String.format("Received record with headers: %s, offset: %d, value: %s",
                                Stream.of(record.headers().toArray())
                                        .map(h -> h.key() + "=" + new String(h.value()))
                                        .collect(joining(",")),
                                record.offset(),
                                new String(record.value()))));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            kafkaConsumer.close();
        }
    }
}
output:
Received record with headers: my-header-key=my-header-value, offset: 0, value: hello kafka
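By default the Java consumer auto-commits offsets in the background (enable.auto.commit=true, every auto.commit.interval.ms, 5000 ms by default). For at-least-once processing you can disable that and commit manually once a batch has been processed; a minimal sketch against the consumer above, where handleRecord is a hypothetical placeholder for your processing logic:

// set before creating the consumer: disable background offset auto-commit
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

while (true) {
    ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(1000);
    records.forEach(record -> handleRecord(record)); // handleRecord: hypothetical processing logic
    kafkaConsumer.commitSync(); // blocks until the offsets returned by the last poll are committed
}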
4. Items can be removed from Kafka by decreasing the topic retention, e.g. to 60 s (60000 ms):
me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-topic --add-config retention.ms=60000
Completed Updating config for entity: topic 'my-topic'.
and check the change:
me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-configs.sh --zookeeper localhost:2181 --describe \
  --entity-type topics --entity-name my-topic
Configs for topic 'my-topic' are retention.ms=60000
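Alternatively, the whole topic can be deleted (this assumes the broker allows it via delete.topic.enable=true, which is the default since Kafka 1.0):
me@Mac:~/kafka_2.11-1.1.1$ bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic my-topic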