现有湖仓一体架构是以MaxCompute为中心读写Hadoop集群数据,有些线下IDC场景,客户不愿意对公网暴露集群内部信息,需要从Hadoop集群发起访问云上的数据。本文以开源大数据开发平台E-MapReduce(云上Hadoop)方式模拟本地Hadoop集群,为您介绍如何读写MaxCompute数据。
背景信息
实践架构图如下所示。

准备开发环境
- 准备E-MapReduce(EMR)环境。
- 购买EMR集群。
详情请参见E-MapReduce快速入门。
- 登录EMR集群。说明 登录E-MapReduce集群详情请参见登录集群。
本实践登录ECS实例进行操作,连接ECS实例请参见连接ECS实例。
- 购买EMR集群。
- 准备本地IDEA。
- 安装IntelliJ IDEA。
本实践在IntelliJ IDEA运行,需要安装IntelliJ IDEA,详情请参见Install IntelliJ IDEA。
- 安装Maven。
详情请参见安装Maven。
- 创建Scala项目。
- 下载Scala插件。打开IDEA,选择File>Settings。在Settings对话框左侧导航栏单击Plugins,单击Scala后的Install。

- 安装Scala JDK
- 创建Scala项目在IDEA里新建项目,选择Scala>IDEA,即可创建Scala项目。

- 下载Scala插件。
- 安装IntelliJ IDEA。
- 准备MaxCompute数据
- 创建项目
MaxCompute创建Project请参见创建MaxCompute项目。
- 获取AccessKey
您可以进入AccessKey管理页面获取AccessKey ID和AccessKey Secret。
- 获取Endpoint
MaxCompute服务的连接地址。您需要根据创建MaxCompute项目时选择的地域以及网络连接方式配置Endpoint。各地域及网络对应的Endpoint值,请参见Endpoint。
- 创建Table
本实践需准备分区表和非分区表供测试使用,创建表详情请参见创建表。
- 创建项目
读写MaxCompute数据
- 代码开发。本实践提供如下读非分区表代码开发示例。说明 读分区表、写非分区表和写分区表代码示例请参见PartitionDataReaderTest.scala、DataWriterTest.scala和PartitionDataWriterTest.scala,可以根据实际业务情况进行代码开发。
/* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import org.apache.spark.sql.SparkSession /** * @author renxiang * @date 2021-12-20 */ object DataReaderTest { val ODPS_DATA_SOURCE = "org.apache.spark.sql.odps.datasource.DefaultSource" val ODPS_ENDPOINT = "http://service.cn.maxcompute.aliyun.com/api" def main(args: Array[String]): Unit = { val odpsProject = args(0) val odpsAkId = args(1) val odpsAkKey = args(2) val odpsTable = args(3) val spark = SparkSession .builder() .appName("odps-datasource-reader") .getOrCreate() import spark._ val df = spark.read.format(ODPS_DATA_SOURCE) .option("spark.hadoop.odps.project.name", odpsProject) .option("spark.hadoop.odps.access.id", odpsAkId) .option("spark.hadoop.odps.access.key", odpsAkKey) .option("spark.hadoop.odps.end.point", ODPS_ENDPOINT) .option("spark.hadoop.odps.table.name", odpsTable) .load() df.createOrReplaceTempView("odps_table") println("select * from odps_table") val dfFullScan = sql("select * from odps_table") println(dfFullScan.count) dfFullScan.show(10) Thread.sleep(72*3600*1000) } } - 代码打包和上传。
- 运行代码。
- 运行模式。
- Local模式。
- 使用Local模式运行的命令语法如下。
./bin/spark-submit \ --master local \ --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \ --class DataReaderTest \ ${jar-path} \ ${maxcompute-project-name} \ ${aliyun-access-key-id} \ ${aliyun-access-key-secret} \ ${maxcompute-table-name} - 参数说明如下。
参数 说明 master 运行模式,取值如下。 - Local:运行代码只调用当前ECS的计算资源。
- Yarn:运行代码使用EMR集群所有ECS的计算资源,运行效率比Local模式高。
jars 依赖的jar包路径。 class 需要执行的类名称。 jar-path 需要执行的jar包路径。 maxcompute-project-name MaxCompute的项目(Project)名称。 aliyun-access-key-id 阿里云账号或RAM用户的AccessKey ID。 您可以进入AccessKey管理页面获取AccessKey ID。
aliyun-access-key-secret AccessKey ID对应的AccessKey Secret。 您可以进入AccessKey管理页面获取AccessKey Secret。
maxcompute-table-name 进行读或写的MaxCompute表名称。
- 使用Local模式运行的命令语法如下。
- Yarn模式。
- 使用yarn模式运行的命令语法如下。
val ODPS_ENDPOINT = "http://service.cn-beijing.maxcompute.aliyun-inc.com/api" ./bin/spark-submit \ --master yarn \ --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \ --class DataReaderTest \ ${jar-path} \ ${maxcompute-project-name} \ ${aliyun-access-key-id} \ ${aliyun-access-key-secret} \ ${maxcompute-table-name} - 参数说明如下。
参数 说明 master 运行模式,取值如下。 - Local:运行代码只调用当前ECS的计算资源。
- Yarn:运行代码使用EMR集群所有ECS的计算资源,运行效率比Local模式高。
jars 依赖的jar包路径。 class 需要执行的类名称。 jar-path 需要执行的jar包路径。 maxcompute-project-name MaxCompute的项目(Project)名称。 aliyun-access-key-id 阿里云账号或RAM用户的AccessKey ID。 您可以进入AccessKey管理页面获取AccessKey ID。
aliyun-access-key-secret AccessKey ID对应的AccessKey Secret。 您可以进入AccessKey管理页面获取AccessKey Secret。
maxcompute-table-name 进行读或写的MaxCompute表名称。
- 使用yarn模式运行的命令语法如下。
- Local模式。
- 读非分区表示例。
- 命令语法如下。
-- 进入spark执行环境 cd /usr/lib/spark-current -- 提交任务 ./bin/spark-submit \ --master local \ --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \ --class DataReaderTest \ ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \ ${maxcompute-project-name} \ ${aliyun-access-key-id} \ ${aliyun-access-key-secret} \ ${maxcompute-table-name} - 执行界面如下。

