2. Create org.apache.spark.rdd.RDD / org.apache.spark.api.java.JavaRDD; an RDD is a partitioned collection of objects (without a schema)
Use sparkContext directly to parallelize a scala.collection.Seq[A]:
Scala:
val stringRDD: RDD[String] = sparkSession.sparkContext.parallelize(Seq("a", "bb", "ccc"))
val rowRDD: RDD[Row] = stringRDD.map(string => RowFactory.create(string))
case class Person(name: String, age: Integer) // defined outside main method
val personRDD: RDD[Person] = stringRDD.map(string => Person(string, new java.util.Random().nextInt(100)))
personRDD.foreach(p => println(p))
personRDD.foreach(println) // same as above
output:
Person(ccc,63)
Person(a,50)
Person(bb,93)
Use a JavaSparkContext wrapping sparkContext to parallelize a java.util.List<T>:
Java:
JavaRDD<String> stringJavaRDD
= new JavaSparkContext(sparkSession.sparkContext()).parallelize(Arrays.asList("a", "bb", "ccc"));
JavaRDD<Row> rowJavaRDD = stringJavaRDD.map(string -> RowFactory.create(string));
public static class Person implements Serializable { // the class needs to be public static and defined outside the main method
private String name;
private int age;
public Person() { } // a default constructor and setters/getters are required by Spark
public Person(String name, int age) { this.name = name; this.age = age; }
public void setName(String name) { this.name = name; }
public void setAge(int age) { this.age = age; }
public String getName() { return name; }
public int getAge() { return age; }
@Override public String toString() { return "Person{name='" + name + "',age='" + age + "'}"; }
}
JavaRDD<Person> personJavaRDD = stringJavaRDD.map(name -> new Person(name, new Random().nextInt(100)));
personJavaRDD.foreach(p -> System.out.println(p)); // no .show() method on rdd
// personJavaRDD.foreach(System.out::println); // java.io.NotSerializableException: java.io.PrintStream (the method reference captures the PrintStream instance in the closure, while the lambda above resolves System.out on the executor)
// using the Scala API directly from Java requires Scala imports and a lot of boilerplate code:
Iterator<String> iterator = Arrays.asList("a", "b").iterator();
// import scala.collection.JavaConverters;
// import scala.collection.Seq;
// import scala.reflect.ClassManifestFactory;
// import scala.reflect.ClassTag$;
Seq<String> seq = JavaConverters.asScalaIteratorConverter(iterator).asScala().toSeq();
RDD<String> rdd = sparkSession.sparkContext().parallelize(seq, 2,
ClassTag$.MODULE$.apply(String.class));
// calling the Scala implicits explicitly from Java (instead of the implicit import used in Scala)
implicits$ implicits = sparkSession.implicits(); // compiles (but is highlighted as an error in IntelliJ; no warnings in Eclipse Scala IDE)
Encoder<String> stringEncoder = implicits.newStringEncoder();
Dataset<Row> rowDataset1 = implicits.localSeqToDatasetHolder(seq, stringEncoder).toDF();
3. Create org.apache.spark.sql.DataFrame and org.apache.spark.sql.Dataset: DataFrame does not exist as a separate type in Java because DataFrame is just a Scala type alias for Dataset[Row]. A Dataset is typed; a Dataset of Row (Dataset<Row> in Java) is a DataFrame (see the Java createDataFrame/createDataset sketch at the end of this section).
Scala:
// .toDF/.toDS requires sparkSession.implicits._
import sparkSession.implicits._
// 1. from rdd
val df: DataFrame = rdd.toDF() // defaults to `value` as column name: value: string (nullable = true)
val df2: DataFrame = rdd.toDF("str_name") // explicitly set column name: str_name: string (nullable = true)
val ds: Dataset[String] = rdd.toDS() // defaults to `value` as column name: value: string (nullable = true)
val personDF: DataFrame = personRDD.toDF()
personDF.printSchema() // both Dataset and DataFrame have a schema associated with them
personDF.show()
// 2. from Seq
val df11: DataFrame = Seq("X", "Y", "Z").toDF()
val df12: DataFrame = Seq("X", "Y", "Z").toDF("my_string")
val ds11: Dataset[String] = Seq("X", "Y", "Z").toDS()
val personDf: DataFrame = Seq(Person("Bob", 21), Person("Alice", 20)).toDF()
val personDs: Dataset[Person] = Seq(Person("Bob", 21), Person("Alice", 20)).toDS()
// 3. from sparkSession.createDataFrame/createDataset
// a) using RDD[Row] and Schema StructType
val df31: DataFrame = sparkSession.createDataFrame(rowRDD, StructType(Seq(StructField("my_column_name", StringType))))
// b) a Dataset can be created from an RDD, Seq or util.List of type T plus an implicit Encoder[T]
val ds31: Dataset[Person] = sparkSession.createDataset(util.Arrays.asList(Person("Bob", 21)))
// 4. from / to Dataset / Dataframe:
val ds_2: Dataset[Person] = personDf.as[Person]
val df_2: DataFrame = personDs.toDF()
// 5. mapping an RDD / Dataset
val rowRDD: RDD[Row] = stringRDD.map(string => RowFactory.create(string))
val rowRDD2: RDD[Row] = stringRDD.map(RowFactory.create(_)) // as above
val stringDataset: Dataset[String] = stringRDD.toDS()
val intDataset: Dataset[Int] = stringDataset.map(string => string.length)
val intDataset2: Dataset[Int] = stringDataset.map(_.length) // as above
Java:
// Mapping RDD
// java functional interface org.apache.spark.api.java.function.Function
JavaRDD<Row> rowJavaRDD = stringJavaRDD.map(new Function<String, Row>() {
@Override
public Row call(String string) throws Exception {
return RowFactory.create(string);
}
});
// lambda
JavaRDD<Row> rowJavaRDD2 = stringJavaRDD.map(RowFactory::create); // as above
stringJavaRDD.foreach(s -> System.out.println(s));
// java.io.NotSerializableException: java.io.PrintStream
// stringJavaRDD.foreach(System.out::println);
RDD<Row> rowRDD = rowJavaRDD.rdd();
// requires import of scala.reflect.ClassManifestFactory and scala.runtime.AbstractFunction1
RDD<Row> rowRDD1 = rowRDD.map(new AbstractFunction1<Row, Row>() {
@Override
public Row apply(Row row) {
return row;
}
}, ClassManifestFactory.fromClass(Row.class));
// Mapping a Dataset - requires an explicit encoder for the mapping output type (in Scala an implicit encoder is used)
// org.apache.spark.api.java.function.MapFunction (from the java package)
Dataset<String> dataset = sparkSession.createDataset(Arrays.asList("a", "bb", "ccc"), Encoders.STRING());
dataset.map(new MapFunction<String, Integer>() {
@Override
public Integer call(String value) throws Exception {
return value.length();
}
}, Encoders.INT());
// lambda usage, as MapFunction is a functional interface
// the cast to MapFunction is needed to avoid an ambiguous method call (map is also overloaded with scala.Function1)
dataset.map((MapFunction<String, Integer>) String::length, Encoders.INT());
// compilation error - scala.Function1 is specialized in Scala and requires implementing additional abstract methods such as apply$mcVJ$sp(long)
/*
dataset.map(new Function1<String, Integer>() {
@Override
public Integer apply(String string) {
return string.length();
}
}, Encoders.INT());
*/
// compilation error: incompatible types: scala.Function1 is not a functional interface multiple non-overriding abstract methods found in interface scala.Function1
// dataset.map((Function1<String, Integer>) String::length, Encoders.INT());
// Ambiguous method call conflicting with map(Function1)
// dataset.map(value -> value.length(), Encoders.INT())
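To make the point from 3. concrete in Java: a minimal createDataFrame/createDataset sketch (the column name and sample values are illustrative; rowJavaRDD and the Person bean are the ones defined above):
// import org.apache.spark.sql.Encoders;
// import org.apache.spark.sql.types.DataTypes;
// import org.apache.spark.sql.types.Metadata;
// import org.apache.spark.sql.types.StructField;
// import org.apache.spark.sql.types.StructType;
// a DataFrame in Java is simply a Dataset<Row>; the schema is passed explicitly as a StructType
StructType schema = new StructType(new StructField[]{
        new StructField("my_column_name", DataTypes.StringType, true, Metadata.empty())});
Dataset<Row> javaDF = sparkSession.createDataFrame(rowJavaRDD, schema);
javaDF.printSchema();
javaDF.show();
// a typed Dataset needs an explicit Encoder (here the bean encoder for the Person class defined earlier)
Dataset<Person> personDataset = sparkSession.createDataset(
        Arrays.asList(new Person("Bob", 21), new Person("Alice", 20)),
        Encoders.bean(Person.class));
personDataset.show();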
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
public class MySchemaBuilder {
public static void main(String[] args) throws IOException {
Schema schema = SchemaBuilder.record("myRecordName").fields()
.requiredInt("myRequiredInt")
//.name("myRequiredInt2").type().intType().noDefault()
.optionalDouble("myOptionalDouble")
//.name("myOptionalDouble2").type().optional().doubleType()
.nullableString("myNullableString", "myNullableStringDefaultValue")
//.name("myNullableString2").type().nullable().stringType().stringDefault("myNullableStringDefaultValue2")
.name("myRequiredArrayLongs").type().array().items().longType().noDefault()
.name("myRequiredSubRecord")
.type(
SchemaBuilder.record("myRequiredSubRecordType").fields().requiredFloat("myRequiredFloat").endRecord()
).noDefault()
.name("myOptionalArraySubRecords").type().nullable().array()
.items(
SchemaBuilder.record("myOptionalArraySubRecordType").fields().requiredBoolean("myRequiredBoolean").endRecord()
).noDefault()
.endRecord();
System.out.println(schema);
File file = new File("my-file.avro");
GenericData.Record record = new GenericData.Record(schema);
record.put("myRequiredInt", 1);
record.put("myOptionalDouble", 2.2d); // this line can be commented since optional long
record.put("myNullableString", "abc"); // this line can be commented since optional string
record.put("myRequiredArrayLongs", Arrays.asList(1L, 2L, 3L)); // required otherwise java.lang.NullPointerException: null of array in field myRequiredArrayLongs of myRecordName
GenericData.Record myRequiredSubRecord = new GenericData.Record(schema.getField("myRequiredSubRecord").schema());
myRequiredSubRecord.put("myRequiredFloat", 1.0f); // required otherwise null of float in field myRequiredFloat of myRequiredSubRecordType in field myRequiredSubRecord of myRecordName
record.put("myRequiredSubRecord", myRequiredSubRecord); // required otherwise java.lang.NullPointerException: null of myRequiredSubRecordType in field myRequiredSubRecord of myRecordName
GenericData.Record myOptionalArraySubRecord = new GenericData.Record(schema.getField("myOptionalArraySubRecords").schema().getTypes().get(0).getElementType());
myOptionalArraySubRecord.put("myRequiredBoolean", true);
record.put("myOptionalArraySubRecords", Arrays.asList(myOptionalArraySubRecord, myOptionalArraySubRecord));
GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(genericDatumWriter);
dataFileWriter.create(schema, file);
dataFileWriter.append(record);
dataFileWriter.close();
}
}
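Reading my-file.avro back is symmetric; a minimal sketch using GenericDatumReader and DataFileReader (the writer schema is read from the file header, so no schema needs to be passed in):
// import org.apache.avro.file.DataFileReader;
// import org.apache.avro.generic.GenericDatumReader;
GenericDatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<>();
try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(new File("my-file.avro"), genericDatumReader)) {
    System.out.println(dataFileReader.getSchema()); // the schema stored in the file header
    for (GenericRecord readRecord : dataFileReader) {
        System.out.println(readRecord);
    }
}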
INFO: 2020-11-18T12:55:59.104Z: Starting 1 workers...
Nov 18, 2020 1:56:00 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2020-11-18T12:56:01.092Z: Pub/Sub resources set up for topic 'projects/xyz-123/topics/bartek-test-topic'.
Nov 18, 2020 1:56:04 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
Nov 18, 2020 1:56:45 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2020-11-18T12:56:41.647Z: Worker configuration: n1-standard-4 in us-central1-f.
Nov 18, 2020 1:56:56 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2020-11-18T12:56:56.738Z: Workers have started successfully.
Nov 18, 2020 2:38:16 PM org.apache.beam.runners.dataflow.DataflowPipelineJob lambda$waitUntilFinish$0
WARNING: Job is already running in Google Cloud Platform, Ctrl-C will not cancel it.
To cancel the job in the cloud, run:
> gcloud dataflow jobs --project=xyz-123 cancel --region=us-central1 2020-11-18_04_55_47-17323678150575849943
AH00558: apache2: Could not reliably determine the server's fully qualified domain name, using 127.0.1.1. Set the 'ServerName' directive globally to suppress this message
Syntax OK
######################
# Become a Certificate Authority
######################
# Generate private key
openssl genrsa -des3 -out myCA.key 2048
Generating RSA private key, 2048 bit long modulus (2 primes)
.........................+++++
.............................................................................................+++++
e is 65537 (0x010001)
Enter pass phrase for myCA.key:
Verifying - Enter pass phrase for myCA.key:
# Generate the CA root certificate from that key (the days value here is an example); this prompts for the CA key pass phrase and the DN fields below
openssl req -x509 -new -key myCA.key -sha256 -days 1825 -out myCA.pem
Enter pass phrase for myCA.key:
Can't load /home/me/.rnd into RNG
140349095199168:error:2406F079:random number generator:RAND_load_file:Cannot open file:../crypto/rand/randfile.c:88:Filename=/home/me/.rnd
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [AU]:PL
State or Province Name (full name) [Some-State]:Malopolskie
Locality Name (eg, city) []:Krakow
Organization Name (eg, company) [Internet Widgits Pty Ltd]:My Certificate Authority Company
Organizational Unit Name (eg, section) []:My Certificate Authority Company Department
Common Name (e.g. server FQDN or YOUR name) []:My CA
Email Address []:myCA@localhost
########################
# Create CA-signed certs
########################
NAME=localhost # Use your own domain name e.g. domain.com
# Generate a private key
openssl genrsa -out $NAME.key 2048
Generating RSA private key, 2048 bit long modulus (2 primes)
...................................+++++
........................+++++
e is 65537 (0x010001)
# Create a certificate-signing request (this prompts for the DN fields below)
openssl req -new -key $NAME.key -out $NAME.csr
Can't load /home/me/.rnd into RNG
139631538540992:error:2406F079:random number generator:RAND_load_file:Cannot open file:../crypto/rand/randfile.c:88:Filename=/home/me/.rnd
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [AU]:PL
State or Province Name (full name) [Some-State]:Malopolskie
Locality Name (eg, city) []:Krakow
Organization Name (eg, company) [Internet Widgits Pty Ltd]:My Company
Organizational Unit Name (eg, section) []:My Company Department
Common Name (e.g. server FQDN or YOUR name) []:localhost
Email Address []:me@localhost
Please enter the following 'extra' attributes
to be sent with your certificate request
A challenge password []:
An optional company name []:
# Create a config file for the extensions
>$NAME.ext cat <<-EOF
authorityKeyIdentifier=keyid,issuer
basicConstraints=CA:FALSE
keyUsage = digitalSignature, nonRepudiation, keyEncipherment, dataEncipherment
subjectAltName = @alt_names
[alt_names]
DNS.1 = $NAME # Be sure to include the domain name here because Common Name is not so commonly honoured by itself
DNS.2 = bar.$NAME # Optionally, add additional domains (I've added a subdomain here)
IP.1 = 10.0.2.15 # Optionally, add an IP address (if the connection which you have planned requires it)
EOF
# Create the signed certificate
openssl x509 -req -in $NAME.csr -CA myCA.pem -CAkey myCA.key -CAcreateserial \
-out $NAME.crt -days 825 -sha256 -extfile $NAME.ext
Signature ok
subject=C = PL, ST = Malopolskie, L = Krakow, O = My Company, OU = My Company Department, CN = localhost, emailAddress = me@localhost
Getting CA Private Key
Enter pass phrase for myCA.key:
# Check your work
openssl verify -CAfile myCA.pem -verify_hostname bar.localhost localhost.crt
Browsing https://localhost before SSL is enabled in apache2 fails with: Secure Connection Failed. An error occurred during a connection to localhost. SSL received a record that exceeded the maximum permissible length. Error code: SSL_ERROR_RX_RECORD_TOO_LONG
# enable default-ssl conf, so that /etc/apache2/sites-enabled/ will have a symlink to default-ssl.conf -> ../sites-available/default-ssl.conf
sudo a2ensite default-ssl
sudo systemctl restart apache2
curl -vv https://localhost
* Rebuilt URL to: https://localhost/
* Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 443 (#0)
* ALPN, offering h2
* ALPN, offering http/1.1
* successfully set certificate verify locations:
* CAfile: /etc/ssl/certs/ca-certificates.crt
CApath: /etc/ssl/certs
* TLSv1.3 (OUT), TLS handshake, Client hello (1):
* TLSv1.3 (IN), TLS handshake, Server hello (2):
* TLSv1.3 (IN), TLS Unknown, Certificate Status (22):
* TLSv1.3 (IN), TLS handshake, Unknown (8):
* TLSv1.3 (IN), TLS Unknown, Certificate Status (22):
* TLSv1.3 (IN), TLS handshake, Certificate (11):
* TLSv1.3 (OUT), TLS alert, Server hello (2):
* SSL certificate problem: unable to get local issuer certificate
* stopped the pause stream!
* Closing connection 0
curl: (60) SSL certificate problem: unable to get local issuer certificate
More details here: https://curl.haxx.se/docs/sslcerts.html
curl failed to verify the legitimacy of the server and therefore could not
establish a secure connection to it. To learn more about this situation and
how to fix it, please visit the web page mentioned above.
curl -vv --cacert localhost.crt https://localhost | head -n 20
# still fails: curl: (60) SSL certificate problem: unable to get local issuer certificate
# Use the CA certificate myCA.pem instead of the leaf certificate localhost.crt:
curl -vv --cacert myCA.pem https://localhost | head -n 11
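The same trust decision can be made from Java by loading myCA.pem into an in-memory trust store; a minimal sketch (URL and file name taken from the steps above, everything else is illustrative):
// import java.io.FileInputStream;
// import java.net.URL;
// import java.security.KeyStore;
// import java.security.cert.Certificate;
// import java.security.cert.CertificateFactory;
// import javax.net.ssl.HttpsURLConnection;
// import javax.net.ssl.SSLContext;
// import javax.net.ssl.TrustManagerFactory;
CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
Certificate caCertificate;
try (FileInputStream in = new FileInputStream("myCA.pem")) {
    caCertificate = certificateFactory.generateCertificate(in);
}
KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
trustStore.load(null, null); // empty in-memory trust store
trustStore.setCertificateEntry("myCA", caCertificate);
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(trustStore);
SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
HttpsURLConnection connection = (HttpsURLConnection) new URL("https://localhost").openConnection();
connection.setSSLSocketFactory(sslContext.getSocketFactory());
System.out.println(connection.getResponseCode()); // should succeed (e.g. 200) once the CA is trusted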
# Start firefox to create a profile; it warns about the certificate:
firefox https://localhost
Warning: Potential Security Risk Ahead. Firefox detected a potential security threat and did not continue to localhost. If you visit this site, attackers could try to steal information like your passwords, emails, or credit card details.
# Stop firefox via ^C - required so that the firefox profile gets created for the first time!
^CExiting due to channel error.
# Detect firefox profile in use
profile_in_use=$(cd ~/.mozilla/firefox && ls -lat | grep default | head -n 1 | sed 's/.*[[:space:]]\(.*default.*$\)/\1/g')
echo "profile is use: $profile_in_use"
# Run the Certificate Database Tool
# add (-A) an existing certificate to a certificate database, specify the nickname (-n) of a certificate or key,
# specify (-t) the trust attributes to modify in an existing certificate, or to apply to a certificate when creating or adding it to a database.
# There are three available trust categories for each certificate, expressed in this order: "SSL, email, object signing":
# c Valid CA
# T Trusted CA to issue client certificates (implies c)
# C Trusted CA to issue server certificates (SSL only)(implies c)
certutil -A -n "My CA certificate" -t "TC,," -i myCA.pem -d sql:/$HOME/.mozilla/firefox/$profile_in_use
# List the certificate
certutil -d sql:/$HOME/.mozilla/firefox/$profile_in_use -L
# Delete certificate (do not execute it here)
# certutil -D -n "My CA certificate" -d sql:/$HOME/.mozilla/firefox/$profile_in_use
Certificate Nickname Trust Attributes
SSL,S/MIME,JAR/XPI
DigiCert SHA2 Secure Server CA ,,
Amazon ,,
...
My CA certificate CT,,
# Start firefox again
firefox https://localhost
Note the chain of 2 certificates (the server certificate from localhost.crt and the CA certificate from myCA.pem).
Chrome still shows: Your connection is not private. Attackers might be trying to steal your information from localhost (for example, passwords, messages, or credit cards). NET::ERR_CERT_AUTHORITY_INVALID
In Chrome's certificate manager, import the myCA.pem certificate in the Authorities tab.
Now reload the page.
It also works for https://10.0.2.15, because that IP is explicitly listed in the certificate's Subject Alternative Name.
1. IDM – create user login myuser
2. IDM – create user groups kafka_dev_myuser_rw_grp and kafka_dev_myuser_ro_grp and add the user login myuser to those groups,
so that:
bash-4.2$ id myuser
uid=12345678(myuser) gid=12345678(myuser) groups=12345678(myuser),87654321(kafka_dev_myuser_rw_grp),87654320(kafka_dev_myuser_ro_grp)
3. Hue / Hive SQL – create the roles:
0: jdbc:hive2://host> CREATE ROLE kafka_dev_myuser_rw_role;
0: jdbc:hive2://host> GRANT ROLE kafka_dev_myuser_rw_role TO GROUP kafka_dev_myuser_rw_grp;
0: jdbc:hive2://host> SHOW ROLE GRANT GROUP kafka_dev_myuser_rw_grp;
+---------------------------+---------------+-------------+----------+--+
| role | grant_option | grant_time | grantor |
+---------------------------+---------------+-------------+----------+--+
| kafka_dev_myuser_rw_role | false | NULL | -- |
+---------------------------+---------------+-------------+----------+--+
0: jdbc:hive2://host> CREATE ROLE kafka_dev_myuser_ro_role;
0: jdbc:hive2://host> GRANT ROLE kafka_dev_myuser_ro_role TO GROUP kafka_dev_myuser_ro_grp;
0: jdbc:hive2://host> SHOW ROLE GRANT GROUP kafka_dev_myuser_ro_grp;
+---------------------------+---------------+-------------+----------+--+
| role | grant_option | grant_time | grantor |
+---------------------------+---------------+-------------+----------+--+
| kafka_dev_myuser_ro_role | false | NULL | -- |
+---------------------------+---------------+-------------+----------+--+
1 row selected (0.106 seconds)
4. Set up topic, producer, and consumer group access for the roles:
cd /run/cloudera-scm-agent/process/1234-kafka-KAFKA_BROKER/
kinit -kt kafka.keytab kafka/host.hadoop.com@HADOOP.COM
5. Send data using the Kafka console producer:
Initialize a Kerberos session from the keytab:
kinit -kt /path/to/me.keytab me@HADOOP.COM
If that fails due to an expired password, change the password and re-create the Kerberos keytab:
ipa-getkeytab -s my-kdc-server.hadoop.com -p me@HADOOP.COM -P -k me.keytab
20/12/14 03:49:05 INFO producer.ProducerConfig: ProducerConfig values:
...
Debug is true storeKey true useTicketCache false useKeyTab true doNotPrompt false ticketCache is null isInitiator true KeyTab is /path/to/me.keytab refreshKrb5Config is false principal is me@HADOOP.COM tryFirstPass is false useFirstPass is false storePass is false clearPass is false
principal is me@HADOOP.COM
Will use keytab
Commit Succeeded
...
20/12/14 03:49:06 INFO authenticator.AbstractLogin: Successfully logged in.
20/12/14 03:49:06 INFO utils.AppInfoParser: Kafka version : 1.0.1-kafka-3.1.1
20/12/14 03:49:06 INFO utils.AppInfoParser: Kafka commitId : unknown
>hello from bartek
>^
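For reference, the same keytab-based login can be used from a small Java producer; a minimal sketch (the broker address, topic name and security protocol are assumptions, not taken from the cluster above):
// import java.util.Properties;
// import org.apache.kafka.clients.producer.KafkaProducer;
// import org.apache.kafka.clients.producer.ProducerRecord;
Properties props = new Properties();
props.put("bootstrap.servers", "broker.hadoop.com:9092"); // assumed broker host:port
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("security.protocol", "SASL_PLAINTEXT"); // assumption: use SASL_SSL if TLS is enabled on the brokers
props.put("sasl.kerberos.service.name", "kafka");
props.put("sasl.jaas.config",
        "com.sun.security.auth.module.Krb5LoginModule required "
        + "useKeyTab=true storeKey=true keyTab=\"/path/to/me.keytab\" principal=\"me@HADOOP.COM\";");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("my-topic", "hello from bartek")); // assumed topic name
    producer.flush();
}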
6. Read 2 messages using the Kafka console consumer:
It does not require a Kerberos ticket cache (session) to be present: klist: No credentials cache found
Debug is true storeKey true useTicketCache false useKeyTab true doNotPrompt false ticketCache is null isInitiator true KeyTab is /path/to/me.keytab refreshKrb5Config is false principal is me@HADOOP.COM tryFirstPass is false useFirstPass is false storePass is false clearPass is false
principal is me@HADOOP.COM
Will use keytab
Commit Succeeded
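The consumer side looks the same from Java; a short sketch (group id and topic are assumptions - the consumer group is what the consumer group access from step 4 applies to; security properties as in the producer sketch above):
// import java.util.Collections;
// import java.util.Properties;
// import org.apache.kafka.clients.consumer.ConsumerRecord;
// import org.apache.kafka.clients.consumer.ConsumerRecords;
// import org.apache.kafka.clients.consumer.KafkaConsumer;
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "broker.hadoop.com:9092"); // assumed broker host:port
consumerProps.put("group.id", "my-consumer-group"); // assumed consumer group
consumerProps.put("auto.offset.reset", "earliest");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("security.protocol", "SASL_PLAINTEXT"); // assumption, as in the producer sketch
consumerProps.put("sasl.kerberos.service.name", "kafka");
consumerProps.put("sasl.jaas.config",
        "com.sun.security.auth.module.Krb5LoginModule required "
        + "useKeyTab=true storeKey=true keyTab=\"/path/to/me.keytab\" principal=\"me@HADOOP.COM\";");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
    consumer.subscribe(Collections.singletonList("my-topic")); // assumed topic name
    ConsumerRecords<String, String> records = consumer.poll(10000); // poll(long) as in the Kafka 1.0 client above; newer clients use poll(Duration)
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
}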
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2. Examine the process command via ps -ef or watch the agent logs.
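Once the agent is running, the netcat source can be exercised without telnet; a minimal Java client sending one test line (host and port come from example.conf above; the event should then show up in the logger sink output):
// import java.io.OutputStreamWriter;
// import java.io.PrintWriter;
// import java.net.Socket;
// import java.nio.charset.StandardCharsets;
try (Socket socket = new Socket("localhost", 44444);
     PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8), true)) {
    writer.println("hello flume"); // one line = one event for the netcat source
}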