E-MapReduce:Use EMR Serverless Spark to submit a PySpark streaming job

Last Updated: Nov 04, 2024

With the rapid development of big data, stream processing has become essential for real-time data analysis. E-MapReduce (EMR) Serverless Spark is a powerful, scalable platform that simplifies real-time data processing and eliminates the need to manage servers. This topic describes how to use EMR Serverless Spark to submit a PySpark streaming job and demonstrates the usability and maintainability of EMR Serverless Spark in stream processing scenarios.

Prerequisites

An EMR Serverless Spark workspace is created. For more information, see Create a workspace.

Procedure

Step 1: Create a Dataflow cluster and generate messages

  1. Log on to the EMR console and create a Dataflow cluster that contains the Kafka service on the EMR on ECS page. For more information, see Create a cluster.

  2. Log on to the master node of the EMR cluster. For more information, see Log on to a cluster.

  3. Run the following command to switch the directory:

    cd /var/log/emr/taihao_exporter
  4. Run the following command to create a topic:

    # Create a topic named taihaometrics with 10 partitions and a replication factor of 2. 
    kafka-topics.sh --partitions 10 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic taihaometrics --create
  5. Run the following command to send messages:

    # Use the kafka-console-producer CLI to send messages to the taihaometrics topic. 
    tail -f metrics.log | kafka-console-producer.sh --broker-list core-1-1:9092 --topic taihaometrics
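
Optionally, you can verify that messages are being produced by consuming a few records from the topic in another session. The following command is a sketch that reuses the broker address from the preceding steps:

    # Consume a few messages from the taihaometrics topic to confirm that the producer is working.
    kafka-console-consumer.sh --bootstrap-server core-1-1:9092 --topic taihaometrics --max-messages 5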

Step 2: Create a network connection

  1. Go to the Network Connections page.

    1. In the left-side navigation pane of the EMR console, choose EMR Serverless > Spark.

    2. On the Spark page, find the desired workspace and click the name of the workspace.

    3. In the left-side navigation pane of the EMR Serverless Spark page, click Network Connections.

  2. On the Network Connections page, click Create Network Connection.

  3. In the Create Network Connection dialog box, configure the parameters and click OK. The following list describes the parameters:

    Name: The name of the network connection. Example: connection_to_emr_kafka.

    VPC: The virtual private cloud (VPC) in which your EMR cluster resides. If no VPC is available, click Create VPC to create one in the VPC console. For more information, see Create and manage a VPC.

    vSwitch: A vSwitch that is deployed in the same VPC as the EMR cluster. If no vSwitch is available in the current zone, click Create vSwitch to create one in the VPC console. For more information, see Create and manage a vSwitch.

    If Succeeded is displayed in the Status column of the connection, the network connection is created.

Step 3: Configure security group rules for the EMR cluster

  1. Obtain the CIDR block of the vSwitch to which a cluster node is connected.

    On the Nodes tab, click the name of a node group to view the associated vSwitch. Then, log on to the VPC console and obtain the CIDR block of the vSwitch on the vSwitch page.

  2. Configure security group rules.

    1. On the EMR on ECS page, find the desired cluster and click the name of the cluster.

    2. In the Security section of the Basic Information tab, click the link to the right of Cluster Security Group.

    3. On the Security Group Details page, click Add Rule, configure the Port Range and Authorization Object parameters, and then click Save in the Actions column.

      Port Range: The port number. In this example, enter 9092.

      Authorization Object: The CIDR block of the vSwitch that you obtained in the previous step.

      Important: To prevent attacks from external users, we recommend that you do not set the Authorization Object parameter to 0.0.0.0/0.

Step 4: Upload JAR files to OSS

Decompress the kafka.zip package and upload all the obtained JAR files to Object Storage Service (OSS). For more information, see Simple upload.
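
If you prefer the command line over the console, the ossutil tool can upload the JAR files in one command. The following is a sketch; the local directory, bucket name, and path are placeholders that you must replace with your own values.

    # Upload all JAR files extracted from kafka.zip to OSS.
    ossutil cp -r ./kafka/ oss://yourbucket/path/to/ --include "*.jar"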

Step 5: Upload the resource file

  1. In the left-side navigation pane of the EMR Serverless Spark page, click Files.

  2. On the Files page, click Upload File.

  3. In the Upload File dialog box, click the dotted-line area to select the pyspark_ss_demo.py file, or drag the file to the area.
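
The content of pyspark_ss_demo.py is not shown in this topic. The following Python code is a minimal sketch of what such a script might contain, assuming that it receives the Kafka broker IP address as its first command-line argument (see the Execution Parameters description in Step 6) and prints messages from the taihaometrics topic to stdout. The actual file may differ.

    import sys

    from pyspark.sql import SparkSession

    if __name__ == "__main__":
        # The private IP address of the core-1-1 node, passed as an execution parameter.
        broker_ip = sys.argv[1]

        spark = SparkSession.builder.appName("pyspark_ss_demo").getOrCreate()

        # Subscribe to the taihaometrics topic as a streaming source.
        lines = (
            spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers", f"{broker_ip}:9092")
            .option("subscribe", "taihaometrics")
            .option("startingOffsets", "latest")
            .load()
            .selectExpr("CAST(value AS STRING) AS value")
        )

        # Write each micro-batch to the console so that the output appears in stdOut.log (Step 7).
        query = (
            lines.writeStream.outputMode("append")
            .format("console")
            .option("truncate", "false")
            .start()
        )
        query.awaitTermination()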

Step 6: Create and start a streaming job

  1. In the left-side navigation pane of the EMR Serverless Spark page, click Data Development.

  2. Click Create.

  3. Enter a name in the Name field, choose Streaming Job > PySpark from the Type drop-down list, and then click OK.

  4. On the configuration tab that appears, configure the parameters and click Save. The following list describes the parameters that you must configure:

    Main Python Resources: The path of the pyspark_ss_demo.py file that you uploaded on the Files page in the previous step.

    Engine Version: The Spark version. For more information, see Engine versions.

    Execution Parameters: The private IP address of the core-1-1 node in the EMR cluster. To view the IP address, go to the Nodes tab of the EMR cluster and click the + icon to the left of the core node group.

    Spark Configuration: The Spark configurations. The following code provides an example:

    spark.jars oss://path/to/commons-pool2-2.11.1.jar,oss://path/to/kafka-clients-2.8.1.jar,oss://path/to/spark-sql-kafka-0-10_2.12-3.3.1.jar,oss://path/to/spark-token-provider-kafka-0-10_2.12-3.3.1.jar
    spark.emr.serverless.network.service.name connection_to_emr_kafka
    Note
    • spark.jars: specifies the paths of the external JAR files to load when the Spark job runs. Replace the values in the preceding code with the paths of all JAR files that you uploaded in Step 4.

    • spark.emr.serverless.network.service.name: specifies the name of the network connection. Replace the value in the preceding code with the name of the network connection that you created in Step 2.

  5. Click Publish.

  6. In the Publish dialog box, click OK.

  7. Start the streaming job.

    1. Click Go to O&M.

    2. On the page that appears, click Start. In the Start Task dialog box, configure the Execution Queue parameter and click OK.

Step 7: View logs

  1. Click the Log Exploration tab.

  2. On the Log Exploration tab, click Driver Logs. Then, click stdOut.log.

    In the log file, you can view information about the running of the application program and the returned results.

References

For information about how to develop a PySpark batch job, see Get started with the development of PySpark batch jobs.