全部產品
Search
文件中心

:通過外表匯入至湖倉版

更新時間:Nov 07, 2024

AnalyticDB for MySQL在通過外表訪問並匯入MaxCompute資料時,預設使用Tunnel Record API方式。您也可以進一步選擇Tunnel Arrow API方式,相較於Tunnel Record API方式,Tunnel Arrow API方式可以列式讀取MaxCompute的資料,從而提高資料訪問和處理的效率。

前提條件

  • AnalyticDB for MySQL叢集的產品系列為湖倉版

  • MaxCompute專案與AnalyticDB for MySQL叢集位於同一地區。

  • AnalyticDB for MySQL叢集已開啟ENI訪問。

    說明

    登入雲原生資料倉儲AnalyticDB MySQL控制台,在叢集管理 > 叢集資訊網路資訊地區,開啟ENI網路開關。

  • 已添加AnalyticDB for MySQL的VPC網段到MaxCompute專案的白名單中。

    說明

    登入雲原生資料倉儲AnalyticDB MySQL控制台,在叢集資訊頁面查詢VPC ID。然後登入專用網路控制台,在專用網路頁面根據VPC ID查詢網段。設定MaxCompute白名單的操作,請參見管理IP白名單

  • 使用Tunnel Arrow API方式訪問並匯入MaxCompute資料時,AnalyticDB for MySQL叢集需為3.2.2.1及以上版本。

    說明

    查看湖倉版叢集的核心版本,請執行SELECT adb_version();。如需升級核心版本,請聯絡支援人員。

樣本資料

本文樣本中的MaxCompute專案為test_adb,樣本表person。樣本如下:

CREATE TABLE IF NOT EXISTS person (
    id INT,
    name VARCHAR(1023),
    age INT)
partitioned BY (dt string);

person表中添加分區,樣本如下:

ALTER TABLE person 
ADD 
PARTITION (dt='202207');

向分區中添加資料,樣本如下:

INSERT INTO test_adb.person 
PARTITION (dt='202207') 
VALUES (1,'james',10),(2,'bond',20),(3,'jack',30),(4,'lucy',40);

開啟Arrow API(可選)

預設情況下,AnalyticDB for MySQL叢集會使用Tunnel Record API方式訪問並匯入MaxCompute資料。 若您需要通過Tunnel Arrow API方式訪問並匯入MaxCompute資料,請先開啟Arrow API功能。開啟後,AnalyticDB for MySQL叢集會使用Tunnel Arrow API方式進行匯入。

開啟方法

您可以通過SET命令或Hint在叢集層級和查詢層級開啟Arrow API:

  • 叢集層級開啟Arrow API:

    SET ADB_CONFIG <config_name>= <value>;
  • 查詢層級開啟Arrow API:

    /*<config_name>= <value>*/ SELECT * FROM table;

Arrow API相關配置參數

參數(config_name)

說明

ODPS_TUNNEL_ARROW_ENABLED

是否開啟Arrow API。取值:

  • true:是。

  • false(預設值):否。

ODPS_TUNNEL_SPLIT_BY_SIZE_ENABLED

是否開啟動態Split切分。取值:

  • true:是。

  • false(預設值):否。

操作步驟

資料匯入方式分為常規匯入(預設)和彈性匯入。常規匯入在計算節點中讀取來源資料,然後在儲存節點中構建索引,消耗計算資源和儲存資源。彈性匯入在Serverless Spark Job中讀取來源資料和構建索引,消耗Job型資源群組的資源。僅核心版本3.1.10.0及以上且已建立Job型資源群組的叢集支援彈性匯入資料。相較於常規匯入,彈性匯入可以大幅減少資源的消耗,降低匯入處理程序中對線上讀寫業務的影響,提升資源隔離性和資料匯入效率。更多內容,請參見資料匯入方式介紹

