本文為您介紹E-MapReduce中DeltaLake的配置資訊及其常用命令的樣本。
DeltaLake配置資訊
EMR中DeltaLake的預設配置資訊如下:
Spark 2.X環境
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
Spark 3.X環境
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
常用命令樣本
建立表
CREATE TABLE delta_table (id INT) USING delta;
插入資料
INSERT INTO delta_table VALUES 0,1,2,3,4;
覆蓋寫資料
INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
查詢資料
SELECT * FROM delta_table;
更新資料
給偶數
id
加100。UPDATE delta_table SET id = id + 100 WHERE mod(id, 2) = 0;
刪除資料
刪除偶數
id
的記錄。DELETE FROM delta_table WHERE mod(id, 2) = 0;
Merge操作
建立Source表用於Merge操作。
CREATE TABLE newData(id INT) USING delta;
向表中插入資料。
INSERT INTO newData VALUES 0,1,2,3,4,5,6,7,8,9;
使用newData作為Source Merge進delta_table表。如果匹配到相同
id
的記錄,則該id
加100;如果沒有匹配到,則直接插入新記錄。MERGE INTO delta_table AS target USING newData AS source ON target.id = source.id WHEN MATCHED THEN UPDATE SET target.id = source.id + 100 WHEN NOT MATCHED THEN INSERT *;
流式讀資料
建立流式目標表。
CREATE TABLE stream_debug_table (id INT) USING delta;
建立流。
CREATE SCAN stream_delta_table on delta_table USING STREAM;
說明本文樣本中的delta_table為您已存在的delta表。
流式寫入目標表。
CREATE STREAM job options ( triggerType='ProcessingTime', checkpointLocation = '/tmp/streaming_read_cp' ) INSERT INTO stream_debug_table SELECT * FROM stream_delta_table;
流式程式碼範例
通過SSH方式串連叢集,詳情請參見登入叢集。
執行以下命令,啟動streaming-sql。
streaming-sql
說明如果您已添加DeltaLake組件,則可以直接執行
streaming-sql
命令。如果叢集內沒有預設配置,您可以通過以下配置來使用Delta Lake。streaming-sql --jars /path/to/delta-core_2.11-0.6.1.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
執行以下命令,建立流式目標表。
CREATE TABLE stream_debug_table (id INT) USING DELTA;
執行以下命令,建立流。
CREATE SCAN stream_delta_table on delta_table USING STREAM;
執行以下命令,以delta_table作為Source,流式寫入目標表。
CREATE STREAM job options ( triggerType='ProcessingTime', checkpointLocation = '/tmp/streaming_read_cp' ) INSERT INTO stream_debug_table SELECT * FROM stream_delta_table;
您可以新開啟一個streaming-sql用戶端,向Source中插入新資料,並查詢目標表的資料。
執行以下命令,驗證Source存量寫入。
SELECT * FROM stream_debug_table;
執行以下命令,插入新資料。
INSERT INTO delta_table VALUES 801, 802;
執行以下命令,查詢插入的資料。
SELECT * FROM stream_debug_table;
執行以下命令,插入新資料。
INSERT INTO delta_table VALUES 901, 902;
執行以下命令,查詢插入的資料。
SELECT * FROM stream_debug_table;