全部產品
Search
文件中心

E-MapReduce:基礎使用

更新時間:Jul 01, 2024

本文為您介紹E-MapReduce中DeltaLake的配置資訊及其常用命令的樣本。

DeltaLake配置資訊

EMR中DeltaLake的預設配置資訊如下:

  • Spark 2.X環境

    spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
  • Spark 3.X環境

    spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
    spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog

常用命令樣本

  • 建立表

    CREATE TABLE delta_table (id INT) USING delta;
  • 插入資料

    INSERT INTO delta_table VALUES 0,1,2,3,4;
  • 覆蓋寫資料

    INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
  • 查詢資料

    SELECT * FROM delta_table;
  • 更新資料

    給偶數id加100。

    UPDATE delta_table SET id = id + 100 WHERE mod(id, 2) = 0;
  • 刪除資料

    刪除偶數id的記錄。

    DELETE FROM delta_table WHERE mod(id, 2) = 0;

Merge操作

  1. 建立Source表用於Merge操作。

    CREATE TABLE newData(id INT) USING delta;
  2. 向表中插入資料。

    INSERT INTO newData VALUES 0,1,2,3,4,5,6,7,8,9;
  3. 使用newData作為Source Merge進delta_table表。如果匹配到相同id的記錄,則該id加100;如果沒有匹配到,則直接插入新記錄。

    MERGE INTO delta_table AS target
      USING newData AS source
      ON target.id = source.id
      WHEN MATCHED THEN UPDATE SET target.id = source.id + 100
      WHEN NOT MATCHED THEN INSERT *;
  4. 流式讀資料

    1. 建立流式目標表。

      CREATE TABLE stream_debug_table (id INT) USING delta;
    2. 建立流。

      CREATE SCAN stream_delta_table on delta_table USING STREAM;
      說明

      本文樣本中的delta_table為您已存在的delta表。

    3. 流式寫入目標表。

      CREATE STREAM job
      options (
        triggerType='ProcessingTime',
        checkpointLocation = '/tmp/streaming_read_cp'
      )
      INSERT INTO stream_debug_table
      SELECT * FROM stream_delta_table;

流式程式碼範例

  1. 通過SSH方式串連叢集,詳情請參見登入叢集

  2. 執行以下命令,啟動streaming-sql。

    streaming-sql
    說明

    如果您已添加DeltaLake組件,則可以直接執行streaming-sql命令。如果叢集內沒有預設配置,您可以通過以下配置來使用Delta Lake。

    streaming-sql --jars /path/to/delta-core_2.11-0.6.1.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
  3. 執行以下命令,建立流式目標表。

    CREATE TABLE stream_debug_table (id INT) USING DELTA;
  4. 執行以下命令,建立流。

    CREATE SCAN stream_delta_table on delta_table USING STREAM;
  5. 執行以下命令,以delta_table作為Source,流式寫入目標表。

    CREATE STREAM job
    options (
      triggerType='ProcessingTime',
      checkpointLocation = '/tmp/streaming_read_cp'
    )
    INSERT INTO stream_debug_table
    SELECT * FROM stream_delta_table;
  6. 您可以新開啟一個streaming-sql用戶端,向Source中插入新資料,並查詢目標表的資料。

    1. 執行以下命令,驗證Source存量寫入。

      SELECT * FROM stream_debug_table;
    2. 執行以下命令,插入新資料。

      INSERT INTO delta_table VALUES 801, 802;
    3. 執行以下命令,查詢插入的資料。

      SELECT * FROM stream_debug_table;
    4. 執行以下命令,插入新資料。

      INSERT INTO delta_table VALUES 901, 902;
    5. 執行以下命令,查詢插入的資料。

      SELECT * FROM stream_debug_table;