全部產品
Search
文件中心

MaxCompute:使用Flink寫入資料到Delta Table

更新時間:Nov 29, 2024

當前MaxCompute為您提供了新版的Flink Connector外掛程式,新版外掛程式支援將Flink資料寫入至MaxCompute的普通表和Delta Table類型表,提高了Flink資料寫入MaxCompute的便捷性。本文為您介紹新版Flink Connector寫入MaxCompute的能力支援情況與主要操作流程。

背景資訊

  • 支援的寫入模式:

    使用新版Flink Connector將資料寫入MaxCompute時,支援通過Upsert和Insert寫入方式。其中使用Upsert時支援以下兩種資料寫入流:

    • 按照Primary Key進行分組

    • 按照分區欄位進行分組

      若分區數量過多,您可以按照分區欄位進行分組,但使用該流程可能導致資料扭曲。

  • Upsert模式下,通過Flink Connector進行資料寫入流程和參數配置建議,詳情請參見資料即時入倉實踐

  • 您可在配置Flink資料寫入MaxCompute時,通過設定Flink Connector參數指定使用哪種寫入方式,全量Connector參數介紹請參見下文的附錄:新版Flink Connector全量參數

  • Flink Upsert寫入任務的Checkpoint間隔建議設定3分鐘以上,設定太小的話,寫入效率得不到保障,並且可能引入大量小檔案。

  • MaxCompute與Realtime ComputeFlink版的欄位類型對照關係如下:

    Flink 資料類型

    MaxCompute 資料類型

    CHAR(p)

    CHAR(p)

    VARCHAR(p)

    VARCHAR(p)

    STRING

    STRING

    BOOLEAN

    BOOLEAN

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    INT

    INT

    BIGINT

    LONG

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DECIMAL(p, s)

    DECIMAL(p, s)

    DATE

    DATE

    TIMESTAMP(9) WITHOUT TIME ZONE、TIMESTAMP_LTZ(9)

    TIMESTAMP

    TIMESTAMP(3) WITHOUT TIME ZONE、TIMESTAMP_LTZ(3)

    DATETIME

    BYTES

    BINARY

    ARRAY<T>

    LIST<T>

    MAP<K, V>

    MAP<K, V>

    ROW

    STRUCT

    說明

    Flink的TIMESTAMP資料類型不含時區,MaxCompute TIMESTAMP資料類型含時區。此差異會導致8小時的時間差。其通過使用TIMESTAMP_LTZ(9)來對齊時間戳記。

    --FlinkSQL
    CREATE TEMPORARY TABLE odps_source(
      id BIGINT NOT NULL COMMENT 'id',
      created_time TIMESTAMP NOT NULL COMMENT '建立時間',
      updated_time TIMESTAMP_LTZ(9) NOT NULL COMMENT '更新時間',
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'maxcompute',
    ...
    );

