本文介绍如何通过Serverless Spark访问OSS数据源。您需要先配置访问OSS的权限,然后可以使用SQL的方式或者提交代码包(Python或者Jar包)的方式访问OSS。
云原生数据湖分析(DLA)产品已退市,云原生数据仓库 AnalyticDB MySQL 版湖仓版支持DLA已有功能,并提供更多的功能和更好的性能。AnalyticDB for MySQL相关使用文档,请参见访问OSS数据源。
操作步骤
配置DLA访问OSS的权限。
如果您使用的是阿里云主账号访问OSS,则默认您拥有该账号下所有OSS数据以及DLA OSS表的访问权限,无需配置,可直接使用。
如果您使用RAM子账号访问OSS并提交代码包作业,需要配置代码包和Spark代码访问OSS的权限。具体操作请参见细粒度配置RAM子账号权限。
如果您使用Spark SQL访问DLA OSS表的数据,需要确保您的RAM子账号关联了DLA账号,并且DLA账号拥有对应表的访问权限。如果您的RAM子账号未关联DLA账号,请进行关联操作,具体操作请参见DLA子账号绑定RAM账号。DLA账号对于表的访问权限,您可以登录DLA控制台,在 页面,使用
GRANT
或REVOKE
语法进行操作。
配置Spark OSS Connector。
配置了OSS访问权限之后,您就可以使用Spark来访问OSS数据了。在Spark的作业配置文件中,您需要添加配置项
“spark.dla.connectors” : “oss”
。DLA平台内置了Spark OSS Connector相关的实现,默认不生效,需要配置该参数令其生效。如果您有Spark OSS Connector的其他实现方式,您不需要配置该参数,您只需提交您自己的实现Jar包,并添加相应的配置即可。访问OSS数据。
您可以通过以下两种方式访问OSS数据:
通过提交Spark SQL语句的方式来访问OSS数据,具体操作请参见Spark SQL。作业示例配置如下所示:
{ "sqls": [ "select * from `1k_tables`.`table0` limit 100", "insert into `1k_tables`.`table0` values(1, 'test')" ], "name": "sql oss test", "conf": { "spark.dla.connectors": "oss", "spark.driver.resourceSpec": "small", "spark.sql.hive.metastore.version": "dla", "spark.executor.instances": 10, "spark.dla.job.log.oss.uri": "oss://test/spark-logs", "spark.executor.resourceSpec": "small" } }
通过Java、Scala、Python代码访问OSS数据。下面以Scala为例进行说明:
{ "args": ["oss://${oss-buck-name}/data/test/test.csv"], "name": "spark-oss-test", "file": "oss://${oss-buck-name}/jars/test/spark-examples-0.0.1-SNAPSHOT.jar", "className": "com.aliyun.spark.oss.SparkReadOss", "conf": { "spark.driver.resourceSpec": "medium", "spark.executor.resourceSpec": "medium", "spark.executor.instances": 2, "spark.dla.connectors": "oss" } }
说明SparkReadOss
对应的源码可以参考DLA Spark OSS demo。
启用OSS数据写入性能优化
当您使用自建HiveMetaStore或者DLA元数据服务访问OSS时,社区版Spark HiveClient的rename操作比较低效,DLA对此进行了优化。您只需要将参数spark.sql.hive.dla.metastoreV2.enable
设置为true
即可启用这项优化。示例如下:
{
"args": ["oss://${oss-buck-name}/data/test/test.csv"],
"name": "spark-oss-test",
"file": "oss://${oss-buck-name}/jars/test/spark-examples-0.0.1-SNAPSHOT.jar",
"className": "com.aliyun.spark.oss.WriteParquetFile",
"conf": {
"spark.driver.resourceSpec": "medium",
"spark.executor.resourceSpec": "medium",
"spark.executor.instances": 2,
"spark.dla.connectors": "oss",
"spark.sql.hive.dla.metastoreV2.enable": "true"
}
}
OSS Connector数据写入性能优化
OSS Connector数据写入性能优化功能是DLA Spark团队基于OSS分片上传功能,针对Spark写入数据到OSS过程中大量调用OSS API导致写入性能差的问题,实现的性能优化提升。在典型场景下,性能可提升1~3倍。
您需要启用DLA Spark内置的OSS connector,并开启性能优化开关,才能使用该功能。具体配置如下:
spark.dla.connectors = oss; //启用DLA Spark内置的OSS connector。
spark.hadoop.job.oss.fileoutputcommitter.enable = true; //开启性能优化开关。
如果启用该性能优化功能,在作业被强制Kill等情况下,可能会产生一些没有被清理的文件碎片,占用您OSS的存储空间。建议对相关OSS Bucket设置碎片生命周期规则,对过期未合并的碎片自动进行清理,建议配置周期为3天以上。具体操作请参见设置生命周期规则。
该性能优化功能对RDD的
saveAsHadoop
前缀和saveAsNewAPIHadoop
前缀的方法不生效。
使用示例:
{
"args": ["oss://${oss-buck-name}/data/test/test.csv"],
"name": "spark-oss-test",
"file": "oss://${oss-buck-name}/jars/test/spark-examples-0.0.1-SNAPSHOT.jar",
"className": "com.aliyun.spark.oss.WriteParquetFile",
"conf": {
"spark.driver.resourceSpec": "medium",
"spark.executor.resourceSpec": "medium",
"spark.executor.instances": 2,
"spark.dla.connectors": "oss",
"spark.hadoop.job.oss.fileoutputcommitter.enable": true
}
}