全部產品
Search
文件中心

Realtime Compute for Apache Flink:雲原生資料倉儲AnalyticDB MySQL版(ADB)3.0

更新時間:Dec 25, 2024

本文為您介紹如何使用雲原生資料倉儲AnalyticDB MySQL版3.0連接器。

背景資訊

雲原生資料倉儲AnalyticDB MySQL版3.0是融合資料庫、巨量資料技術於一體的雲原生企業級資料倉儲服務。AnalyticDB MySQL版支援高吞吐的資料即時增刪改、低延時的即時分析和複雜ETL,相容上下遊生態工具,可用於構建企業級報表系統、資料倉儲和資料服務引擎。

ADB MySQL 3.0連接器支援的資訊如下。

類別

詳情

支援類型

源表(公測中)、維表和結果表

說明

僅Flink計算引擎VVR 8.0.4及以上版本支援源表(公測中),源表的參數和配置詳情請參見Flink訂閱Binlog,維表和結果表參數詳情請參見WITH參數

運行模式

流模式和批模式

資料格式

暫不適用

特有監控指標

暫無

API種類

SQL

是否支援更新或刪除結果表資料

前提條件

文法結構

CREATE TEMPORARY TABLE adb_table (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb3.0',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>'
);
重要

Flink DDL中定義的主鍵必須和AnalyticDB MySQL資料庫物理表中的主鍵保持一致,主鍵一致包括是否存在主鍵和主鍵名稱一致。如果不一致,會影響資料正確性。

WITH參數

  • 通用

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    connector

    結果表類型。

    String

    固定值為adb3.0。

    url

    JDBC串連地址。

    String

    雲原生資料倉儲AnalyticDB MySQL版資料庫的JDBC連結地址。固定格式為jdbc:mysql://<endpoint>:<port>/<databaseName>,其中:

    • endpoint和port:您可以登入AnalyticDB 控制台,單擊對應的叢集名稱,進入叢集資訊頁面,在網路資訊中擷取。

    • databaseName:雲原生資料倉儲AnalyticDB MySQL版資料庫名稱。

    userName

    使用者名稱。

    String

    無。

    password

    密碼。

    String

    無。

    tableName

    表名。

    String

    無。

    maxRetryTimes

    寫入或讀取資料失敗後,重試的最大次數。

    Integer

    10

    無。

  • 結果表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    batchSize

    一次批量寫入的條數。

    Integer

    1000

    需指定主鍵後,該參數才生效。

    bufferSize

    記憶體中緩衝的資料條數。batchSizebufferSize任一到達閾值都會觸發寫入。

    Integer

    1000

    需指定主鍵後,該參數才生效。

    flushIntervalMs

    清空緩衝的時間間隔。表示如果緩衝中的資料在等待指定時間後,依然沒有達到輸出條件,系統會自動輸出緩衝中的所有資料。

    Integer

    3000

    單位為毫秒。

    ignoreDelete

    是否忽略Delete操作。

    Boolean

    false

    參數取值如下:

    • true:忽略Delete操作。

    • false:接受Delete操作。

    replaceMode

    DDL中定義了主鍵的情況下,是否採用replace into文法插入資料。

    Boolean

    true

    該參數取值如下:

    • true:採用replace into文法插入資料。

    • false:採用insert into on duplicate key update文法插入資料。

    說明
    • 僅AnalyticDB MySQL 3.1.3.5及以上版本支援該參數。

    • 此參數僅在DDL中定義了主鍵時才生效,插入資料時採用的文法詳情如下:

      • DDL中定義了主鍵且replaceMode=true,採用replace into文法插入資料。

      • DDL中定義了主鍵且replaceMode=false,採用insert into on duplicate key update文法插入資料。

      • DDL中沒有定義主鍵,採用insert into文法插入資料。

    excludeUpdateColumns

    表示更新主索引值相同的資料時,忽略指定欄位的更新。

    String

    Null 字元串

    如果忽略指定的欄位為多個時,則需要使用英文逗號(,)分割。例如excludeUpdateColumns=column1,column2

    說明
    • 僅在replaceMode=false時,該參數才生效。在replaceMode=true時,對應欄位會被更新為null。

    • 要忽略的多個欄位需要寫在一行中,不能換行。

    connectionMaxActive

    線程池大小。

    Integer

    40

    無。

  • 維表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    cache

    緩衝策略。

    String

    ALL

    雲原生資料倉儲AnalyticDB MySQL版3.0維表支援以下三種緩衝策略:

    • None:無緩衝。

    • LRU:緩衝維表裡的部分資料。源表的每條資料都會觸發系統先在Cache中尋找資料,如果沒有找到,則去物理維表中尋找。

    • ALL(預設值):緩衝維表裡的所有資料。在Job運行前,系統會將維表中所有資料載入到Cache中,之後所有的維表尋找資料都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在,並在Cache到期後重新載入一遍全量Cache。

    適用於遠端資料表資料量小且MISS KEY在源表資料和維表JOIN時,ON條件無法關聯特別多的情境。

    說明
    • 如果使用CACHE ALL時,請注意節點記憶體大小,防止出現OOM。

    • 因為系統會非同步載入維表資料,所以在使用CACHE ALL時,需要增加維表JOIN節點的記憶體,增加的記憶體大小為遠端資料表資料量的兩倍。

    cacheSize

    緩衝大小,即緩衝多少行資料。

    Integer

    100000

    cacheSize配置和cache為LRU有關。當cache配置為LRU時,必須配置cacheSize參數。

    cacheTTLMs

    緩衝逾時時間,單位為毫秒。

    Integer

    Long.MAX_VALUE

    cacheTTLMs配置和cache配置為LRU或ALL有關:

    • 如果cache配置為LRU,則cacheTTLMs為緩衝失效的逾時時間。預設值為Long.MAX_VALUE,即代表緩衝不到期。

    • 如果cache配置為ALL,則cacheTTLMs為物理表資料被重新載入的間隔時間。預設值為Long.MAX_VALUE,即代表不重新載入物理表資料。

    說明

    如果cache配置為None,則cacheTTLMs不用配置。因為cache配置為None,表示沒有緩衝,因此不用配置緩衝逾時時間。

    maxJoinRows

    主表中每一條資料查詢維表時,匹配後最多返回的結果數。

    Integer

    1024

    如果您可以預估一條資料對應的維表資料最多為n條,則可以設定maxJoinRows='n',以確保Realtime Compute匹配處理效率。

    說明

    進行Join時,主表輸入一條資料,對應維表匹配後返回的結果總數受該參數限制。

類型映射

雲原生資料倉儲AnalyticDB MySQL版3.0欄位類型

Flink欄位類型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(p, s) 或NUMERIC(p, s)

DECIMAL(p, s)

VARCHAR

STRING

BINARY

BYTES

DATE

DATE

TIME

TIME

DATETIME

TIMESTAMP

TIMESTAMP

TIMESTAMP

POINT

STRING

使用樣本

  • 結果表

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE adb_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'adb3.0',
      'url' = '<yourUrl>',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTablename>'
    );
    
    INSERT INTO adb_sink
    SELECT * FROM datagen_source;
  • 維表

    CREATE TEMPORARY TABLE datagen_source(
      `a` INT,
      `b` VARCHAR,
      `c` STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE adb_dim (
      `a` INT,
      `b` VARCHAR,
      `c` VARCHAR
    ) WITH (
      'connector' = 'adb3.0',
      'url' = '<yourUrl>',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTablename>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `a` INT,
      `b` VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT T.a,H.b
    FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

相關文檔

報錯:multi-statement be found