本文介紹如何在Spark中進行MaxCompute資料的讀寫操作。
操作步驟
初始化一個OdpsOps對象。
在Spark中,MaxCompute的資料操作通過OdpsOps類完成。
import com.aliyun.odps.TableSchema import com.aliyun.odps.data.Record import org.apache.spark.aliyun.odps.OdpsOps import org.apache.spark.{SparkContext, SparkConf} object Sample { def main(args: Array[String]): Unit = { // == Step-1 == val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID") val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET") // 以內網地址為例。 val urls = Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com") val conf = new SparkConf().setAppName("Test Odps") val sc = new SparkContext(conf) val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1)) // 下面是一些調用代碼。 // == Step-2 == ... // == Step-3 == ... } }
說明運行程式碼範例前必須先配置環境變數。關於如何配置環境變數,請參見配置環境變數。
載入MaxCompute中的表資料至Spark。
通過OdpsOps對象的readTable方法,您可以將MaxCompute中的表資料載入到Spark中。
// == Step-2 == val project = <odps-project> val table = <odps-table> val numPartitions = 2 val inputData = odpsOps.readTable(project, table, read, numPartitions) inputData.top(10).foreach(println) // == Step-3 == ...
在上面的代碼中,您還需要定義一個read函數,用來解析和預先處理MaxCompute表資料,代碼如下所示。
def read(record: Record, schema: TableSchema): String = { record.getString(0) }
儲存Spark中的結果資料至MaxCompute表中。
通過OdpsOps對象的saveToTable方法,您可以儲存Spark中的結果資料至MaxCompute表中。
val resultData = inputData.map(e => s"$e has been processed.") odpsOps.saveToTable(project, table, dataRDD, write)
在上面的代碼中,您還需要定義一個write函數,用來進行資料預先處理,代碼如下所示。
def write(s: String, emptyRecord: Record, schema: TableSchema): Unit = { val r = emptyRecord r.set(0, s) }
分區表參數寫法說明
SDK支援對MaxCompute分區表的讀寫,分區名的寫法標準是分區列名=分區名,多個分區時以逗號(,)分隔。
讀分區pt為1的表資料:pt=‘1’。
讀分區pt為1和分區ps為2的表資料:pt=‘1’, ps=‘2’。
附錄
完整範例程式碼,請參見Spark對接MaxCompute。