This topic describes how to run a Spark Streaming job in an E-MapReduce (EMR) Hadoop cluster to process data in a Kafka cluster.
Background information
EMR Hadoop and Kafka clusters are based on open source software. Therefore, you can refer to the relevant official documentation during data development.
- Spark official documentation: streaming-kafka-integration and structured-streaming-kafka-integration
- EMR demo: GitHub
Methods to access Kafka clusters for which Kerberos authentication is enabled
- Hadoop cluster for which Kerberos authentication is disabled: Provide the kafka_client_jaas.conf and krb5.conf files that are used for the Kerberos authentication of the Kafka cluster.
- Hadoop cluster for which Kerberos authentication is enabled: Provide the kafka_client_jaas.conf and krb5.conf files that are used for the Kerberos authentication of the Hadoop cluster. The Kafka cluster can then be authenticated based on the cross-realm trust feature of Kerberos.
For more information about cross-realm trust, see Configure cross-realm trust.
Both methods require you to provide the kafka_client_jaas.conf and krb5.conf files to support Kerberos authentication when you run a job.
- Content of the kafka_client_jaas.conf file:
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="kafka"
    keyTab="/path/to/kafka.keytab"
    principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM";
};
Note For information about how to obtain the keytab file, see Configure MIT Kerberos authentication.
- You can obtain the krb5.conf file from the /etc/ directory of a node in the Kafka cluster.
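For reference, a minimal krb5.conf for the realm used in the preceding JAAS example might look like the following sketch. The realm name and KDC host are taken from the example principal and are placeholders; use the values from your own cluster:
[libdefaults]
    default_realm = EMR.12345.COM
[realms]
    EMR.12345.COM = {
        kdc = emr-header-1.cluster-12345
        admin_server = emr-header-1.cluster-12345
    }
[domain_realm]
    .cluster-12345 = EMR.12345.COM
    cluster-12345 = EMR.12345.COM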
Use Spark Streaming to access a Kafka cluster for which Kerberos authentication is enabled
Add the long domain name and IP address of each node in the Kafka cluster to the /etc/hosts file on each node of the Hadoop cluster. You can obtain the long domain name and IP address of a node from the /etc/hosts file on that node. A long domain name is in the format of emr-xxx-x.cluster-xxx.
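For example, for a three-node Kafka cluster, the entries might look like the following. The IP addresses and the cluster ID are placeholders:
192.168.0.1    emr-header-1.cluster-12345
192.168.0.2    emr-worker-1.cluster-12345
192.168.0.3    emr-worker-2.cluster-12345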
Run the spark-submit command to submit the job:
spark-submit --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}/kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}/kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" \
--files /local/path/to/kafka_client_jaas.conf,/local/path/to/kafka.keytab,/local/path/to/krb5.conf \
--class xx.xx.xx.KafkaSample \
--num-executors 2 --executor-cores 2 --executor-memory 1g \
--master yarn-cluster \
xxx.jar arg1 arg2 arg3
In this case, the kafka_client_jaas.conf file references the keytab file by its file name only, because the --files option places the distributed files in the working directory of each YARN container:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
serviceName="kafka"
keyTab="kafka.keytab"
principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM";
};
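The KafkaSample class referenced by --class is available in the EMR demo project on GitHub. As a rough orientation, a minimal Spark Streaming consumer for a Kerberos-secured Kafka cluster might look like the following sketch. The class name matches the placeholder in the spark-submit command above; the group ID, argument layout, and security.protocol value are assumptions, not the exact demo code:
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object KafkaSample {
  def main(args: Array[String]): Unit = {
    // args: <bootstrap servers> <topic> <batch interval in seconds>
    val Array(brokers, topic, interval) = args

    val sparkConf = new SparkConf().setAppName("KafkaSample")
    val ssc = new StreamingContext(sparkConf, Seconds(interval.toInt))

    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> "kafka-sample-group", // assumed group ID
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      // Kerberos settings; "kafka" must match serviceName in kafka_client_jaas.conf
      "security.protocol" -> "SASL_PLAINTEXT",
      "sasl.kerberos.service.name" -> "kafka"
    )

    // Create a direct stream against the secured Kafka cluster
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set(topic), kafkaParams)
    )

    // Print the number of records received in each batch
    stream.map(_.value).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
Because the JVM options in the spark-submit command point both the driver and the executors at the JAAS and krb5 files, the consumer code itself does not need to handle the keytab.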
Use Spark SQL statements to access Kafka
spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*
The directory /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* contains the data source implementation that is required to access Kafka. If your EMR cluster uses Spark 2, change spark3 in the preceding command to spark2.
Create a table on the Kafka topic:
create table test_kafka
using kafka
options(kafka.bootstrap.servers='alikafka-post-cn-7mz2sqqr****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-7mz2sqqr****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-7mz2sqqr****-3-vpc.alikafka.aliyuncs.com:9092',
subscribe='test_topic',
startingoffsets='earliest'
)
Query the table:
select * from test_kafka;
Appendix
- For the sample code, visit GitHub.
- For more information about the parameters, see Structured Streaming + Kafka Integration Guide.