本文为您介绍E-MapReduce中DeltaLake的配置信息及其常用命令的示例。
DeltaLake配置信息
EMR中DeltaLake的默认配置信息如下:
Spark 2.X环境
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
Spark 3.X环境
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
常用命令示例
创建表
CREATE TABLE delta_table (id INT) USING delta;
插入数据
INSERT INTO delta_table VALUES 0,1,2,3,4;
覆盖写数据
INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
查询数据
SELECT * FROM delta_table;
更新数据
给偶数
id
加100。UPDATE delta_table SET id = id + 100 WHERE mod(id, 2) = 0;
删除数据
删除偶数
id
的记录。DELETE FROM delta_table WHERE mod(id, 2) = 0;
Merge操作
创建Source表用于Merge操作。
CREATE TABLE newData(id INT) USING delta;
向表中插入数据。
INSERT INTO newData VALUES 0,1,2,3,4,5,6,7,8,9;
使用newData作为Source Merge进delta_table表。如果匹配到相同
id
的记录,则该id
加100;如果没有匹配到,则直接插入新记录。MERGE INTO delta_table AS target USING newData AS source ON target.id = source.id WHEN MATCHED THEN UPDATE SET target.id = source.id + 100 WHEN NOT MATCHED THEN INSERT *;
流式读数据
创建流式目标表。
CREATE TABLE stream_debug_table (id INT) USING delta;
创建流。
CREATE SCAN stream_delta_table on delta_table USING STREAM;
说明本文示例中的delta_table为您已存在的delta表。
流式写入目标表。
CREATE STREAM job options ( triggerType='ProcessingTime', checkpointLocation = '/tmp/streaming_read_cp' ) INSERT INTO stream_debug_table SELECT * FROM stream_delta_table;
流式代码示例
通过SSH方式连接集群,详情请参见登录集群。
执行以下命令,启动streaming-sql。
streaming-sql
说明如果您已添加DeltaLake组件,则可以直接执行
streaming-sql
命令。如果集群内没有默认配置,您可以通过以下配置来使用Delta Lake。streaming-sql --jars /path/to/delta-core_2.11-0.6.1.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
执行以下命令,创建流式目标表。
CREATE TABLE stream_debug_table (id INT) USING DELTA;
执行以下命令,创建流。
CREATE SCAN stream_delta_table on delta_table USING STREAM;
执行以下命令,以delta_table作为Source,流式写入目标表。
CREATE STREAM job options ( triggerType='ProcessingTime', checkpointLocation = '/tmp/streaming_read_cp' ) INSERT INTO stream_debug_table SELECT * FROM stream_delta_table;
您可以新开启一个streaming-sql客户端,向Source中插入新数据,并查询目标表的数据。
执行以下命令,验证Source存量写入。
SELECT * FROM stream_debug_table;
执行以下命令,插入新数据。
INSERT INTO delta_table VALUES 801, 802;
执行以下命令,查询插入的数据。
SELECT * FROM stream_debug_table;
执行以下命令,插入新数据。
INSERT INTO delta_table VALUES 901, 902;
执行以下命令,查询插入的数据。
SELECT * FROM stream_debug_table;