
E-MapReduce: Use Spark to consume data in offline mode

Last Updated: May 11, 2024

This topic describes how to use Spark to consume data of Log Service in offline mode.

Use Spark RDD to consume data of Log Service

Sample code

TestBatchLoghub.scala

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.{SparkConf, SparkContext}
// LoghubUtils is provided by the aliyun-emapreduce SDK; see the
// aliyun-emapreduce-demo project referenced below for the exact dependency.
import org.apache.spark.aliyun.logservice.LoghubUtils

object TestBatchLoghub {
  def main(args: Array[String]): Unit = {
    // Seven arguments are required; the end time is optional and defaults to now.
    if (args.length < 7) {
      System.err.println(
        """Usage: TestBatchLoghub <sls project> <sls logstore> <sls endpoint>
          |  <access key id> <access key secret> <output path> <start time> [<end time=now>]
        """.stripMargin)
      System.exit(1)
    }

    val loghubProject = args(0)
    val logStore = args(1)
    val endpoint = args(2)
    val accessKeyId = args(3)
    val accessKeySecret = args(4)
    val outputPath = args(5)
    val startTime = args(6).toLong

    val sc = new SparkContext(new SparkConf().setAppName("test batch loghub"))
    // Create an RDD over the requested time range. If no end time is given,
    // data is consumed up to the current time.
    val rdd: JavaRDD[String] = if (args.length > 7) {
      LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime, args(7).toLong)
    } else {
      LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime)
    }

    rdd.saveAsTextFile(outputPath)
  }
}
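
Before writing the full output, you can sanity-check a few records from inside main, before the saveAsTextFile call. A minimal sketch:

    // Pull at most 10 records to the driver and print them for inspection.
    // take(10) is safe even for large RDDs because only a sample is collected.
    rdd.take(10).foreach(println)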
Note

For more information about the Maven project object model (POM) file, see aliyun-emapreduce-demo.

Compile and run the code

Note

You must configure environment variables before you can run the sample code. For more information about how to configure environment variables, see the Configure environment variables section in this topic.

  1. Run the following command to compile the code:

    mvn clean package -DskipTests

    After the code is compiled, the JAR package of the job is stored in the target/shaded/ directory.

  2. Submit and run the job:

    spark-submit --master yarn --deploy-mode cluster --executor-cores 2 --executor-memory 1g --driver-memory 1g --num-executors 2 --class x.x.x.TestBatchLoghub xxx.jar <sls project> <sls logstore> <sls endpoint> $ALIBABA_CLOUD_ACCESS_KEY_ID $ALIBABA_CLOUD_ACCESS_KEY_SECRET <output path> <start time> [<end time=now>]
Important
  • Replace x.x.x.TestBatchLoghub and xxx.jar with the actual fully qualified class name and the actual path of the JAR package based on your business requirements.

  • Adjust the job resource configurations based on the actual data size and cluster scale. If your cluster has low specifications, the job may fail to run with the settings in the preceding command.

Use Spark SQL statements to consume data of Log Service

Sample SQL statements

spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* \
  --hiveconf accessKeyId=$ALIBABA_CLOUD_ACCESS_KEY_ID \
  --hiveconf accessKeySecret=$ALIBABA_CLOUD_ACCESS_KEY_SECRET
Note

The /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* directory contains the JAR package that provides the LogHub data source type. If your E-MapReduce (EMR) cluster uses Spark 2, change spark3 in the preceding command to spark2.

If you want to use Spark 3 to consume data of Log Service in a development environment on your on-premises machine, perform the following steps. The procedure for Spark 2 is similar.

  1. Download the JAR package in the /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12 directory of your cluster to your on-premises machine.

  2. Use Maven to install the JAR package to your on-premises machine.

    mvn install:install-file -DgroupId=com.aliyun.emr -DartifactId=emr-datasources_shaded_2.12 -Dversion=3.0.2 -Dpackaging=jar -Dfile=/path/to/emr-datasources_shaded_2.12-3.0.2.jar
  3. Add the following dependency to the pom.xml file:

    <dependency>
      <groupId>com.aliyun.emr</groupId>
      <artifactId>emr-datasources_shaded_2.12</artifactId>
      <version>3.0.2</version>
    </dependency>
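
After the dependency is installed, you can verify the setup with a small local program. The following is a minimal sketch, not an official reference: it assumes that the loghub data source registered by the SDK accepts the same options through the DataFrame reader as the SQL example in the next section, and the endpoint, project, and Logstore names are placeholders.

import org.apache.spark.sql.SparkSession

object LocalLoghubTest {
  def main(args: Array[String]): Unit = {
    // Local SparkSession for development; on a cluster, submit with YARN instead.
    val spark = SparkSession.builder()
      .appName("local loghub test")
      .master("local[2]")
      .getOrCreate()

    // Assumption: the loghub data source accepts the same options as the
    // SQL example below. Replace the placeholders with your own values.
    val df = spark.read
      .format("loghub")
      .option("endpoint", "cn-hangzhou.log.aliyuncs.com")
      .option("access.key.id", sys.env("ALIBABA_CLOUD_ACCESS_KEY_ID"))
      .option("access.key.secret", sys.env("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
      .option("sls.project", "test_project")
      .option("sls.store", "test_store")
      .option("startingoffsets", "earliest")
      .load()

    df.show(10)
    spark.stop()
  }
}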

Example of creating a table and reading data from the table

create table test_sls
using loghub
options(
  endpoint='cn-hangzhou-intranet.log.aliyuncs.com',
  access.key.id='${hiveconf:accessKeyId}',
  access.key.secret='${hiveconf:accessKeySecret}',
  sls.project='test_project',
  sls.store='test_store',
  startingoffsets='earliest'
);

select * from test_sls;

Configure environment variables

This section describes how to configure the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables in your operating system.

Important
  • The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. We recommend that you use the AccessKey pair of a RAM user to call API operations or perform routine O&M. For information about how to use a RAM user, see Create a RAM user.

  • We recommend that you do not include your AccessKey pair in files that are easily accessible to others, such as the project code. Otherwise, your AccessKey pair may be leaked, and the resources in your account may become insecure.

  • Linux and macOS

    Run the following commands to configure the environment variables.

    Replace <access_key_id> and <access_key_secret> with the AccessKey ID and AccessKey secret of your RAM user.

    export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id>
    export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>
  • Windows

    1. Create an environment variable file, add the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables to the file, and then set the environment variables to your AccessKey ID and AccessKey secret.

    2. Restart Windows for the environment variables to take effect.
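
After the variables are configured, a Spark program can read them from the process environment instead of hardcoding credentials. A minimal sketch in Scala:

// Read the credentials from the environment variables configured above and
// fail fast with a clear message if either variable is missing.
val accessKeyId = sys.env.getOrElse("ALIBABA_CLOUD_ACCESS_KEY_ID",
  sys.error("ALIBABA_CLOUD_ACCESS_KEY_ID is not set"))
val accessKeySecret = sys.env.getOrElse("ALIBABA_CLOUD_ACCESS_KEY_SECRET",
  sys.error("ALIBABA_CLOUD_ACCESS_KEY_SECRET is not set"))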

References

For more information about how to use Spark to access Kafka, see Structured Streaming + Kafka Integration Guide.