本文為您介紹如何在資料攝入YAML作業中,使用即時數倉Hologres連接器進行資料同步。
背景資訊
即時數倉Hologres是一站式即時資料倉庫引擎,支援海量資料即時寫入、即時更新、即時分析,支援標準SQL(相容PostgreSQL協議),支援PB級資料多維分析(OLAP)與即席分析(Ad Hoc),支援高並發低延遲的線上資料服務(Serving),與MaxCompute、Flink、DataWorks深度融合,提供離線上一體化全棧數倉解決方案。Hologres YAML連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 資料攝入目標端(Sink) |
運行模式 | 流模式和批模式 |
資料格式 | 暫不支援 |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API種類 | YAML |
是否支援更新或刪除結果表資料 | 是 |
功能說明
功能 | 詳情 |
支援即時同步整庫(或者多張表)的全量和增量資料到每張對應的結果表中。 | |
在即時同步整庫資料的同時,還支援將每張源表的表結構變更(增加列、刪除列、重新命名列等)即時同步到結果表中。 | |
支援使用Regex定義庫名,匹配資料來源的多個分庫下的源表,合并後同步到下遊每張對應表名的結果表中。 | |
支援將上遊的一張表寫入到Hologres分區表。 | |
採用多種資料對應策略,將上遊資料類型映射為更寬的Hologres資料類型。 |
文法結構
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
參數說明
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
type | sink類型。 | String | 是 | 無 | 固定值為 |
name | sink名稱。 | String | 否 | 無 | 無。 |
dbname | 資料庫名稱。 | String | 是 | 無 | 無。 |
username | 使用者名稱,請填寫阿里雲帳號的AccessKey ID。 | String | 是 | 無 | 詳情請參見空間管理與操作 重要 為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變數管理。 |
password | 密碼,請填寫阿里雲帳號的AccessKey Secret。 | String | 是 | 無 | 詳情請參見空間管理與操作 重要 為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變數管理。 |
endpoint | Hologres服務地址。 | String | 是 | 無 | 詳情請參見訪問網域名稱。 |
jdbcRetryCount | 當串連故障時,寫入和查詢的重試次數。 | Integer | 否 | 10 | 無。 |
jdbcRetrySleepInitMs | 每次重試的固定等待時間。 | Long | 否 | 1000 | 單位為毫秒。實際重試的等待時間的計算公式為 |
jdbcRetrySleepStepMs | 每次重試的累加等待時間。 | Long | 否 | 5000 | 單位為毫秒。實際重試的等待時間的計算公式為 |
jdbcConnectionMaxIdleMs | JDBC串連的空閑時間。 | Long | 否 | 60000 | 單位為毫秒。超過這個空閑時間,串連就會斷開釋放掉。 |
jdbcMetaCacheTTL | 本機快取TableSchema資訊的到期時間。 | Long | 否 | 60000 | 單位為毫秒。 |
jdbcMetaAutoRefreshFactor | 如果緩衝的剩餘時間小於觸發時間,則系統會自動重新整理緩衝。 | Integer | 否 | 4 | 緩衝的剩餘時間計算方法:緩衝的剩餘時間=緩衝的到期時間 - 緩衝已經存活的時間。緩衝自動重新整理後,則從0開始重新計算緩衝的存活時間。 觸發時間計算方法:jdbcMetaCacheTTL/jdbcMetaAutoRefreshFactor兩個參數的比值。 |
mutatetype | 資料寫入模式。 | String | 否 | INSERT_OR_UPDATE | 如果Hologres物理表已設定主鍵,則Hologres Sink通過主鍵確保Exactly-once語義。當同主鍵資料出現多次時,您需要設定mutatetype參數確定更新結果表的方式,mutatetype取值如下:
|
createparttable | 當寫入分區表時,是否根據分區值自動建立不存在的分區表。 | Boolean | 否 | false | 無。 |
sink.delete-strategy | 撤回訊息的處理方式。 | String | 否 | 無 | 參數取值如下:
|
jdbcWriteBatchSize | JDBC模式,Hologres Sink節點資料攢批條數(不是來一條資料處理一條,而是攢一批再處理)的最大值。 | Integer | 否 | 256 | 單位為資料行數。 說明 jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之間為或的關係。如果同時設定了這三個參數,則滿足其中一個,就進行寫入結果資料。 |
jdbcWriteBatchByteSize | JDBC模式,Hologres Sink節點資料攢批位元組數(不是來一條資料處理一條,而是攢一批再處理)的最大值。 | Long | 否 | 2097152(2*1024*1024)位元組,即2 MB | 說明 jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之間為或的關係。如果同時設定了這三個參數,則滿足其中一個,就進行寫入結果資料。 |
jdbcWriteFlushInterval | JDBC模式,Hologres Sink節點資料攢批寫入Hologres的最長等待時間。 | Long | 否 | 10000 | 單位為毫秒。 說明 jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之間為或的關係。如果同時設定了這三個參數,則滿足其中一個,就進行寫入結果資料。 |
ignoreNullWhenUpdate | 當mutatetype='insertOrUpdate'時,是否忽略更新寫入資料中的Null值。 | Boolean | 否 | false | 參數取值如下:
|
jdbcEnableDefaultForNotNullColumn | 如果將Null值寫入Hologres表中Not Null且無預設值的欄位,是否允許連接器協助填充一個預設值。 | Boolean | 否 | true | 參數取值如下:
|
remove-u0000-in-text.enabled | 如果寫入時字串類型包含\u0000非法字元,是否允許連接器協助去除。 | Boolean | 否 | false | 參數取值如下:
|
deduplication.enabled | jdbc及jdbc_fixed模式寫入攢批過程中,是否進行去重。 | Boolean | 否 | true | 參數取值如下:
|
sink.type-normalize-strategy | 資料對應策略。 | String | 否 | STANDARD | 當Hologres sink轉換上遊資料到Hologres類型時的策略。
|
table_property.* | Hologres物理表屬性。 | String | 否 | 無 | 建立Hologres表時,允許在WITH參數中設定物理表屬性,合理的表屬性設定可以有助於系統高效地組織和查詢資料。 警告 table_property.distribution_key預設為主索引值,不要輕易設定,會影響寫入資料的正確性。 |
類型映射
通過配置項sink.type-normalize-strategy
設定轉換上遊資料到Hologres類型時的策略。
建議您在初次開機YAML作業時開啟sink.type-normalize-strategy。如果啟動後再開啟sink.type-normalize-strategy,需要刪除下遊表並且將作業無狀態重啟才會生效。
目前數群組類型僅支援INTEGER、BIGINT、FLOAT、DOUBLE、BOOLEAN、CHAR和VARCHAR。
Hologres不支援numeric類型作為主鍵,因此如果主鍵類型被映射為numeric,會被轉化為varchar類型。
STANDARD
當sink.type-normalize-strategy為STANDARD時,類型映射如下:
Flink CDC類型 | Hologres類型 |
CHAR | bpchar |
STRING | text |
VARCHAR | text(長度大於10485760時) |
varchar(長度不大於10485760時) | |
BOOLEAN | bool |
BINARY | bytea |
VARBINARY | |
DECIMAL | numeric |
TINYINT | int2 |
SMALLINT | |
INTEGER | int4 |
BIGINT | int8 |
FLOAT | float4 |
DOUBLE | float8 |
DATE | date |
TIME_WITHOUT_TIME_ZONE | time |
TIMESTAMP_WITHOUT_TIME_ZONE | timestamp |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | timestamptz |
ARRAY | 各種類型的數組 |
MAP | 不支援 |
ROW | 不支援 |
BROADEN
當sink.type-normalize-strategy為BROADEN時,將Flink CDC類型轉換為更廣泛的Hologres類型。資料對應如下:
Flink CDC類型 | Hologres類型 |
CHAR | text |
STRING | |
VARCHAR | |
BOOLEAN | bool |
BINARY | bytea |
VARBINARY | |
DECIMAL | numeric |
TINYINT | int8 |
SMALLINT | |
INTEGER | |
BIGINT | |
FLOAT | float8 |
DOUBLE | |
DATE | date |
TIME_WITHOUT_TIME_ZONE | time |
TIMESTAMP_WITHOUT_TIME_ZONE | timestamp |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | timestamptz |
ARRAY | 各種類型的數組 |
MAP | 不支援 |
ROW | 不支援 |
ONLY_BIGINT_OR_TEXT
當sink.type-normalize-strategy
為ONLY_BIGINT_OR_TEXT時,將所有Flink CDC類型轉換為Hologres中的BIGINT或STRING類型。類型映射如下:
Flink CDC類型 | Hologres類型 |
TINYINT | int8 |
SMALLINT | |
INTEGER | |
BIGINT | |
BOOLEAN | text |
BINARY | |
VARBINARY | |
DECIMAL | |
FLOAT | |
DOUBLE | |
DATE | |
TIME_WITHOUT_TIME_ZONE | |
TIMESTAMP_WITHOUT_TIME_ZONE | |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | |
ARRAY | 各種類型的數組 |
MAP | 不支援 |
ROW | 不支援 |
分區表寫入
Hologres Sink支援分區表寫入,搭配Transform可以將上遊資料寫入到Hologres分區表中。寫入時需要注意:
分區鍵(Partition Key)必須為主鍵的一部分,如果採用上遊非主鍵中的一個作為分區表,可能會導致上下遊主鍵不一致。資料同步時,如果上下遊主鍵不一致,會導致資料不一致。
Hologres支援將TEXT、VARCHAR以及INT類型的資料作為分區鍵(Partition Key),V1.3.22及以上版本支援將DATE類型設為分區鍵。
需要設定createparttable為true, 才能自動建立分區子表,否則使用者需要手動建立分區子表。
樣本請參見分區表寫入示。
表結構變更同步
CDC Yaml Pipeline作業在處理表結構變更時有不同的策略,通過pipeline層級的配置項schema.change.behavior
來設定。schema.change.behavior
取值有IGNORE、LENIENT、TRY_EVOLVE、EVOLVE 和 EXCEPTION。Hologres Sink目前不支援TRY_EVOLVE策略。其中LENIENT和EVOLVE涉及到表結構變更,接下來會說明如何處理不同表結構變更事件(Schema Change Event)。
LENIENT(預設)
LENIENT模式下支援的Schema變更策略詳情如下:
添加可空列:會自動在結果表Schema末尾添加對應的列,並自動同步新增列的資料。
刪除可空列:不會直接在結果表中刪除該列,而是將該列的資料自動填滿為NULL值。
添加非空列:會自動在結果表Schema末尾添加對應的列,並自動同步新增列的資料,新增的列會預設設定為可空列,對於添加列發生之前的資料自動化佈建為NULL值。
重新命名列:被看作為添加列和刪除列。直接在結果表中末尾添加重新命名後的列,並將重新命名前的列資料自動填滿為NULL值。例如,如果col_a重新命名為col_b,則會在結果表末尾添加col_b,並自動將col_a的資料填充為NULL值。
列類型變更:不支援。由於Hologres不支援列類型變更,需要搭配sink.type-normalize-strategy使用。
暫不支援同步以下Schema的變更:
主鍵或索引等約束的變更。
非空列的刪除。
從NOT NULL轉為NULLABLE變更。
EVOLVE
EVOLVE模式下支援的Schema變更策略詳情如下:
添加可空列:支援
刪除可空列:不支援。
添加非空列:會在結果表添加可空列。
重新命名列:支援,會在結果表將原有列重新命名。
列類型變更:不支援。由於Hologres不支援列類型變更,需要搭配sink.type-normalize-strategy使用。
暫不支援同步以下Schema的變更:
主鍵或索引等約束的變更。
非空列的刪除。
從NOT NULL轉為NULLABLE變更。
在EVOLVE模式下,如果在未刪除結果表的情況下無狀態重啟,有可能出現上遊資料與結果表的結構不一致的情況導致作業失敗,需要使用者手動調整下遊表結構。
開啟EVOLVE模式樣本請參見開啟EVOLVE模式。
程式碼範例
寬類型映射
通過配置項sink.type-normalize-strategy
設定寬類型映射。
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
分區表寫入
將上遊時間戳記類型的create_time欄位轉化為日期類型,作為Hologres表的分區鍵。
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
createparttable: true
transform:
- source-table: test_db.test_source_table
projection: \*, DATE_FORMAT(CAST(create_time AS TIMESTAMP), 'yyyy-MM-dd') as partition_key
primary-keys: id, create_time, partition_key
partition-keys: partition_key
description: add partition key
pipeline:
name: MySQL to Hologres Pipeline
開啟EVOLVE模式
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
createparttable: true
pipeline:
name: MySQL to Hologres Pipeline
schema.change.behavior: evolve
單表同步
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
整庫同步
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
分庫分表合并
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.user\.*
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
route:
- source-table: test_db.user\.*
sink-table: test_db.user
pipeline:
name: MySQL to Hologres Pipeline
同步到指定schema
Hologres的Schema對應MySQL的Database,可以執行結果表的Schema。
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.user\.*
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
route:
- source-table: test_db.user\.*
sink-table: test_db2.user\.*r
pipeline:
name: MySQL to Hologres Pipeline
不重啟同步新增表
如果想在作業啟動並執行過程中即時同步新增表,設定scan.binlog.newly-added-table.enable = true.
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
server-id: 5401-5499
scan.binlog.newly-added-table.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
重啟新增存量表
如果想要新增同步存量表,設定scan.newly-added-table.enabled = true後重啟作業。
如果作業先設定scan.binlog.newly-added-table.enabled為true捕獲新增表,不可以再通過scan.newly-added-table.enabled = true重啟捕獲存量表,否則會有資料重複發送。
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
server-id: 5401-5499
scan.newly-added-table.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
整庫同步時排除部分表
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
tables.exclude: test_db.table1
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
相關文檔
source、sink、transform和route模組的開發參考,詳情請參見資料攝入開發參考。
資料攝入YAML作業開發的操作步驟,詳情請參見資料攝入YAML作業開發(公測中)。