全部產品
Search
文件中心

Realtime Compute for Apache Flink:即時數倉Hologres資料攝入YAML連接器(公測中)

更新時間:Dec 13, 2024

本文為您介紹如何在資料攝入YAML作業中,使用即時數倉Hologres連接器進行資料同步。

背景資訊

即時數倉Hologres是一站式即時資料倉庫引擎,支援海量資料即時寫入、即時更新、即時分析,支援標準SQL(相容PostgreSQL協議),支援PB級資料多維分析(OLAP)與即席分析(Ad Hoc),支援高並發低延遲的線上資料服務(Serving),與MaxCompute、Flink、DataWorks深度融合,提供離線上一體化全棧數倉解決方案。Hologres YAML連接器支援的資訊如下。

類別

詳情

支援類型

資料攝入目標端(Sink)

運行模式

流模式和批模式

資料格式

暫不支援

特有監控指標

  • numRecordsOut

  • numRecordsOutPerSecond

說明

指標含義詳情,請參見監控指標說明

API種類

YAML

是否支援更新或刪除結果表資料

功能說明

功能

詳情

整庫同步

支援即時同步整庫(或者多張表)的全量和增量資料到每張對應的結果表中。

表結構變更同步

在即時同步整庫資料的同時,還支援將每張源表的表結構變更(增加列、刪除列、重新命名列等)即時同步到結果表中。

分庫分表合并

支援使用Regex定義庫名,匹配資料來源的多個分庫下的源表,合并後同步到下遊每張對應表名的結果表中。

分區表寫入

支援將上遊的一張表寫入到Hologres分區表。

類型映射

採用多種資料對應策略,將上遊資料類型映射為更寬的Hologres資料類型。

文法結構

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}

參數說明

參數

說明

資料類型

是否必填

預設值

備忘

type

sink類型。

String

固定值為hologres

name

sink名稱。

String

無。

dbname

資料庫名稱。

String

無。

username

使用者名稱,請填寫阿里雲帳號的AccessKey ID。

String

詳情請參見空間管理與操作

重要

為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變數管理

password

密碼,請填寫阿里雲帳號的AccessKey Secret。

String

詳情請參見空間管理與操作

重要

為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變數管理

endpoint

Hologres服務地址。

String

詳情請參見訪問網域名稱

jdbcRetryCount

當串連故障時,寫入和查詢的重試次數。

Integer

10

無。

jdbcRetrySleepInitMs

每次重試的固定等待時間。

Long

1000

單位為毫秒。實際重試的等待時間的計算公式為jdbcRetrySleepInitMs+retry*jdbcRetrySleepStepMs

jdbcRetrySleepStepMs

每次重試的累加等待時間。

Long

5000

單位為毫秒。實際重試的等待時間的計算公式為jdbcRetrySleepInitMs+retry*jdbcRetrySleepStepMs

jdbcConnectionMaxIdleMs

JDBC串連的空閑時間。

Long

60000

單位為毫秒。超過這個空閑時間,串連就會斷開釋放掉。

jdbcMetaCacheTTL

本機快取TableSchema資訊的到期時間。

Long

60000

單位為毫秒。

jdbcMetaAutoRefreshFactor

如果緩衝的剩餘時間小於觸發時間,則系統會自動重新整理緩衝。

Integer

4

緩衝的剩餘時間計算方法:緩衝的剩餘時間=緩衝的到期時間 - 緩衝已經存活的時間。緩衝自動重新整理後,則從0開始重新計算緩衝的存活時間。

觸發時間計算方法:jdbcMetaCacheTTL/jdbcMetaAutoRefreshFactor兩個參數的比值。

mutatetype

資料寫入模式。

String

INSERT_OR_UPDATE

如果Hologres物理表已設定主鍵,則Hologres Sink通過主鍵確保Exactly-once語義。當同主鍵資料出現多次時,您需要設定mutatetype參數確定更新結果表的方式,mutatetype取值如下:

  • INSERT_OR_IGNORE:保留首次出現的資料,忽略後續所有資料。

  • INSERT_OR_REPLACE:後出現的資料整行替換已有資料。

  • INSERT_OR_UPDATE:更新已有資料的部分列。例如一張表有a、b、c和d四個欄位,a是PK(Primary Key),寫入Hologres時唯寫入a和b兩個欄位,在PK重複的情況下,系統只會更新b欄位,c和d保持不變。