- 执行结果如下。

- 命令语法如下。
- 读分区表示例。
- 命令语法如下。
-- 进入spark执行环境 cd /usr/lib/spark-current -- 提交任务 ./bin/spark-submit \ --master local \ --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \ --class PartitionDataReaderTest \ ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \ ${maxcompute-project-name} \ ${aliyun-access-key-id} \ ${aliyun-access-key-secret} \ ${maxcompute-table-name} \ ${partition-descripion} - 执行界面如下。

- 执行结果如下。

- 命令语法如下。
- 写非分区表测试。
- 命令语法如下。
./bin/spark-submit \ --master local \ --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \ --class DataWriterTest \ ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \ ${maxcompute-project-name} \ ${aliyun-access-key-id} \ ${aliyun-access-key-secret} \ ${maxcompute-table-name} - 执行界面如下。

- 执行结果如下。

- 命令语法如下。
- 写分区表测试。
- 命令语法如下。
./bin/spark-submit \ --master local \ --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \ --class PartitionDataWriterTest \ ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \ ${maxcompute-project-name} \ ${aliyun-access-key-id} \ ${aliyun-access-key-secret} \ ${maxcompute-table-name} \ ${partition-descripion} - 执行界面如下。

- 执行结果如下。

- 命令语法如下。
- 运行模式。
性能测试
本实践性能测试环境是E-MapReduce和MaxCompute,属于云上互联。如果IDC网络与云上相连性能取决于tunnel资源或者专线带宽。
- 实例规格。
实例 规格 E-MapReduce集群 - Master节点数量:2个。
- ECS规格:计算型(ecs.c6.2xlarge)8 vCPU,16 GiB,2.5 Gbps。
- 系统盘:ESSD云盘 120GiB。
- 数据盘:ESSD云盘 80GiB。
- Core节点数量:2个。
- ECS规格:计算型(ecs.c6.2xlarge)8 vCPU,16 GiB,2.5 Gbps。
- 系统盘:ESSD云盘 120GiB。
- 数据盘:ESSD云盘 80GiB * 4。
MaxCompute 按量计费标准版。 - Master节点数量:2个。
- 大表读测试。数据表规格如下。
结果如下。参数 规格 表名称 dwd_product_movie_basic_info 说明 此表为MaxCompute公开数据集MAXCOMPUTE_PUBLIC_DATA项目下的表,详情请参见公开数据集。表大小 4829258484 Byte。 分区数 593 个。 读取的分区名称 20170422。
耗时 0.850871秒。 - 大表写测试。
- 分区写入万条数据。
耗时2.533892秒。 - 分区写入十万条数据。
耗时8.441193秒。 - 分区写入百万条数据。
耗时73.28秒。
- 分区写入万条数据。


