全部產品
Search
文件中心

AnalyticDB:通過Spark SQL讀寫Delta外表

更新時間:Sep 06, 2024

Delta是一種可以基於OSSObject Storage Service的資料湖表格式,支援UPDATE、DELETE和INSERT操作。AnalyticDB for MySQL和Delta表格式進行了整合,您可以通過Spark SQL讀寫Delta外表。本文主要介紹如何通過Spark SQL讀寫Delta外表。

前提條件

  • 叢集的產品系列為湖倉版

  • 已在湖倉版叢集中建立Job型資源群組。具體操作,請參見建立資源群組

  • 已建立湖倉版叢集的資料庫帳號。

注意事項

  • Xihe引擎不支援讀寫Delta表。

  • AnalyticDB for MySQL Spark側負責整合Spark對應版本的Delta版本,並對內建Delta版本進行升級,不負責Delta核心問題排查,以及不同Delta版本之前相容性問題。

讀寫Delta外表

AnalyticDB for MySQL內建Delta包可滿足通過Spark SQL讀寫Delta外表資料。若內建Delta包版本(2.0.2)無法滿足讀寫需求時,可自編譯Delta包版本讀寫Delta外表資料。

使用AnalyticDB MySQL內建Delta包

步驟一:進入資料開發

  1. 登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,在湖倉版頁簽下,單擊目的地組群ID。

  2. 在左側導覽列,單擊作業開發 > SQL開發

  3. SQLConsole視窗,選擇Spark引擎和Job型資源群組。

步驟二:建立外庫和Delta外表

說明

您可以選擇批處理或互動式執行任意一種方式執行以下SQL語句。詳情請參見Spark SQL執行方式

  1. 執行以下語句,建立資料庫。如果已有資料庫,可跳過本步驟。

    CREATE DATABASE if not exists external_delta_db
    location "oss://<bucket_name>/test/";      /*用於在該路徑中建立表,請替換為自己的OSS路徑。*/
  2. 執行以下語句,建立Delta外表。

    CREATE TABLE if not exists external_delta_db.delta_test_tbl (
      id int, 
      name string, 
      age int
    ) using delta 
    partitioned by (age) 
    location "oss://<bucket_name>/test/delta_test_tbl";

步驟三:寫入Delta外表資料

說明

您可以選擇批處理或互動式執行任意一種方式執行以下SQL語句。詳情請參見Spark SQL執行方式

INSERT

執行以下語句,寫入資料。您可以選擇以下任意一種方式向Delta外表中寫入資料。

  • 方式一:INSERT INTO寫入

    INSERT INTO external_delta_db.delta_test_tbl values(1, 'lisa', 10),(2, 'jams', 10);
  • 方式二:INSERT OVERWRITE全表寫入

    INSERT OVERWRITE external_delta_db.delta_test_tbl values (2, 'zhangsan', 10), (4, 'lisi', 30);
  • 方式三:INSERT OVERWRITE靜態分區寫入

    INSERT OVERWRITE external_delta_db.delta_test_tbl partition(age=17) values(3, 'anna');
  • 方式四:INSERT OVERWRITE動態分區寫入

    INSERT OVERWRITE external_delta_db.delta_test_tbl partition (age) values (1, 'bom', 10);

UPDATE

執行以下語句更新資料,本文以將id=1的name列更新為box為例。

UPDATE external_delta_db.delta_test_tbl set name = 'box' where id = 1;

DELETE

執行以下語句刪除資料,本文以刪除id列為1的資料為例。

DELETE FROM external_delta_db.delta_test_tbl where id = 1;

步驟四:查詢資料

說明
  • 您可以選擇批處理或互動式執行任意一種方式執行以下SQL語句。詳情請參見Spark SQL執行方式

  • 執行Spark SQL語句,只返回執行成功或者失敗,不返回資料。您可以在Spark Jar開發頁面應用列表頁簽中的日誌查看錶資料。詳情請參見查看Spark應用資訊