createparttable

當寫入分區表時,是否根據分區值自動建立不存在的分區表。

Boolean

false

無。

sink.delete-strategy

撤回訊息的處理方式。

String

參數取值如下:

  • IGNORE_DELETE:忽略Update Before和Delete訊息。適用於僅需插入或更新資料,而無需刪除資料的情境。

  • CHANGELOG_STANDARD:Flink架構按照 Flink SQL Changelog的工作原理運行,不忽略刪除操作,並通過先刪除資料再插入的方式執行更新操作,以確保資料準確性。適用於不涉及局部更新的情境

jdbcWriteBatchSize

JDBC模式,Hologres Sink節點資料攢批條數(不是來一條資料處理一條,而是攢一批再處理)的最大值。

Integer

256

單位為資料行數。

說明

jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval三者之間為或的關係。如果同時設定了這三個參數,則滿足其中一個,就進行寫入結果資料。

jdbcWriteBatchByteSize

JDBC模式,Hologres Sink節點資料攢批位元組數(不是來一條資料處理一條,而是攢一批再處理)的最大值。

Long

2097152(2*1024*1024)位元組,即2 MB

說明

jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval三者之間為或的關係。如果同時設定了這三個參數,則滿足其中一個,就進行寫入結果資料。

jdbcWriteFlushInterval

JDBC模式,Hologres Sink節點資料攢批寫入Hologres的最長等待時間。

Long

10000

單位為毫秒。

說明

jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval三者之間為或的關係。如果同時設定了這三個參數,則滿足其中一個,就進行寫入結果資料。

ignoreNullWhenUpdate

mutatetype='insertOrUpdate'時,是否忽略更新寫入資料中的Null值。

Boolean

false

參數取值如下:

  • false(預設值):將Null值寫到Hologres結果表裡。

  • true:忽略更新寫入資料中的Null值。

jdbcEnableDefaultForNotNullColumn

如果將Null值寫入Hologres表中Not Null且無預設值的欄位,是否允許連接器協助填充一個預設值。

Boolean

true

參數取值如下:

  • true(預設值):允許連接器填充預設值並寫入,規則如下。

    • 如果欄位是String類型,則預設寫為空白("")。

    • 如果欄位是Number類型,則預設寫為0。

    • 如果是Date、timestamp或timestamptz時間類型欄位,則預設寫為1970-01-01 00:00:00

  • false:不填充預設值,寫Null到Not Null欄位時,會拋出異常。

remove-u0000-in-text.enabled

如果寫入時字串類型包含\u0000非法字元,是否允許連接器協助去除。

Boolean

false

參數取值如下:

  • false(預設值):連接器不對資料進行操作,但碰到髒資料時寫入可能拋出如下異常,ERROR: invalid byte sequence for encoding "UTF8": 0x00

    此時需要在源表提前處理髒資料,或者在SQL中定義髒資料處理邏輯。

  • true:連接器會協助去除字串類型中的\u0000,防止寫入拋出異常。

deduplication.enabled

jdbc及jdbc_fixed模式寫入攢批過程中,是否進行去重。

Boolean

true

參數取值如下:

  • true(預設值):如果一批資料中有主鍵相同的資料,預設進行去重,只保留最後一條到達的資料。以兩個欄位,其中第一個欄位為主鍵的資料舉例:

    • INSERT (1,'a')INSERT (1,'b')兩條記錄先後到達,去重之後只保留後到達的(1,'b')寫入Hologres結果表中。

    • Hologres結果表中已經存在記錄(1,'a'),此時DELETE (1,'a')INSERT (1,'b')兩條記錄先後到達,只保留後到達的(1,'b')寫入hologres中,表現為直接更新,而不是先刪除再插入。

  • false:在攢批過程中不進行去重,如果發現新到的資料和目前攢批的資料中存在主鍵相同的情況,先將攢批資料寫入,寫入完成之後再繼續寫入新到的資料。

sink.type-normalize-strategy

資料對應策略。

String

STANDARD

