本文介绍如何在Spark中进行MaxCompute数据的读写操作。
操作步骤
初始化一个OdpsOps对象。
在Spark中,MaxCompute的数据操作通过OdpsOps类完成。
import com.aliyun.odps.TableSchema import com.aliyun.odps.data.Record import org.apache.spark.aliyun.odps.OdpsOps import org.apache.spark.{SparkContext, SparkConf} object Sample { def main(args: Array[String]): Unit = { // == Step-1 == val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID") val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET") // 以内网地址为例。 val urls = Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com") val conf = new SparkConf().setAppName("Test Odps") val sc = new SparkContext(conf) val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1)) // 下面是一些调用代码。 // == Step-2 == ... // == Step-3 == ... } }
说明运行代码示例前必须先配置环境变量。关于如何配置环境变量,请参见配置环境变量。
加载MaxCompute中的表数据至Spark。
通过OdpsOps对象的readTable方法,您可以将MaxCompute中的表数据加载到Spark中。
// == Step-2 == val project = <odps-project> val table = <odps-table> val numPartitions = 2 val inputData = odpsOps.readTable(project, table, read, numPartitions) inputData.top(10).foreach(println) // == Step-3 == ...
在上面的代码中,您还需要定义一个read函数,用来解析和预处理MaxCompute表数据,代码如下所示。
def read(record: Record, schema: TableSchema): String = { record.getString(0) }
保存Spark中的结果数据至MaxCompute表中。
通过OdpsOps对象的saveToTable方法,您可以保存Spark中的结果数据至MaxCompute表中。
val resultData = inputData.map(e => s"$e has been processed.") odpsOps.saveToTable(project, table, dataRDD, write)
在上面的代码中,您还需要定义一个write函数,用来进行数据预处理,代码如下所示。
def write(s: String, emptyRecord: Record, schema: TableSchema): Unit = { val r = emptyRecord r.set(0, s) }
分区表参数写法说明
SDK支持对MaxCompute分区表的读写,分区名的写法标准是分区列名=分区名,多个分区时以逗号(,)分隔。
读分区pt为1的表数据:pt=‘1’。
读分区pt为1和分区ps为2的表数据:pt=‘1’, ps=‘2’。
附录
完整示例代码,请参见Spark对接MaxCompute。