常規匯入

  1. 進入SQL編輯器。

    1. 登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,在湖倉版頁簽下,單擊目的地組群ID。

    2. 在左側導覽列,單擊作業開發 > SQL開發

  2. 建立外部資料庫。樣本如下:

    CREATE EXTERNAL DATABASE adb_external_db;
  3. 建立外表。本文樣本為test_adb

    CREATE EXTERNAL TABLE IF NOT EXISTS adb_external_db.test_adb (
        id int,
        name varchar(1023),
        age int,
        dt string
        ) ENGINE='ODPS'
    TABLE_PROPERTIES='{
    "accessid":"LTAILd4****",
    "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun.com/api",
    "accesskey":"4A5Q7ZVzcYnWMQPysX****",
    "partition_column":"dt",
    "project_name":"test_adb",
    "table_name":"person"
    }';
    說明
    • AnalyticDB for MySQL外表和MaxCompute中表的欄位名稱、欄位數量、欄位順序需要一致,欄位類型需要相容。

    • 外表的參數說明,請參見CREATE EXTERNAL TABLE

  4. 查詢資料。

    SELECT * FROM adb_external_db.test_adb;

    返回結果如下:

    +------+-------+------+---------+
    | id   | name  | age  |   dt    |
    +------+-------+------+---------+
    |    1 | james |   10 |  202207 |
    |    2 | bond  |   20 |  202207 |
    |    3 | jack  |   30 |  202207 |
    |    4 | lucy  |   40 |  202207 |
    +------+-------+------+---------+
    4 rows in set (0.35 sec)
  5. 執行以下步驟將MaxCompute資料匯入至AnalyticDB for MySQL

    1. AnalyticDB for MySQL中建立資料庫,樣本如下:

      CREATE DATABASE adb_demo; 
    2. AnalyticDB for MySQL中建立表用於儲存從MaxCompute中匯入的資料,樣本如下:

      說明

      新表和步驟3中建立的外表的欄位順序和欄位數量需要一致,欄位類型相容。

      CREATE TABLE IF NOT EXISTS adb_demo.adb_import_test(
          id int,
          name string,
          age int,
          dt string
          PRIMARY KEY(id,dt)
      )
      DISTRIBUTED BY HASH(id)  
      PARTITION BY VALUE('dt'); 
    3. 向表中寫入資料,樣本如下:

      • 方式一:執行INSERT INTO匯入資料,當主鍵重複時會自動忽略當前寫入資料,不做更新,作用等同於INSERT IGNORE INTO,詳情請參見INSERT INTO。樣本如下:

        INSERT INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb;

        如果需要將特定分區的資料匯入adb_demo.adb_import_test,可以執行:

        INSERT INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb 
        WHERE dt = '202207'; 
      • 方式二:執行INSERT OVERWRITE INTO匯入資料,會覆蓋表中原有的資料。樣本如下:

        INSERT OVERWRITE INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb;
      • 方式三:非同步執行INSERT OVERWRITE INTO匯入資料。通常使用SUBMIT JOB提交非同步任務,由後台調度,可以在寫入任務前增加Hint(/*+ direct_batch_load=true*/)加速寫入任務。詳情請參見非同步寫入。樣本如下:

        SUBMIT job 
        INSERT OVERWRITE INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb;

        返回結果如下:

        +---------------------------------------+
        | job_id                                |
        +---------------------------------------+
        | 2020112122202917203100908203303****** |
        +---------------------------------------+

        關於非同步提交任務詳情,請參見非同步提交匯入任務