當Hologres sink轉換上遊資料到Hologres類型時的策略。

  • STANDARD:根據標準將Flink CDC類型轉換為PG類型。

  • BROADEN:將Flink CDC類型轉換為更廣泛的Hologres類型。

  • ONLY_BIGINT_OR_TEXT:將所有Flink CDC類型轉換為Hologres中的BIGINT或STRING類型。

table_property.*

Hologres物理表屬性。

String

建立Hologres表時,允許在WITH參數中設定物理表屬性,合理的表屬性設定可以有助於系統高效地組織和查詢資料。

警告

table_property.distribution_key預設為主索引值,不要輕易設定,會影響寫入資料的正確性。

類型映射

通過配置項sink.type-normalize-strategy設定轉換上遊資料到Hologres類型時的策略。

說明
  • 建議您在初次開機YAML作業時開啟sink.type-normalize-strategy。如果啟動後再開啟sink.type-normalize-strategy,需要刪除下遊表並且將作業無狀態重啟才會生效。

  • 目前數群組類型僅支援INTEGER、BIGINT、FLOAT、DOUBLE、BOOLEAN、CHAR和VARCHAR。

  • Hologres不支援numeric類型作為主鍵,因此如果主鍵類型被映射為numeric,會被轉化為varchar類型。

STANDARD

當sink.type-normalize-strategy為STANDARD時,類型映射如下:

Flink CDC類型

Hologres類型

CHAR

bpchar

STRING

text

VARCHAR

text(長度大於10485760時)

varchar(長度不大於10485760時)

BOOLEAN

bool

BINARY

bytea

VARBINARY

DECIMAL

numeric

TINYINT

int2

SMALLINT

INTEGER

int4

BIGINT

int8

FLOAT

float4

DOUBLE

float8

DATE

date

TIME_WITHOUT_TIME_ZONE

time

TIMESTAMP_WITHOUT_TIME_ZONE

timestamp

TIMESTAMP_WITH_LOCAL_TIME_ZONE

timestamptz

ARRAY

各種類型的數組

MAP

不支援

ROW

不支援

BROADEN

當sink.type-normalize-strategy為BROADEN時,將Flink CDC類型轉換為更廣泛的Hologres類型。資料對應如下:

Flink CDC類型

Hologres類型

CHAR

text

STRING

VARCHAR

BOOLEAN

bool

BINARY

bytea

VARBINARY

DECIMAL

numeric

TINYINT

int8

SMALLINT

INTEGER

BIGINT

FLOAT

float8

DOUBLE

DATE

date

TIME_WITHOUT_TIME_ZONE

time

TIMESTAMP_WITHOUT_TIME_ZONE

timestamp

TIMESTAMP_WITH_LOCAL_TIME_ZONE

timestamptz

ARRAY

各種類型的數組

MAP

不支援

ROW

不支援

ONLY_BIGINT_OR_TEXT

sink.type-normalize-strategy為ONLY_BIGINT_OR_TEXT時,將所有Flink CDC類型轉換為Hologres中的BIGINT或STRING類型。類型映射如下:

Flink CDC類型

Hologres類型

TINYINT

int8

SMALLINT

INTEGER

BIGINT

BOOLEAN

text

BINARY

VARBINARY

DECIMAL

FLOAT

DOUBLE

DATE

TIME_WITHOUT_TIME_ZONE

TIMESTAMP_WITHOUT_TIME_ZONE

TIMESTAMP_WITH_LOCAL_TIME_ZONE

ARRAY

各種類型的數組

MAP

不支援

ROW

不支援

分區表寫入

Hologres Sink支援分區表寫入,搭配Transform可以將上遊資料寫入到Hologres分區表中。寫入時需要注意:

  • 分區鍵(Partition Key)必須為主鍵的一部分,如果採用上遊非主鍵中的一個作為分區表,可能會導致上下遊主鍵不一致。資料同步時,如果上下遊主鍵不一致,會導致資料不一致。

  • Hologres支援將TEXTVARCHAR以及INT類型的資料作為分區鍵(Partition Key),V1.3.22及以上版本支援將DATE類型設為分區鍵。

  • 需要設定createparttable為true, 才能自動建立分區子表,否則使用者需要手動建立分區子表。

樣本請參見分區表寫入示

表結構變更同步

