Hologres資料來源為您提供讀取和寫入Hologres雙向通道的功能,本文為您介紹DataWorks的Hologres資料同步的能力支援情況。
使用限制
離線讀寫
Hologres資料來源支援使用Serverless資源群組(推薦)和獨享Data Integration資源群組。
Hologres Writer不支援將資料寫入Hologres的外部表格。
Hologres資料來源連通性擷取Hologres端點的邏輯:
當前地區的Hologres執行個體,Hologres端點擷取順序:。
跨地區的Hologres執行個體,Hologres端點擷取順序:。
整庫即時寫
即時資料同步任務僅支援使用Serverless資源群組(推薦)和獨享Data Integration資源群組。
即時資料同步任務暫不支援同步沒有主鍵的表。
支援的欄位類型
欄位類型 | 離線讀(Hologres Reader) | 離線寫(Hologres Writer) | 即時寫 |
UUID | 不支援 | 不支援 | 不支援 |
CHAR | 支援 | 支援 | 支援 |
NCHAR | 支援 | 支援 | 支援 |
VARCHAR | 支援 | 支援 | 支援 |
LONGVARCHAR | 支援 | 支援 | 支援 |
NVARCHAR | 支援 | 支援 | 支援 |
LONGNVARCHAR | 支援 | 支援 | 支援 |
CLOB | 支援 | 支援 | 支援 |
NCLOB | 支援 | 支援 | 支援 |
SMALLINT | 支援 | 支援 | 支援 |
TINYINT | 支援 | 支援 | 支援 |
INTEGER | 支援 | 支援 | 支援 |
BIGINT | 支援 | 支援 | 支援 |
NUMERIC | 支援 | 支援 | 支援 |
DECIMAL | 支援 | 支援 | 支援 |
FLOAT | 支援 | 支援 | 支援 |
REAL | 支援 | 支援 | 支援 |
DOUBLE | 支援 | 支援 | 支援 |
TIME | 支援 | 支援 | 支援 |
DATE | 支援 | 支援 | 支援 |
TIMESTAMP | 支援 | 支援 | 支援 |
BINARY | 支援 | 支援 | 支援 |
VARBINARY | 支援 | 支援 | 支援 |
BLOB | 支援 | 支援 | 支援 |
LONGVARBINARY | 支援 | 支援 | 支援 |
BOOLEAN | 支援 | 支援 | 支援 |
BIT | 支援 | 支援 | 支援 |
JSON | 支援 | 支援 | 支援 |
JSONB | 支援 | 支援 | 支援 |
實現原理
離線讀
Hologres Reader通過PSQL讀取Hologres表中的資料,根據表的Shard Count發起多個並發,每個Shard對應一個Select並發任務:
Hologres在建立表時,在同一個
CREATE TABLE事務中,通過CALL set_table_property('table_name', 'shard_count', 'xx')配置表的Shard Count。預設情況下,使用資料庫預設的Shard Count,具體數值取決於Hologres執行個體的配置。
Select語句通過表的內建列hg_shard_id的Shard篩選資料。
離線寫
Hologres Writer通過資料同步架構擷取Reader產生的協議資料,根據conflictMode(衝突策略)的配置決定寫入資料時的衝突解決方案策略。
您可以通過配置conflictMode,決定新匯入的資料和已有資料的主鍵發生衝突時,如何處理新匯入的資料:
conflictMode僅適用於有主鍵的表。具體寫入原理和效能,詳情請參考技術原理。
conflictMode為Replace(整行更新)模式時,新資料覆蓋舊資料,整行所有列全部覆蓋,沒有配置列映射的欄位會強制寫入NULL。
conflictMode為Update(更新)模式時,新資料覆蓋舊資料,只覆蓋配置有列映射的欄位。
conflictMode為Ignore(忽略)模式時,忽略新資料。
建立資料來源
在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見資料來源管理,詳細的配置參數解釋可在配置介面查看對應參數的文案提示。
資料同步任務開發
資料同步任務的配置入口和通用配置流程可參見下文的配置指導。
單表離線同步任務配置指導
指令碼模式配置的全量參數和指令碼Demo請參見下文的附錄:指令碼Demo與參數說明。
單表即時同步任務配置指導
離線整庫、即時整庫任務配置指導
操作流程請參見配置整庫離線同步任務、整庫即時同步任務配置。
附錄:指令碼Demo與參數說明
離線任務指令碼配置方式
如果您配置離線任務時使用指令碼模式的方式進行配置,您需要按照統一的指令碼格式要求,在任務指令碼中編寫相應的參數,詳情請參見指令碼模式配置,以下為您介紹指令碼模式下資料來源的參數配置詳情。
Reader指令碼Demo
配置非分區表
配置從Hologres非分區表讀取資料至記憶體,如下所示。
{ "transform": false, "type": "job", "version": "2.0", "steps": [ { "stepType": "holo", "parameter": { "datasource": "holo_db", "envType": 1, "column": [ "tag", "id", "title", "body" ], "where": "", "table": "holo_reader_basic_src" }, "name": "Reader", "category": "reader" }, { "stepType": "stream", "parameter": { "print": false, "fieldDelimiter": "," }, "name": "Writer", "category": "writer" } ], "setting": { "executeMode": null, "failoverEnable": null, "errorLimit": { "record": "0" }, "speed": { "concurrent": 2, "throttle": false } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }Hologres表的DDL語句,如下所示。
begin; drop table if exists holo_reader_basic_src; create table holo_reader_basic_src( tag text not null, id int not null, title text not null, body text, primary key (tag, id)); call set_table_property('holo_reader_basic_src', 'orientation', 'column'); call set_table_property('holo_reader_basic_src', 'shard_count', '3'); commit;
配置分區表
配置從Hologres分區表的子表讀取資料至記憶體。
說明請注意partition的配置。
{ "transform": false, "type": "job", "version": "2.0", "steps": [ { "stepType": "holo", "parameter": { "selectedDatabase": "public", "partition": "tag=foo", "datasource": "holo_db", "envType": 1, "column": [ "tag", "id", "title", "body" ], "tableComment": "", "where": "", "table": "public.holo_reader_basic_part_src" }, "name": "Reader", "category": "reader" }, { "stepType":"stream", "parameter":{}, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0" }, "speed":{ "throttle":true, "concurrent":1, "mbps":"12" } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }Hologres表的DDL語句,如下所示。
begin; drop table if exists holo_reader_basic_part_src; create table holo_reader_basic_part_src( tag text not null, id int not null, title text not null, body text, primary key (tag, id)) partition by list( tag ); call set_table_property('holo_reader_basic_part_src', 'orientation', 'column'); call set_table_property('holo_reader_basic_part_src', 'shard_count', '3'); commit; create table holo_reader_basic_part_src_1583161774228 partition of holo_reader_basic_part_src for values in ('foo'); # 確保分區表子表已經建立且匯入資料。 postgres=# \d+ holo_reader_basic_part_src Table "public.holo_reader_basic_part_src" Column | Type | Collation | Nullable | Default | Storage | Stats target | Description --------+---------+-----------+----------+---------+----------+--------------+------------- tag | text | | not null | | extended | | id | integer | | not null | | plain | | title | text | | not null | | extended | | body | text | | | | extended | | Partition key: LIST (tag) Indexes: "holo_reader_basic_part_src_pkey" PRIMARY KEY, btree (tag, id) Partitions: holo_reader_basic_part_src_1583161774228 FOR VALUES IN ('foo')
Reader指令碼參數
參數 | 描述 | 是否必選 | 預設值 |
database | Hologres執行個體內部資料庫的名稱。 | 是 | 無 |
table | Hologres的表名稱,如果是分區表,請指定父表的名稱。 | 是 | 無 |
column | 定義匯入目標表的資料列,必須包含目標表的主鍵集合。例如 | 是 | 無 |
partition | 針對分區表,表示分區Column以及對應的Value,格式為 重要
| 否 | 空,表示非分區表。 |
Writer指令碼Demo
配置非分區表
配置從MySQL產生的資料匯入至Hologres普通表,樣本為通過JDBC模式匯入的配置。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "envType": 0, "useSpecialSecret": false, "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "tableComment": "", "connection": [ { "datasource": "<mysql_source_name>",//mysql資料來源名 "table": [ "<mysql_table_name>" ] } ], "where": "", "splitPk": "", "encoding": "UTF-8" }, "name": "Reader", "category": "reader" }, { "stepType": "holo", "parameter": { "selectedDatabase":"public", "schema": "public", "maxConnectionCount": 9, "truncate":true,//清理規則 "datasource": "<holo_sink_name>",//Hologres資料來源名稱 "conflictMode": "ignore", "envType": 0, "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "tableComment": "", "table": "<holo_table_name>", "reShuffleByDistributionKey":false }, "name": "Writer", "category": "writer" } ], "setting": { "executeMode": null, "errorLimit": { "record": "0" }, "locale": "zh_CN", "speed": { "concurrent": 2,//作業並發數 "throttle": false//限流 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }Hologres表的DDL語句,如下所示。
begin; drop table if exists mysql_to_holo_test; create table mysql_to_holo_test( tag text not null, id int not null, body text not null, brrth date, primary key (tag, id)); call set_table_property('mysql_to_holo_test', 'orientation', 'column'); call set_table_property('mysql_to_holo_test', 'distribution_key', 'id'); call set_table_property('mysql_to_holo_test', 'clustering_key', 'birth'); commit;
配置分區表
說明目前Hologres僅支援LIST分區,分區Column僅支援單個Column分區,且僅支援INT4或TEXT類型。
請確認該參數和表DDL的分區配置匹配。
配置從MySQL產生的資料同步至Hologres分區表的子表。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "envType": 0, "useSpecialSecret": false, "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "tableComment": "", "connection": [ { "datasource": "<mysql_source_name>", "table": [ "<mysql_table_name>" ] } ], "where": "", "splitPk": "<mysql_pk>",//mysql的pk欄位 "encoding": "UTF-8" }, "name": "Reader", "category": "reader" }, { "stepType": "holo", "parameter": { "selectedDatabase": "public", "maxConnectionCount": 9, "partition": "<partition_key>",//Hologres分區鍵 "truncate": "false", "datasource": "<holo_sink_name>",//Hologres資料來源名 "conflictMode": "ignore", "envType": 0, "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "tableComment": "", "table": "<holo_table_name>", "reShuffleByDistributionKey":false }, "name": "Writer", "category": "writer" } ], "setting": { "executeMode": null, "failoverEnable": null, "errorLimit": { "record": "0" }, "speed": { "concurrent": 2,//作業並發數 "throttle": false//限流 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }Hologres表的DDL語句,如下所示。
BEGIN; CREATE TABLE public.hologres_parent_table( a text , b int, c timestamp, d text, ds text, primary key(ds,b) ) PARTITION BY LIST(ds); CALL set_table_property('public.hologres_parent_table', 'orientation', 'column'); CREATE TABLE public.holo_child_1 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201215'); CREATE TABLE public.holo_child_2 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201216'); CREATE TABLE public.holo_child_3 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201217'); COMMIT;
Writer指令碼參數
參數 | 描述 | 是否必選 | 預設值 |
database | Hologres執行個體內部資料庫的名稱。 | 是 | 無 |
table | Hologres的表名稱,目前支援表名稱中包含Schema,例如 | 是 | 無 |
conflictMode | conflictMode包括Replace、Update和Ignore,詳情請參見實現原理。 | 是 | 無 |
column | 定義匯入目標表的資料列,必須包含目標表的主鍵集合。例如 | 是 | 無 |
partition | 針對分區表,表示分區Column以及對應的Value,格式為 說明
| 否 | 空,表示非分區表 |
reShuffleByDistributionKey | 在 Hologres 中,主鍵表的大量匯入預設會觸發表鎖,這限制了多個串連的並發寫入能力。開啟 reShuffle 功能可以在離線同步情境下,允許不同的任務根據資料分區鍵將資料寫入指定的 Holo shard,這樣可以實現並發批量寫入,從而顯著提升寫入效能。與傳統 JDBC 模式的即時寫入相比,啟用該功能不僅能降低 Holo 服務端的負載,還能進一步提升寫入效率。 重要 該功能僅在Serverless資源群組開啟。 | 否 | false |
truncate | 寫入Holo表之前是否需要清空目標表。
| 否 | false |