執行以下語句,查詢Delta外表資料。

SELECT * FROM external_delta_db.delta_test_tbl;

使用自訂Delta包

重要

需要使用和AnalyticDB for MySQL Spark核心版本(3.2.0)匹配的Delta版本,以免出現版本不相容問題。

步驟一:進入資料開發

  1. 登入雲原生資料倉儲AnalyticDB MySQL控制台
  2. 在頁面左上方,選擇叢集所在地區。
  3. 在左側導覽列,單擊集群清單
  4. 湖倉版(3.0)頁簽下,單擊目標集群ID
  5. 在左側導覽列,單擊作業開發 > SQL開發

  6. SQLConsole視窗,選擇Spark引擎和Job型資源群組。

步驟二:建立外庫和Delta外表

說明

您可以選擇批處理或互動式執行任意一種方式執行以下SQL語句。詳情請參見Spark SQL執行方式

  1. 執行以下語句,建立資料庫。如果已有資料庫,可跳過本步驟。

    add jar oss://<bucket_name>/path/to/delta-core_xx.jar;    /*自編譯Delta包,需要手動上傳至OSS中。*/
    
    add jar oss://<bucket_name>/path/to/delta-storage-xx.jar; /*自編譯Delta包,需要手動上傳至OSS中。*/
    
    SET spark.adb.connectors=oss;   /*啟用AnalyticDB MySQL版Spark內建的連接器OSS。*/ 
    
    SET spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension;  /*開源Spark參數。*/
    
    SET spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog;   /*開源Spark參數。*/
    
    CREATE DATABASE if not exists external_delta_db
    location "oss://<bucket_name>/test/";          /*用於在該路徑中建立表,請替換為自己的OSS路徑。*/
  2. 執行以下語句,建立Delta外表。

    CREATE TABLE if not exists external_delta_db.delta_test_tbl (
      id int, 
      name string, 
      age int
    ) using delta 
    partitioned by (age) 
    location "oss://<bucket_name>/test/delta_test_tbl";

步驟三:寫入Delta外表資料

說明

您可以選擇批處理或互動式執行任意一種方式執行以下SQL語句。詳情請參見Spark SQL執行方式

INSERT

執行以下語句,寫入資料。您可以選擇以下任意一種方式向Delta外表中寫入資料。

  • 方式一:INSERT INTO寫入

    INSERT INTO external_delta_db.delta_test_tbl values(1, 'lisa', 10),(2, 'jams', 10);
  • 方式二:INSERT OVERWRITE全表寫入

    INSERT OVERWRITE external_delta_db.delta_test_tbl values (2, 'zhangsan', 10), (4, 'lisi', 30);
  • 方式三:INSERT OVERWRITE靜態分區寫入

    INSERT OVERWRITE external_delta_db.delta_test_tbl partition(age=17) values(3, 'anna');
  • 方式四:INSERT OVERWRITE動態分區寫入

    INSERT OVERWRITE external_delta_db.delta_test_tbl partition (age) values (1, 'bom', 10);

UPDATE

執行以下語句更新資料,本文以將id=1的name列更新為box為例。

UPDATE external_delta_db.delta_test_tbl set name = 'box' where id = 1;

DELETE

執行以下語句刪除資料,本文以刪除id列為1的資料為例。

DELETE FROM external_delta_db.delta_test_tbl where id = 1;

步驟四:查詢資料

說明
  • 您可以選擇批處理或互動式執行任意一種方式執行以下SQL語句。詳情請參見Spark SQL執行方式

  • 執行Spark SQL語句,只返回執行成功或者失敗,不返回資料。您可以在Spark Jar開發頁面應用列表頁簽中的日誌查看錶資料。詳情請參見查看Spark應用資訊

執行以下語句,查詢Delta外表資料。

SELECT * FROM external_delta_db.delta_test_tbl;