CDC Yaml Pipeline作業在處理表結構變更時有不同的策略,通過pipeline層級的配置項schema.change.behavior來設定。schema.change.behavior取值有IGNORE、LENIENT、TRY_EVOLVE、EVOLVE 和 EXCEPTION。Hologres Sink目前不支援TRY_EVOLVE策略。其中LENIENT和EVOLVE涉及到表結構變更,接下來會說明如何處理不同表結構變更事件(Schema Change Event)。

LENIENT(預設)

LENIENT模式下支援的Schema變更策略詳情如下:

  • 添加可空列:會自動在結果表Schema末尾添加對應的列,並自動同步新增列的資料。

  • 刪除可空列:不會直接在結果表中刪除該列,而是將該列的資料自動填滿為NULL值。

  • 添加非空列:會自動在結果表Schema末尾添加對應的列,並自動同步新增列的資料,新增的列會預設設定為可空列,對於添加列發生之前的資料自動化佈建為NULL值。

  • 重新命名列:被看作為添加列和刪除列。直接在結果表中末尾添加重新命名後的列,並將重新命名前的列資料自動填滿為NULL值。例如,如果col_a重新命名為col_b,則會在結果表末尾添加col_b,並自動將col_a的資料填充為NULL值。

  • 列類型變更:不支援。由於Hologres不支援列類型變更,需要搭配sink.type-normalize-strategy使用。

  • 暫不支援同步以下Schema的變更:

    • 主鍵或索引等約束的變更。

    • 非空列的刪除。

    • 從NOT NULL轉為NULLABLE變更。

EVOLVE

EVOLVE模式下支援的Schema變更策略詳情如下:

  • 添加可空列:支援

  • 刪除可空列:不支援。

  • 添加非空列:會在結果表添加可空列。

  • 重新命名列:支援,會在結果表將原有列重新命名。

  • 列類型變更:不支援。由於Hologres不支援列類型變更,需要搭配sink.type-normalize-strategy使用。

  • 暫不支援同步以下Schema的變更:

    • 主鍵或索引等約束的變更。

    • 非空列的刪除。

    • 從NOT NULL轉為NULLABLE變更。

警告

在EVOLVE模式下,如果在未刪除結果表的情況下無狀態重啟,有可能出現上遊資料與結果表的結構不一致的情況導致作業失敗,需要使用者手動調整下遊表結構。

開啟EVOLVE模式樣本請參見開啟EVOLVE模式

程式碼範例

寬類型映射

通過配置項sink.type-normalize-strategy設定寬類型映射。

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

分區表寫入

將上遊時間戳記類型的create_time欄位轉化為日期類型,作為Hologres表的分區鍵。

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  createparttable: true
 
transform:
  - source-table: test_db.test_source_table
    projection: \*, DATE_FORMAT(CAST(create_time AS TIMESTAMP), 'yyyy-MM-dd') as partition_key
    primary-keys: id, create_time, partition_key
    partition-keys: partition_key
    description: add partition key 

pipeline:
  name: MySQL to Hologres Pipeline

開啟EVOLVE模式

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  createparttable: true

pipeline:
  name: MySQL to Hologres Pipeline
  schema.change.behavior: evolve

單表同步

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

整庫同步

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

分庫分表合并

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
  
route:
  - source-table: test_db.user\.*
    sink-table: test_db.user

pipeline:
  name: MySQL to Hologres Pipeline

同步到指定schema

Hologres的Schema對應MySQL的Database,可以執行結果表的Schema。

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
  
route:
  - source-table: test_db.user\.*
    sink-table: test_db2.user\.*r

pipeline:
  name: MySQL to Hologres Pipeline

不重啟同步新增表

如果想在作業啟動並執行過程中即時同步新增表,設定scan.binlog.newly-added-table.enable = true.

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
  scan.binlog.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

重啟新增存量表

如果想要新增同步存量表,設定scan.newly-added-table.enabled = true後重啟作業。

警告

如果作業先設定scan.binlog.newly-added-table.enabled為true捕獲新增表,不可以再通過scan.newly-added-table.enabled = true重啟捕獲存量表,否則會有資料重複發送。

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
  scan.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

整庫同步時排除部分表

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  tables.exclude: test_db.table1
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

相關文檔