全部產品
Search
文件中心

E-MapReduce:Spark對接MaxCompute

更新時間:Jul 01, 2024

本文介紹如何在Spark中進行MaxCompute資料的讀寫操作。

操作步驟

  1. 初始化一個OdpsOps對象。

    在Spark中,MaxCompute的資料操作通過OdpsOps類完成。

    import com.aliyun.odps.TableSchema
         import com.aliyun.odps.data.Record
         import org.apache.spark.aliyun.odps.OdpsOps
         import org.apache.spark.{SparkContext, SparkConf}
         object Sample {
           def main(args: Array[String]): Unit = {    
             // == Step-1 ==
             val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
             val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
             // 以內網地址為例。
             val urls = Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com") 
             val conf = new SparkConf().setAppName("Test Odps")
             val sc = new SparkContext(conf)
             val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1))
             // 下面是一些調用代碼。
             // == Step-2 ==
             ...
             // == Step-3 ==
             ...
           }
         }
    說明

    運行程式碼範例前必須先配置環境變數。關於如何配置環境變數,請參見配置環境變數

  2. 載入MaxCompute中的表資料至Spark。

    通過OdpsOps對象的readTable方法,您可以將MaxCompute中的表資料載入到Spark中。

    // == Step-2 ==
             val project = <odps-project>
             val table = <odps-table>
             val numPartitions = 2
             val inputData = odpsOps.readTable(project, table, read, numPartitions)
             inputData.top(10).foreach(println)
             // == Step-3 ==
             ...

    在上面的代碼中,您還需要定義一個read函數,用來解析和預先處理MaxCompute表資料,代碼如下所示。

    def read(record: Record, schema: TableSchema): String = {
               record.getString(0)
             }
  3. 儲存Spark中的結果資料至MaxCompute表中。

    通過OdpsOps對象的saveToTable方法,您可以儲存Spark中的結果資料至MaxCompute表中。

    val resultData = inputData.map(e => s"$e has been processed.")
             odpsOps.saveToTable(project, table, dataRDD, write)

    在上面的代碼中,您還需要定義一個write函數,用來進行資料預先處理,代碼如下所示。

    def write(s: String, emptyRecord: Record, schema: TableSchema): Unit = {
               val r = emptyRecord
               r.set(0, s)
             }
  4. 分區表參數寫法說明

    SDK支援對MaxCompute分區表的讀寫,分區名的寫法標準是分區列名=分區名,多個分區時以逗號(,)分隔。

    • 讀分區pt為1的表資料:pt=‘1’。

    • 讀分區pt為1和分區ps為2的表資料:pt=‘1’, ps=‘2’。

附錄

完整範例程式碼,請參見Spark對接MaxCompute