彈性匯入

  1. 進入SQL編輯器。

    1. 登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,在湖倉版頁簽下,單擊目的地組群ID。

    2. 在左側導覽列,單擊作業開發 > SQL開發

  2. 建立資料庫。如果有已建立的資料庫,可以忽略本步驟。樣本如下:

    CREATE DATABASE adb_demo; 
  3. 建立外表。

    說明
    • AnalyticDB for MySQL外表的名稱需要和MaxCompute專案的名稱相同,否則建立外表會失敗。

    • AnalyticDB for MySQL外表和MaxCompute中表的欄位名稱、欄位數量、欄位順序需要一致,欄位類型需要相容。

    • 彈性匯入僅支援CREATE TABLE語句建立外表。

    CREATE TABLE IF NOT EXISTS test_adb
    (
        id int,
        name string,
        age int,
        dt string
    )
     ENGINE='ODPS'
     TABLE_PROPERTIES='{
     "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api",
     "accessid":"LTAILd4****",
     "accesskey":"4A5Q7ZVzcYnWMQPysX****",
     "partition_column":"dt",
     "project_name":"test_adb",
     "table_name":"person"
     }';                 

    外表支援設定的參數及參數說明,請參見參數說明

  4. 查詢資料。

    SELECT * FROM adb_demo.test_adb;

    返回結果如下:

    +------+-------+------+---------+
    | id   | name  | age  |   dt    |
    +------+-------+------+---------+
    |    1 | james |   10 |  202207 |
    |    2 | bond  |   20 |  202207 |
    |    3 | jack  |   30 |  202207 |
    |    4 | lucy  |   40 |  202207 |
    +------+-------+------+---------+
    4 rows in set (0.35 sec)
  5. AnalyticDB for MySQL中建立表用於儲存從MaxCompute中匯入的資料。樣本如下:

    說明

    建立的內表和步驟3中建立的外表的欄位名稱、欄位數量、欄位順序、欄位類型必須相同。

    CREATE TABLE IF NOT EXISTS adb_import_test
    (   id int,
        name string,
        age int,
        dt string,
        PRIMARY KEY(id,dt)
    )
    DISTRIBUTED BY HASH(id)
    PARTITION BY VALUE('dt') LIFECYCLE 30;  
  6. 匯入資料。

    重要

    彈性匯入僅支援通過INSERT OVERWRITE INTO語句匯入資料。

    • 方法一:執行INSERT OVERWRITE INTO彈性匯入資料,會覆蓋表中原有的資料。樣本如下:

      /*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group|spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d****]*/
      INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
    • 方法二:非同步執行INSERT OVERWRITE INTO彈性匯入資料。通常使用SUBMIT JOB提交非同步任務,由後台調度。

      /*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group|spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d****]*/
      SUBMIT JOB INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
      重要

      非同步提交彈性匯入任務時,不支援設定優先權隊列。

      返回結果如下:

      +---------------------------------------+
      | job_id                                |
      +---------------------------------------+
      | 2023081517192220291720310090151****** |
      +---------------------------------------+

    使用SUBMIT JOB提交非同步任務後,返回結果僅表示非同步任務提交成功。您可以通過job_id終止非同步任務或查詢非同步任務狀態,判斷任務是否執行成功。具體操作,請參見非同步提交匯入任務

    Hint參數說明:

    • elastic_load:是否使用彈性匯入方式。取值:truefalse(預設值)。

    • elastic_load_configs:彈性匯入方式支援配置的參數。參數需使用方括弧([ ])括起來,且多個參數之間以豎線(|)分隔,支援配置的參數如下表所示:

      參數

      是否必填

      說明

      adb.load.resource.group.name

      執行彈性匯入任務的Job資源群組名稱。

      adb.load.job.max.acu

      單個彈性匯入任務最多使用的資源。單位為ACU,最小值為5 ACU。預設值為叢集Shard個數+1。

      執行如下語句可查詢叢集Shard個數:

      SELECT count(1) FROM information_schema.kepler_meta_shards;

      spark.driver.resourceSpec

      Spark driver的資源規格。預設值為small。取值範圍,請參見Spark資源規格列表的型號列。

      spark.executor.resourceSpec

      Spark executor的資源規格。預設值為large。取值範圍,請參見Spark資源規格列表的型號列。

      spark.adb.executorDiskSize

      Spark executor的磁碟容量,取值範圍為(0,100],單位為GiB,預設值為10 Gi。更多資訊,請參見指定Driver和Executor資源

  7. (可選)查看已提交的匯入任務是否為彈性匯入任務。

    SELECT job_name, (job_type = 3) AS is_elastic_load FROM INFORMATION_SCHEMA.kepler_meta_async_jobs WHERE job_name = "2023081818010602101701907303151******";

    返回結果如下:

    +---------------------------------------+------------------+
    | job_name                              | is_elastic_load  |
    +---------------------------------------+------------------+
    | 2023081517195203101701907203151****** |       1          |
    +---------------------------------------+------------------+

    is_elastic_load的傳回值為1,表示已提交的匯入任務是彈性匯入任務;若為0,則表示已提交的匯入任務是常規匯入任務。