全部产品
Search
文档中心

开源大数据平台E-MapReduce:Spark对接MaxCompute

更新时间:Aug 15, 2023

本文介绍如何在Spark中进行MaxCompute数据的读写操作。

操作步骤

  1. 初始化一个OdpsOps对象。

    在Spark中,MaxCompute的数据操作通过OdpsOps类完成。

    import com.aliyun.odps.TableSchema
         import com.aliyun.odps.data.Record
         import org.apache.spark.aliyun.odps.OdpsOps
         import org.apache.spark.{SparkContext, SparkConf}
         object Sample {
           def main(args: Array[String]): Unit = {    
             // == Step-1 ==
             val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
             val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
             // 以内网地址为例。
             val urls = Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com") 
             val conf = new SparkConf().setAppName("Test Odps")
             val sc = new SparkContext(conf)
             val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1))
             // 下面是一些调用代码。
             // == Step-2 ==
             ...
             // == Step-3 ==
             ...
           }
         }
    说明

    运行代码示例前必须先配置环境变量。关于如何配置环境变量,请参见配置环境变量

  2. 加载MaxCompute中的表数据至Spark。

    通过OdpsOps对象的readTable方法,您可以将MaxCompute中的表数据加载到Spark中。

    // == Step-2 ==
             val project = <odps-project>
             val table = <odps-table>
             val numPartitions = 2
             val inputData = odpsOps.readTable(project, table, read, numPartitions)
             inputData.top(10).foreach(println)
             // == Step-3 ==
             ...

    在上面的代码中,您还需要定义一个read函数,用来解析和预处理MaxCompute表数据,代码如下所示。

    def read(record: Record, schema: TableSchema): String = {
               record.getString(0)
             }
  3. 保存Spark中的结果数据至MaxCompute表中。

    通过OdpsOps对象的saveToTable方法,您可以保存Spark中的结果数据至MaxCompute表中。

    val resultData = inputData.map(e => s"$e has been processed.")
             odpsOps.saveToTable(project, table, dataRDD, write)

    在上面的代码中,您还需要定义一个write函数,用来进行数据预处理,代码如下所示。

    def write(s: String, emptyRecord: Record, schema: TableSchema): Unit = {
               val r = emptyRecord
               r.set(0, s)
             }
  4. 分区表参数写法说明

    SDK支持对MaxCompute分区表的读写,分区名的写法标准是分区列名=分区名,多个分区时以逗号(,)分隔。

    • 读分区pt为1的表数据:pt=‘1’。

    • 读分区pt为1和分区ps为2的表数据:pt=‘1’, ps=‘2’。

附录

完整示例代码,请参见Spark对接MaxCompute