全部產品
Search
文件中心

MaxCompute:使用阿里雲Flink(流式資料轉送)

更新時間:Feb 28, 2024

Realtime ComputeFlink版內建外掛程式支援通過批量資料通道寫入MaxCompute,受到批量資料通道並發數及隱藏檔數影響,內建版本外掛程式會有效能瓶頸。MaxCompute提供了使用流式資料通道的Flink外掛程式,支援使用Flink在高並發、高QPS情境下寫入MaxCompute。

前提條件

背景資訊

Realtime ComputeFlink版可以調用MaxCompute SDK中的介面將資料寫入緩衝區,當緩衝區的大小超過指定的大小(預設為1 MB)或每隔指定的時間間隔時,將資料上傳至MaxCompute結果表中。

說明

建議Flink同步MaxCompute並發數大於32或Flush間隔小於60秒的情境下,使用MaxCompute自訂外掛程式。其他情境可以隨意選擇Flink內建外掛程式和MaxCompute自訂外掛程式。

MaxCompute與Realtime ComputeFlink版的欄位類型對照關係如下。

MaxCompute欄位類型

Realtime ComputeFlink版欄位類型

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

DATETIME

TIMESTAMP

TIMESTAMP

TIMESTAMP

VARCHAR

VARCHAR

STRING

VARCHAR

DECIMAL

DECIMAL

BINARY

VARBINARY

使用限制

該功能的使用限制如下:

  • 本外掛程式僅支援Blink 3.2.1及以上版本。

  • MaxCompute中的聚簇表不支援作為MaxCompute結果表。

文法樣本

您需要在Flink控制台新增作業,建立MaxCompute結果表。

說明

DDL語句中定義的欄位需要與MaxCompute物理表中的欄位名稱、順序以及類型保持一致,否則可能導致在MaxCompute物理表中查詢的資料為/n

命令樣本如下:

create table odps_output(
    id INT,
    user_name VARCHAR,
    content VARCHAR
) with (
    type ='custom',
    class = 'com.alibaba.blink.customersink.MaxComputeStreamTunnelSink',
    endpoint = '<YourEndPoint>',
    project = '<YourProjectName>',
    `table` = '<YourtableName>',
    access_id = '<yourAccessKeyId>',
    access_key = '<yourAccessKeySecret>',
    `partition` = 'ds=2018****'
);

WITH參數

參數

說明

是否必填

備忘

type

結果表的類型。

固定值為custom

class

外掛程式入口類。

固定值為com.alibaba.blink.customersink.MaxComputeStreamTunnelSink

endpoint

MaxCompute服務地址。

參見各地區Endpoint對照表(外網串連方式)

tunnel_endpoint

MaxCompute Tunnel服務的串連地址。

參見各地區Endpoint對照表(外網串連方式)

說明

VPC環境下必填。

project

MaxCompute專案名稱。

table

MaxCompute物理表名稱。

access_id

可以訪問MaxCompute專案的AccessKey ID。

access_key

AccessKey ID對應的AccessKey Secret。

partition

分區表的分區名稱。

如果表為分區表則必填:

  • 固定分區

    例如`partition` = 'ds=20180905'表示將資料寫入分區ds= 20180905

  • 動態分區

    如果不明文顯示分區的值,則會根據寫入資料中的分區列具體的值,寫入到不同的分區中。例如`partition`='ds'表示根據ds欄位的值寫入分區。

    如果要建立多級動態分區,With參數中Partition的欄位順序和結果表的DDL中的分區欄位順序,必須與物理表一致,各個分區欄位之間使用英文逗號(,)分隔。

    說明
    • 動態分區列需要顯式寫在建表語句中。

    • 對於動態分區欄位為空白的情況,如果資料來源中ds=nullds='',則會建立ds=NULL的分區。

enable_dynamic_partition

設定是否開啟動態分區機制。

預設值為False。

dynamic_partition_limit

設定最大並發分區數。動態分區模式會為每個分區分配一個緩衝區,緩衝區大小通過flush_batch_size參數控制,所以動態分區模式最大會佔用分區數量×緩衝區大小的記憶體。例如100個分區,每個分區1 MB,則最大佔用記憶體為100 MB。

