本文為您介紹E-MapReduce中DeltaLake的配置資訊及其常用命令的樣本。
DeltaLake配置資訊
EMR中DeltaLake的預設配置資訊如下:
Spark 2.X環境
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtensionSpark 3.X環境
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
常用命令樣本
建立表
CREATE TABLE delta_table (id INT) USING delta;插入資料
INSERT INTO delta_table VALUES 0,1,2,3,4;覆蓋寫資料
INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;查詢資料
SELECT * FROM delta_table;更新資料
給偶數
id加100。UPDATE delta_table SET id = id + 100 WHERE mod(id, 2) = 0;刪除資料
刪除偶數
id的記錄。DELETE FROM delta_table WHERE mod(id, 2) = 0;
Merge操作
建立Source表用於Merge操作。
CREATE TABLE newData(id INT) USING delta;向表中插入資料。
INSERT INTO newData VALUES 0,1,2,3,4,5,6,7,8,9;使用newData作為Source Merge進delta_table表。如果匹配到相同
id的記錄,則該id加100;如果沒有匹配到,則直接插入新記錄。MERGE INTO delta_table AS target USING newData AS source ON target.id = source.id WHEN MATCHED THEN UPDATE SET target.id = source.id + 100 WHEN NOT MATCHED THEN INSERT *;流式讀資料
建立流式目標表。
CREATE TABLE stream_debug_table (id INT) USING delta;建立流。
CREATE SCAN stream_delta_table on delta_table USING STREAM;說明本文樣本中的delta_table為您已存在的delta表。
流式寫入目標表。
CREATE STREAM job options ( triggerType='ProcessingTime', checkpointLocation = '/tmp/streaming_read_cp' ) INSERT INTO stream_debug_table SELECT * FROM stream_delta_table;
流式程式碼範例
通過SSH方式串連叢集,詳情請參見登入叢集。
執行以下命令,啟動streaming-sql。
streaming-sql說明如果您已添加DeltaLake組件,則可以直接執行
streaming-sql命令。如果叢集內沒有預設配置,您可以通過以下配置來使用Delta Lake。streaming-sql --jars /path/to/delta-core_2.11-0.6.1.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension執行以下命令,建立流式目標表。
CREATE TABLE stream_debug_table (id INT) USING DELTA;執行以下命令,建立流。
CREATE SCAN stream_delta_table on delta_table USING STREAM;執行以下命令,以delta_table作為Source,流式寫入目標表。
CREATE STREAM job options ( triggerType='ProcessingTime', checkpointLocation = '/tmp/streaming_read_cp' ) INSERT INTO stream_debug_table SELECT * FROM stream_delta_table;您可以新開啟一個streaming-sql用戶端,向Source中插入新資料,並查詢目標表的資料。
執行以下命令,驗證Source存量寫入。
SELECT * FROM stream_debug_table;執行以下命令,插入新資料。
INSERT INTO delta_table VALUES 801, 802;執行以下命令,查詢插入的資料。
SELECT * FROM stream_debug_table;執行以下命令,插入新資料。
INSERT INTO delta_table VALUES 901, 902;執行以下命令,查詢插入的資料。
SELECT * FROM stream_debug_table;