支援的版本
Hologres支援的版本:0.7、0.8、0.9、0.10、1.1、1.2、1.3。
支援的欄位類型
欄位類型 | 離線讀(Hologres Reader) | 離線寫(Hologres Writer) | 即時寫 |
UUID | 不支援 | 不支援 | 不支援 |
CHAR | 支援 | 支援 | 支援 |
NCHAR | 支援 | 支援 | 支援 |
VARCHAR | 支援 | 支援 | 支援 |
LONGVARCHAR | 支援 | 支援 | 支援 |
NVARCHAR | 支援 | 支援 | 支援 |
LONGNVARCHAR | 支援 | 支援 | 支援 |
CLOB | 支援 | 支援 | 支援 |
NCLOB | 支援 | 支援 | 支援 |
SMALLINT | 支援 | 支援 | 支援 |
TINYINT | 支援 | 支援 | 支援 |
INTEGER | 支援 | 支援 | 支援 |
BIGINT | 支援 | 支援 | 支援 |
NUMERIC | 支援 | 支援 | 支援 |
DECIMAL | 支援 | 支援 | 支援 |
FLOAT | 支援 | 支援 | 支援 |
REAL | 支援 | 支援 | 支援 |
DOUBLE | 支援 | 支援 | 支援 |
TIME | 支援 | 支援 | 支援 |
DATE | 支援 | 支援 | 支援 |
TIMESTAMP | 支援 | 支援 | 支援 |
BINARY | 支援 | 支援 | 支援 |
VARBINARY | 支援 | 支援 | 支援 |
BLOB | 支援 | 支援 | 支援 |
LONGVARBINARY | 支援 | 支援 | 支援 |
BOOLEAN | 支援 | 支援 | 支援 |
BIT | 支援 | 支援 | 支援 |
JSON | 支援 | 支援 | 支援 |
JSONB | 支援 | 支援 | 支援 |
實現原理
離線讀寫
Hologres Reader通過PSQL讀取Hologres表中的資料,根據表的Shard Count發起多個並發,每個Shard對應一個Select並發任務:
Hologres在建立表時,在同一個CREATE TABLE
事務中,通過CALL set_table_property('table_name', 'shard_count', 'xx')
配置表的Shard Count。
預設情況下,使用資料庫預設的Shard Count,具體數值取決於Hologres執行個體的配置。
Select語句通過表的內建列hg_shard_id的Shard篩選資料。
離線寫
Hologres Writer通過資料同步架構擷取Reader產生的協議資料,根據conflictMode(衝突策略)的配置決定寫入資料時的衝突解決方案策略。
您可以通過配置conflictMode,決定新匯入的資料和已有資料的主鍵發生衝突時,如何處理新匯入的資料:
重要 conflictMode僅適用於有主鍵的表。具體寫入原理和效能,詳情請參考技術原理。
conflictMode為Replace(整行更新)模式時,新資料覆蓋舊資料,整行所有列全部覆蓋,沒有配置列映射的欄位會強制寫NULL。
conflictMode為Update(更新)模式時,新資料覆蓋舊資料,只覆蓋配置有列映射的欄位。
conflictMode為Ignore(忽略)模式時,忽略新資料。
建立資料來源
在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見建立並管理資料來源,詳細的配置參數解釋可在配置介面查看對應參數的文案提示。
資料同步任務開發
資料同步任務的配置入口和通用配置流程可參見下文的配置指導。
附錄:指令碼Demo與參數說明
離線任務指令碼配置方式
如果您配置離線任務時使用指令碼模式的方式進行配置,您需要按照統一的指令碼格式要求,在任務指令碼中編寫相應的參數,詳情請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下資料來源的參數配置詳情。
Reader指令碼Demo
配置非分區表
配置從Hologres非分區表讀取資料至記憶體,如下所示。
{
"type":"job",
"version":"2.0",//版本號碼。
"steps":[
{
"stepType":"holo",//外掛程式名。
"parameter":{
"endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port",
"accessId": "***************", //訪問Hologres的accessId。
"accessKey": "*******************", //訪問Hologres的accessKey。
"database": "postgres",
"table": "holo_reader_****",
"column" : [ //欄位。
"tag",
"id",
"title"
]
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//錯誤記錄數。
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1,//作業並發數。
"mbps":"12"//限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Hologres表的DDL語句,如下所示。
begin;
drop table if exists holo_reader_basic_src;
create table holo_reader_basic_src(
tag text not null,
id int not null,
title text not null,
body text,
primary key (tag, id));
call set_table_property('holo_reader_basic_src', 'orientation', 'column');
call set_table_property('holo_reader_basic_src', 'shard_count', '3');
commit;
配置分區表
配置從記憶體產生的資料同步至Hologres分區表的子表。
{
"type":"job",
"version":"2.0",//版本號碼。
"steps":[
{
"stepType":"holo",//外掛程式名。
"parameter":{
"endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port",
"accessId": "***************", //訪問Hologres的accessId。
"accessKey": "*******************", //訪問Hologres的accessKey。
"database": "postgres",
"table": "holo_reader_basic_****",
"partition": "tag=foo",
"column" : [
"*"
],
"fetchSize": "100"
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//錯誤記錄數。
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1,//作業並發數。
"mbps":"12"//限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Hologres表的DDL語句,如下所示。
begin;
drop table if exists holo_reader_basic_part_src;
create table holo_reader_basic_part_src(
tag text not null,
id int not null,
title text not null,
body text,
primary key (tag, id))
partition by list( tag );
call set_table_property('holo_reader_basic_part_src', 'orientation', 'column');
call set_table_property('holo_reader_basic_part_src', 'shard_count', '3');
commit;
create table holo_reader_basic_part_src_1583161774228 partition of holo_reader_basic_part_src for values in ('foo');
# 確保分區表子表已經建立且匯入資料。
postgres=# \d+ holo_reader_basic_part_src
Table "public.holo_reader_basic_part_src"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
--------+---------+-----------+----------+---------+----------+--------------+-------------
tag | text | | not null | | extended | |
id | integer | | not null | | plain | |
title | text | | not null | | extended | |
body | text | | | | extended | |
Partition key: LIST (tag)
Indexes:
"holo_reader_basic_part_src_pkey" PRIMARY KEY, btree (tag, id)
Partitions: holo_reader_basic_part_src_1583161774228 FOR VALUES IN ('foo')
Reader指令碼參數
參數 | 描述 | 是否必選 | 預設值 |
endpoint | 目標互動式分析(Hologres)執行個體對應的endpoint,格式為instance-id-region-endpoint.hologres.aliyuncs.com:port 。您可以從互動式分析執行個體的管理頁面擷取。 endpoint包括傳統網路、公網和VPC三種網路類型,請根據Data Integration資源群組和Hologres執行個體所在的網路環境選擇正確的endpoint類型,否則會出現網路不通或者效能受限的情況: 傳統網路樣本:instance-id-region-endpoint-internal.hologres.aliyuncs.com:port 公網樣本:instance-id-region-endpoint.hologres.aliyuncs.com:port VPC樣本:instance-id-region-endpoint-vpc.hologres.aliyuncs.com:port
通常建議Data Integration資源群組和Hologres執行個體配在同一個地區的同一個可用性區域,以保證網路連接埠連通,實現最大效能。 | 是 | 無 |
accessId | 訪問Hologres的accessId。 | 是 | 無 |
accessKey | 訪問Hologres的accessKey,請確保該金鑰組目標表有寫入許可權。 | 是 | 無 |
database | Hologres執行個體內部資料庫的名稱。 | 是 | 無 |
table | Hologres的表名稱,如果是分區表,請指定父表的名稱。 | 是 | 無 |
column | 定義匯入目標表的資料列,必須包含目標表的主鍵集合。例如["*"] 表示全部列。 | 是 | 無 |
partition | 針對分區表,表示分區Column以及對應的Value,格式為column=value 。 | 否 | 空,表示非分區表。 |
fetchSize | 指定使用Select語句一次性讀取資料的條數。 | 否 | 1,000 |
Writer指令碼Demo
配置非分區表
配置從記憶體產生的資料匯入至Hologres普通表,樣本為通過JDBC模式匯入的配置。
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "mysql",
"parameter": {
"envType": 0,
"datasource": "<mysql_source_name>",
"column": [
"<column1>",
"<column2>",
......,
"<columnN>"
],
"connection": [
{
"datasource": "<mysql_source_name>",//mysql資料來源名
"table": [
"<mysql_table_name>"
]
}
],
"where": "",
"splitPk": "",
"encoding": "UTF-8"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "holo",
"parameter": {
"maxConnectionCount": 9,
"datasource": "<holo_sink_name>",//Hologres資料來源名稱
"truncate":true,//清理規則。
"conflictMode": "ignore",
"envType": 0,
"column": [
"<column1>",
"<column2>",
......,
"<columnN>"
],
"table": "<holo_table_name>"
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"executeMode": null,
"errorLimit": {
"record": ""
},
"speed": {
"concurrent": 2,//作業並發數
"throttle": false//限流
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
Hologres表的DDL語句,如下所示。
begin;
drop table if exists mysql_to_holo_test;
create table mysql_to_holo_test(
tag text not null,
id int not null,
body text not null,
brrth date,
primary key (tag, id));
call set_table_property('mysql_to_holo_test', 'orientation', 'column');
call set_table_property('mysql_to_holo_test', 'distribution_key', 'id');
call set_table_property('mysql_to_holo_test', 'clustering_key', 'birth');
commit;
配置分區表
配置從記憶體產生的資料同步至Hologres分區表的子表。
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "mysql",
"parameter": {
"envType": 0,
"datasource": "<mysql_source_name>",
"column": [
"<column1>",
"<column2>",
......,
"<columnN>"
],
"connection": [
{
"datasource": "<mysql_source_name>",
"table": [
"<mysql_table_name>"
]
}
],
"where": "",
"splitPk": "<mysql_pk>",//mysql的pk欄位
"encoding": "UTF-8"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "holo",
"parameter": {
"maxConnectionCount": 9,
"partition": "<partition_key>",//Hologres分區鍵
"datasource": "<holo_sink_name>",//Hologres資料來源名
"conflictMode": "ignore",
"envType": 0,
"column": [
"<column1>",
"<column2>",
......,
"<columnN>"
],
"table": "<holo_table_name>"
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"executeMode": null,
"errorLimit": {
"record": ""
},
"speed": {
"concurrent": 2,//作業並發數
"throttle": false//限流
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
Hologres表的DDL語句,如下所示。
BEGIN;
CREATE TABLE public.hologres_parent_table(
a text ,
b int,
c timestamp,
d text,
ds text,
primary key(ds,b)
)
PARTITION BY LIST(ds);
CALL set_table_property('public.hologres_parent_table', 'orientation', 'column');
CREATE TABLE public.holo_child_1 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201215');
CREATE TABLE public.holo_child_2 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201216');
CREATE TABLE public.holo_child_3 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201217');
COMMIT;
Writer指令碼參數
參數 | 描述 | 是否必選 | 預設值 |
endpoint | 目標互動式分析(Hologres)執行個體對應的endpoint,格式為instance-id-region-endpoint.hologres.aliyuncs.com:port 。您可以從互動式分析執行個體的管理頁面擷取。 endpoint包括公網、傳統網路和VPC三種網路類型,請根據Data Integration資源群組和Hologres執行個體所在的網路環境選擇正確的endpoint類型,否則會出現網路不通或者效能受限的情況: 公網樣本:instance-id-region-endpoint.hologres.aliyuncs.com:port 傳統網路樣本:instance-id-region-endpoint-internal.hologres.aliyuncs.com:port VPC樣本:instance-id-region-endpoint-vpc.hologres.aliyuncs.com:port
通常建議Data Integration資源群組和Hologres執行個體在同一個地區的同一個可用性區域,以確保網路連通,實現最大效能。 | 是 | 無 |
accessId | 訪問Hologres的accessId。 | 是 | 無 |
accessKey | 訪問Hologres的accessKey,請確保該金鑰組目標表有寫入許可權。 | 是 | 無 |
database | Hologres執行個體內部資料庫的名稱。 | 是 | 無 |
table | Hologres的表名稱,目前支援表名稱中包含Schema,例如schema_name.table_name 。 | 是 | 無 |
conflictMode | conflictMode包括Replace、Update和Ignore,詳情請參見實現原理。 | 是 | 無 |
column | 定義匯入目標表的資料列,必須包含目標表的主鍵集合。例如["*"] 表示全部列。 | 是 | 無 |
partition | 針對分區表,表示分區Column以及對應的Value,格式為column=value 。 | 否 | 空,表示非分區表 |
truncate | 寫入Holo表之前是否需要清空目標表。 | 否 | false |