預設值為100。系統記憶體中會維護一個分區到Writer的Map,如果這個Map的大小超過了dynamicPartitionLimit的值,系統會通過LRU(Least Recently Used)的規則嘗試淘汰沒有資料寫入的分區。如果所有分區都有資料寫入,則會出現dynamic partition limit exceeded: 100報錯。

flush_batch_size

資料緩衝區大小,單位位元組。緩衝區資料寫滿後會觸發Flush操作,將資料發送到MaxCompute。

預設值為1048576,即1 MB。

flush_interval_ms

緩衝區Flush間隔,單位毫秒。

MaxCompute Sink寫入資料時,先將資料放到MaxCompute的緩衝區中,等緩衝區溢位或每隔一段時間(flush_interval_ms)時,再把緩衝區中的資料寫到目標 MaxCompute表。

預設值為-1,即不設定主動Flush間隔。

flush_retry_count

資料Flush失敗重試次數,在緩衝區Flush失敗的情境下自動重試。

預設值為10,即重試10次。

flush_retry_interval_sec

Flush失敗重試的時間間隔,單位秒。

預設值為1,即1秒。

flush_retry_strategy

Flush失敗重試策略,多次重試的時間間隔增長策略,配合flush_retry_interval_sec使用。包含如下三種策略:

  • constant:常數時間,即每次稍候再試使用固定時間間隔。

  • linear:線性增長,即每次稍候再試時間軸性增長,例如flush_retry_interval_sec設定為1,flush_retry_count設定為5,多次重試時間間隔為1、2、3、4、5秒。

  • exponential:指數增長。例如flush_retry_interval_sec設定為1,flush_retry_count設定為5,多次重試中間間隔為1、2、4、8、16秒。

預設值為constant,即常數時間間隔。

類型映射

MaxCompute欄位類型

Realtime ComputeFlink版欄位類型

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

DATETIME

TIMESTAMP

TIMESTAMP

TIMESTAMP

VARCHAR

VARCHAR

STRING

VARCHAR

DECIMAL

DECIMAL

程式碼範例

包含MaxCompute結果表的Realtime ComputeFlink版作業程式碼範例如下:

  • 寫入固定分區

    create table source (
       id INT,
       len INT,
       content VARCHAR
    ) with (
       type = 'random'
    );
    create table odps_sink (
       id INT,
       len INT,
       content VARCHAR
    ) with (
       type='custom',
       class = 'com.alibaba.blink.customersink.MaxComputeStreamTunnelSink',
       endpoint = '<yourEndpoint>', 
       project = '<yourProjectName>',
       `table` = '<yourTableName>',
       accessId = '<yourAccessId>',
       accessKey = '<yourAccessPassword>',
       `partition` = 'ds=20180418'
    );
    insert into odps_sink 
    select 
       id, len, content 
    from source;
  • 寫入動態分區

    create table source (
       id INT,
       len INT,
       content VARCHAR,
       c TIMESTAMP 
    ) with (
       type = 'random'
    );
    create table odps_sink (
       id INT,
       len INT,
       content VARCHAR,
       ds VARCHAR                        --動態分區列需要顯式寫在建表語句中。
    ) with (
       type = 'odps',
       endpoint = '<yourEndpoint>', 
       project = '<yourProjectName>',
       `table` = '<yourTableName>',
       accessId = '<yourAccessId>',
       accessKey = '<yourAccessPassword>',
       `partition`='ds'                 --不寫分區的值,表示根據ds欄位的值寫入不同分區。
       ,enable_dynamic_partition = 'true' --啟用動態分區。
       ,dynamic_partition_limit='50' --最大並發分區數50。
       ,flush_batch_size = '524288' --緩衝區512 KB。
       ,flush_interval_ms = '60000' --Flush間隔60秒。
       ,flush_retry_count = '5' --Flush失敗重試5次。
       ,flush_retry_interval_sec = '2' --失敗稍候再試單位2秒。
       ,flush_retry_strategy = 'linear' --連續失敗重試時間間隔線性增長。
    );
    insert into odps_sink 
    select 
       id, 
       len, 
       content,
       date_dormat(c, 'yyMMdd') as ds
    from source;