Flink資料寫入MaxCompute流程:自建開源Flink

  1. 準備工作:建立MaxCompute表。

    您需先建立好MaxCompute表,用於後續Flink資料寫入。以下以建立兩張表(Delta Table非分區表和分區表)作為樣本,為您示範Flink資料寫入MaxCompute的主要流程,其中表屬性設定請參考Delta Table表參數

    --建立Delta Table非分區表
    CREATE TABLE mf_flink_tt (
      id BIGINT not null,
      name STRING,
      age INT,
      status BOOLEAN, primary key (id)
    )
    tblproperties ("transactional"="true", 
                   "write.bucket.num" = "64", 
                   "acid.data.retain.hours"="12") ;
    
    --建立Delta Table分區表
    CREATE TABLE mf_flink_tt_part (
      id BIGINT not null,
      name STRING,
      age INT,
      status BOOLEAN, 
      primary key (id)
    )
      partitioned by (dd string, hh string) 
      tblproperties ("transactional"="true", 
                     "write.bucket.num" = "64", 
                     "acid.data.retain.hours"="12") ;
    
  2. 搭建開源Flink叢集。當前支援1.13、1.15、1.16和1.17版本的開源Flink,您可以選擇對應版本的Flink:

    說明
    • Flink 1.17版本可複用1.16版本。

    • 本文以Flink Connector 1.13為例,將包下載至本地環境,下載完成後進行解壓。

  3. 下載Flink Connector並添加至Flink叢集包中。

    1. 將Flink Connector Jar包下載至本地環境。

    2. 將Flink Connector Jar包添加至解壓後的Flink安裝包的lib目錄中。

      mv flink-connector-odps-1.13-shaded.jar $FLINK_HOME/lib/flink-connector-odps-1.13-shaded.jar
  4. 啟動Flink執行個體服務。

    cd $FLINK_HOME/bin
    ./start-cluster.sh
  5. 啟動Flink用戶端。

    cd $FLINK_HOME/bin
    ./sql-client.sh
  6. 建立Flink表,並配置Flink Connector參數。

    當前支援直接使用Flink SQL建立Flink表並配置參數,也支援使用Flink的DataStream API進行相關操作。兩種操作的核心樣本如下。

    使用Flink SQL

    1. 進入Flink SQL的編輯介面,執行以下命令完成建表與參數配置。

      -- 在 Flink SQL中註冊一張對應的非分區表
      CREATE TABLE mf_flink (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN,
        PRIMARY KEY(id) NOT ENFORCED
      ) WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_tt',
        'sink.operation' = 'upsert',
      	'odps.access.id'='LTAI5tRzd4W8cTyLZKT****',
        'odps.access.key'='gJwKaF3hK9MDAQgbO0zs****',
      	'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
      	'odps.project.name'='mf_mc_bj'
      );
      
      -- 在 Flink SQL 中註冊一張對應的分區表
      CREATE TABLE mf_flink_part (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN,
        dd STRING,
        hh STRING,
        PRIMARY KEY(id) NOT ENFORCED
      ) PARTITIONED BY (`dd`,`hh`)
      WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_tt_part',
        'sink.operation' = 'upsert',
      	'odps.access.id'='LTAI5tRzd4W8cTyLZKT****',
        'odps.access.key'='gJwKaF3hK9MDAQgbO0zs*******',
      	'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
      	'odps.project.name'='mf_mc_bj'
      );
    2. 向Flink表中寫入資料,並在MaxCompute表中查詢,驗證Flink資料寫入MaxCompute的結果。

      --在flink Sql用戶端中往非分區表裡插入資料
      INSERT INTO mf_flink VALUES (1,'Danny',27, false);
      
      --在Maxcompute中查詢返回
      SELECT * FROM mf_flink_tt;
      +------------+------+------+--------+
      | id         | name | age  | status |
      +------------+------+------+--------+
      | 1          | Danny | 27   | false  |
      +------------+------+------+--------+
      
      --在flink Sql用戶端中往非分區表裡插入資料
      INSERT INTO mf_flink VALUES (1,'Danny',28, false);
      --在Maxcompute中查詢返回
      SELECT * FROM mf_flink_tt;
      +------------+------+------+--------+
      | id         | name | age  | status |
      +------------+------+------+--------+
      | 1          | Danny | 28   | false  |
      +------------+------+------+--------+
      
      --在flink Sql用戶端中往分區表裡插入資料
      INSERT INTO mf_flink_part VALUES (1,'Danny',27, false, '01','01');
      --在Maxcompute中查詢返回
      SELECT * FROM mf_flink_tt_part WHERE dd=01 AND hh=01;
      +------------+------+------+--------+----+----+
      | id         | name | age  | status | dd | hh |
      +------------+------+------+--------+----+----+
      | 1          | Danny | 27   | false  | 01 | 01 |
      +------------+------+------+--------+----+----+
      
      --在flink Sql用戶端中分區表裡插入資料
      INSERT INTO mf_flink_part VALUES (1,'Danny',30, false, '01','01');
      --在Maxcompute中查詢返回
      SELECT * FROM mf_flink_tt_part WHERE dd=01 AND hh=01;
      +------------+------+------+--------+----+----+
      | id         | name | age  | status | dd | hh |
      +------------+------+------+--------+----+----+
      | 1          | Danny | 30   | false  | 01 | 01 |
      +------------+------+------+--------+----+----+

    使用DataStream API

    1. 使用DataStream介面時,需先添加以下依賴。

      <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>flink-connector-maxcompute</artifactId>
                  <version>xxx</version>
                  <scope>system</scope>
                  <systemPath>${mvn_project.basedir}/lib/flink-connector-maxcompute-xxx-shaded.jar</systemPath>
      </dependency>
      說明

      使用時請將“xxx”改為相應的版本號碼。

    2. 建表與參數配置的範例程式碼如下。

      package com.aliyun.odps.flink.examples;
      
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.odps.table.OdpsOptions;
      import org.apache.flink.odps.util.OdpsConf;
      import org.apache.flink.odps.util.OdpsPipeline;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      import org.apache.flink.table.data.RowData;
      
      public class Examples {
      
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.enableCheckpointing(120 * 1000);
      
              StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
      
              Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table");
              DataStream<RowData> input = streamTableEnvironment.toAppendStream(source, RowData.class);
      
              Configuration config = new Configuration();
              config.set(OdpsOptions.SINK_OPERATION, "upsert");
              config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8);
              config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100);
      
              OdpsConf odpsConfig = new OdpsConf("accessid",
                      "accesskey",
                      "endpoint",
                      "project",
                      "tunnel endpoint");
      
              OdpsPipeline.Builder builder = OdpsPipeline.builder();
              builder.projectName("sql2_isolation_2a")
                      .tableName("user_ledger_portfolio")
                      .partition("")
                      .configuration(config)
                      .odpsConf(odpsConfig)
                      .sink(input, false);
              env.execute();
          }
      }

Flink資料寫入MaxCompute流程:阿里雲全託管Flink

  1. 準備工作:建立MaxCompute表。

    您需先建立好MaxCompute表,用於後續Flink資料寫入。以下以建立一張Delta Table表為例。

    SET odps.sql.type.system.odps2=true;
    DROP TABLE mf_flink_upsert;
    CREATE TABLE mf_flink_upsert (
      c1 int not null, 
      c2 string, 
      gt timestamp,
      primary key (c1)
    ) 
      PARTITIONED BY (ds string)
      tblproperties ("transactional"="true",
                     "write.bucket.num" = "64", 
                     "acid.data.retain.hours"="12") ;
  2. 登入Realtime Compute控制台,查看Flink Connector資訊,Flink Connector已經載入到阿里雲全託管Flink VVP上。

  3. 通過Flink SQL作業建立Flink表,並構造Flink即時資料,完成作業開發後進行作業部署。

    在Flink的作業開發頁面,建立並編輯Flink SQL作業,以下樣本為建立一張Flink資料來源表、一張Flink臨時結果表,並自動構建即時資料產生邏輯寫入源表,通過計算邏輯將源表資料寫入臨時結果表。SQL作業開發詳細操作請參見SQL作業開發

    --建立flink資料來源表,
    CREATE TEMPORARY TABLE fake_src_table
    (
        c1 int,
        c2 VARCHAR,
        gt AS CURRENT_TIMESTAMP
    ) WITH (
      'connector' = 'faker',
      'fields.c2.expression' = '#{superhero.name}',
      'rows-per-second' = '100',
      'fields.c1.expression' = '#{number.numberBetween ''0'',''1000''}'
    );
    
    --flink建立臨時結果表
    CREATE TEMPORARY TABLE test_c_d_g 
    (
        c1 int,
        c2 VARCHAR,
        gt TIMESTAMP,
        ds varchar,
        PRIMARY KEY(c1) NOT ENFORCED
     ) PARTITIONED BY(ds)
     WITH (
        		'connector' = 'maxcompute',
        		'table.name' = 'mf_flink_upsert',
        		'sink.operation' = 'upsert',
        		'odps.access.id'='LTAI5tRzd4W8cTyL****',
        		'odps.access.key'='gJwKaF3hK9MDAQgb**********',
        		'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
        		'odps.project.name'='mf_mc_bj',
        		'upsert.write.bucket.num'='64'
    );
    
    --flink 計算邏輯
    INSERT INTO test_c_d_g
    SELECT  c1 AS c1,
            c2 AS c2,
            gt AS gt,
            date_format(gt, 'yyyyMMddHH') AS ds
    FROM    fake_src_table;

    其中:

    odps.end.point:使用對應Region的雲產品互連網絡Endpoint。

    upsert.write.bucket.num:與MaxCompute中建立的Delta Table表的write.bucket.num屬性值保持一致。

  4. 在MaxCompute中查詢資料,驗證Flink資料寫入MaxCompute的結果。

    SELECT * FROM mf_flink_upsert WHERE ds=2023061517;
    
    --返回,由於Flink中的資料為隨機產生,實際MaxCompute查詢結果與本樣本不一定完全一致
    +------+----+------+----+
    | c1   | c2 | gt   | ds |
    +------+----+------+----+
    | 0    | Skaar | 2023-06-16 01:59:41.116 | 2023061517 |
    | 21   | Supah Century | 2023-06-16 01:59:59.117 | 2023061517 |
    | 104  | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 |
    | 126  | Leader | 2023-06-16 01:59:39.116 | 2023061517 |
    

附錄:新版Flink Connector全量參數

  • 基礎參數

    參數

    是否必填

    預設值

    說明

    connector

    無預設值

    Connector類型,需設定為MaxCompute

    odps.project.name

    無預設值

    MaxCompute的Project名稱。

    odps.access.id

    無預設值

    您的阿里雲帳號AccessKey ID。您可以在訪問憑證頁面查看對應資訊。

    odps.access.key

    無預設值

    您的阿里雲帳號AccessKey Secret。您可以在訪問憑證頁面查看對應資訊。

    odps.end.point

    無預設值

    MaxCompute的Endpoint資訊。各地區的MaxCompute Endpoint請參見Endpoint

    odps.tunnel.end.point

    Tunnel服務的公網訪問連結。如果您未配置Tunnel Endpoint,Tunnel會自動路由到MaxCompute服務所在網路對應的Tunnel Endpoint。如果您配置了Tunnel Endpoint,則以配置為準,不進行自動路由。

    各地區及網路對應的Tunnel Endpoint值,請參見Endpoint

    odps.tunnel.quota.name

    無預設值

    訪問MaxCompute使用的Tunnel Quota名稱。

    table.name

    無預設值

    MaxCompute表名稱,格式為[project.][schema.]table

    odps.namespace.schema

    false

    是否使用三層模型。關於三層模型介紹,請參見Schema操作

    sink.operation

    insert

    寫入類型,取值為insertupsert

    說明

    僅MaxCompute Delta Table支援Upsert寫入。

    sink.parallelism

    無預設值

    寫入的並行度,如果不設定,則預設使用上遊資料並行度。

    說明

    請務必確保表屬性 write.bucket.num 是該配置值的整數倍,這樣可以獲得最佳的寫入效能,並且能夠最有效地節省 Sink 節點記憶體。

    sink.meta.cache.time

    400

    中繼資料快取大小。

    sink.meta.cache.expire.time

    1200

    中繼資料快取逾時時間,單位:秒(s)。

    sink.coordinator.enable

    是否開啟Coordinator模式。

  • 分區參數

    參數

    是否必填

    預設值

    說明

    sink.partition

    無預設值

    待寫入的分區名稱。

    若您使用的是動態分區,則為動態分區的上級分區名稱。

    sink.partition.default-value

    __DEFAULT_PARTITION__

    使用動態分區時的預設分區名稱。

    sink.dynamic-partition.limit

    100

    動態分區寫入時,單個Checkpoint可同時匯入的最大分區數量。

    說明

    建議不要大幅提升該參數值,因為當同時寫入的分區數量過多時,容易導致Sink節點記憶體溢出(OOM),且當並發寫入分區數超過閾值,寫入任務會報錯。

    sink.group-partition.enable

    false

    動態分區寫入時,是否按照分區進行分組。

    sink.partition.assigner.class

    無預設值

    PartitionAssigner實作類別。

  • FileCached模式寫入參數

    當動態分區數量過多時,可以使用檔案快取模式,您可以通過以下參數配置資料寫入時快取檔案資訊。

    參數

    是否必配

    預設值

    說明

    sink.file-cached.enable

    false

    是否開啟FileCached寫入。取值說明:

    • false:不開啟。

    • true:開啟。

      說明

      當動態分區數量過多時,可以使用檔案快取模式。

    sink.file-cached.tmp.dirs

    ./local

    檔案快取模式下,預設檔案快取目錄。

    sink.file-cached.writer.num

    16

    檔案快取模式下,單個Task上傳資料的並發數。

    說明

    建議不要大幅提升該參數值,因為當同時寫入的分區數量過多時,容易導致記憶體溢出(OOM)。

    sink.bucket.check-interval

    60000

    檔案快取模式下,檢查檔案大小的周期,單位:毫秒(ms)。

    sink.file-cached.rolling.max-size

    16 M

    檔案快取模式下,單個快取檔案的最大值。

    若檔案大小超過該值,會將該檔案資料上傳到服務端。

    sink.file-cached.memory

    64 M

    檔案快取模式下,寫入檔案使用的最大堆外記憶體大小。

    sink.file-cached.memory.segment-size

    128 KB

    檔案快取模式下,寫入檔案的使用的buffer大小。

    sink.file-cached.flush.always

    true

    檔案快取模式下,寫入檔案是否使用緩衝。

    sink.file-cached.write.max-retries

    3

    檔案快取模式下,上傳資料的重試次數。

  • InsertUpsert寫入參數

    Upsert寫入參數

    參數

    是否必填

    預設值

    說明

    upsert.writer.max-retries

    3

    Upsert Writer寫入Bucket失敗後,重試次數。

    upsert.writer.buffer-size

    64 m

    單個Upsert Writer資料在Flink中的緩衝大小。

    說明
    • 當所有Bucket的緩衝區大小總和達到預設閾值時,系統將自動觸發重新整理操作,將資料更新到伺服器端。

    • 一個upsert writer裡會同時寫入多個Bucket,建議提高該值,以提升寫入效率。

    • 若寫入分區較多時,會存在引發記憶體OOM風險,可考慮降低該參數值。

    upsert.writer.bucket.buffer-size

    1 m

    單個Bucket資料在Flink中的緩衝大小,當Flink伺服器使用記憶體資源緊張時,可以減小該參數值。

    upsert.write.bucket.num

    寫入表的bucket數量,必須與寫入表write.bucket.num值一致。

    upsert.write.slot-num

    1

    單個Session使用Tunnel slot數量。

    upsert.commit.max-retries

    3

    Upsert Session Commit重試次數。

    upsert.commit.thread-num

    16

    Upsert Session Commit的並行度。

    不建議將此參數值調整得過大,因為當同時進行的提交並發數越多時,會導致資源消耗增加,可能導致效能問題或資源過度消耗。

    upsert.major-compact.min-commits

    100

    發起Major Compact的最小Commit次數。

    upsert.commit.timeout

    600

    Upsert Session Commit等待逾時時間,單位:秒(s)。

    upsert.major-compact.enable

    false

    是否開啟Major Compact。

    upsert.flush.concurrent

    2

    限制單個分區允許同時寫入的最大Bucket數。

    說明

    每當一個bucket的資料重新整理時,將會佔用一個Tunnel Slot資源。

    說明

    Upsert寫入時,參數配置建議詳情,請參見Upsert寫入參數配置建議

    Insert寫入參數

    參數

    是否必配

    預設值

    說明

    insert.commit.thread-num

    16

    Commit Session的並行度。

    insert.arrow-writer.enable

    false

    是否使用Arrow格式。

    insert.arrow-writer.batch-size

    512

    Arrow Batch的最大行數。

    insert.arrow-writer.flush-interval

    100000

    Writer Flush間隔,單位毫秒(ms)。

    insert.writer.buffer-size

    64 M

    使用Buffered Writer的緩衝大小。