全部產品
Search
文件中心

DataWorks:Hologres資料來源

更新時間:Nov 08, 2024

Hologres資料來源為您提供讀取和寫入Hologres雙向通道的功能,本文為您介紹DataWorks的Hologres資料同步的能力支援情況。

支援的版本

Hologres支援的版本:0.7、0.8、0.9、0.10、1.1、1.2、1.3。

使用限制

離線讀寫

  • Hologres資料來源支援使用Serverless資源群組(推薦)獨享Data Integration資源群組

  • Hologres Writer不支援寫入資料至Hologres的外部表格。

  • Hologres資料來源連通性擷取Hologres端點的邏輯:

    • 當前地區的Hologres執行個體,Hologres端點擷取順序:any Tunnel > single Tunnel > Public(公網)

    • 跨地區的Hologres執行個體,Hologres端點擷取順序:Public(公網) > single Tunnel

整庫即時寫

支援的欄位類型

欄位類型

離線讀(Hologres Reader)

離線寫(Hologres Writer)

即時寫

UUID

不支援

不支援

不支援

CHAR

支援

支援

支援

NCHAR

支援

支援

支援

VARCHAR

支援

支援

支援

LONGVARCHAR

支援

支援

支援

NVARCHAR

支援

支援

支援

LONGNVARCHAR

支援

支援

支援

CLOB

支援

支援

支援

NCLOB

支援

支援

支援

SMALLINT

支援

支援

支援

TINYINT

支援

支援

支援

INTEGER

支援

支援

支援

BIGINT

支援

支援

支援

NUMERIC

支援

支援

支援

DECIMAL

支援

支援

支援

FLOAT

支援

支援

支援

REAL

支援

支援

支援

DOUBLE

支援

支援

支援

TIME

支援

支援

支援

DATE

支援

支援

支援

TIMESTAMP

支援

支援

支援

BINARY

支援

支援

支援

VARBINARY

支援

支援

支援

BLOB

支援

支援

支援

LONGVARBINARY

支援

支援

支援

BOOLEAN

支援

支援

支援

BIT

支援

支援

支援

JSON

支援

支援

支援

JSONB

支援

支援

支援

實現原理

離線讀寫

Hologres Reader通過PSQL讀取Hologres表中的資料,根據表的Shard Count發起多個並發,每個Shard對應一個Select並發任務:

  • Hologres在建立表時,在同一個CREATE TABLE事務中,通過CALL set_table_property('table_name', 'shard_count', 'xx')配置表的Shard Count。

    預設情況下,使用資料庫預設的Shard Count,具體數值取決於Hologres執行個體的配置。

  • Select語句通過表的內建列hg_shard_id的Shard篩選資料。

離線寫

Hologres Writer通過資料同步架構擷取Reader產生的協議資料,根據conflictMode(衝突策略)的配置決定寫入資料時的衝突解決方案策略。

您可以通過配置conflictMode,決定新匯入的資料和已有資料的主鍵發生衝突時,如何處理新匯入的資料:

重要

conflictMode僅適用於有主鍵的表。具體寫入原理和效能,詳情請參考技術原理

  • conflictModeReplace(整行更新)模式時,新資料覆蓋舊資料,整行所有列全部覆蓋,沒有配置列映射的欄位會強制寫NULL。

  • conflictModeUpdate(更新)模式時,新資料覆蓋舊資料,只覆蓋配置有列映射的欄位。

  • conflictModeIgnore(忽略)模式時,忽略新資料。

建立資料來源

在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見建立並管理資料來源詳細的配置參數解釋可在配置介面查看對應參數的文案提示

資料同步任務開發

資料同步任務的配置入口和通用配置流程可參見下文的配置指導。

單表離線同步任務配置指導

單表、整庫即時同步任務配置指導

操作流程請參見DataStudio側即時同步任務配置

單表、整庫全增量即時寫任務配置指導

操作流程請參見Data Integration側同步任務配置

附錄:指令碼Demo與參數說明

離線任務指令碼配置方式

如果您配置離線任務時使用指令碼模式的方式進行配置,您需要按照統一的指令碼格式要求,在任務指令碼中編寫相應的參數,詳情請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下資料來源的參數配置詳情。

Reader指令碼Demo

  • 配置非分區表

    • 配置從Hologres非分區表讀取資料至記憶體,如下所示。

      {
        "type":"job",
        "version":"2.0",//版本號碼。
        "steps":[
          {
            "stepType":"holo",//外掛程式名。
            "parameter":{
              "endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port",
              "accessId": "***************", //訪問Hologres的accessId。
              "accessKey": "*******************", //訪問Hologres的accessKey。
              "database": "postgres",
              "table": "holo_reader_****",
              "column" : [ //欄位。
                "tag",
                "id",
                "title"
              ]
            },
            "name":"Reader",
            "category":"reader"
          },
          {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
          }
        ],
        "setting":{
          "errorLimit":{
            "record":"0"//錯誤記錄數。
          },
          "speed":{
            "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
            "concurrent":1,//作業並發數。 
                        "mbps":"12"//限流,此處1mbps = 1MB/s。
          }
        },
        "order":{
          "hops":[
            {
              "from":"Reader",
              "to":"Writer"
            }
          ]
        }
      }
    • Hologres表的DDL語句,如下所示。

      begin;
      drop table if exists holo_reader_basic_src;
      create table holo_reader_basic_src(
        tag text not null, 
        id int not null, 
        title text not null, 
        body text, 
        primary key (tag, id));
        call set_table_property('holo_reader_basic_src', 'orientation', 'column');
        call set_table_property('holo_reader_basic_src', 'shard_count', '3');
      commit;
  • 配置分區表

    • 配置從記憶體產生的資料同步至Hologres分區表的子表。

      說明

      請注意partition的配置。

      {
        "type":"job",
        "version":"2.0",//版本號碼。
        "steps":[
          {
            "stepType":"holo",//外掛程式名。
            "parameter":{
              "endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port",
              "accessId": "***************", //訪問Hologres的accessId。
              "accessKey": "*******************", //訪問Hologres的accessKey。
              "database": "postgres",
              "table": "holo_reader_basic_****",
              "partition": "tag=foo",
              "column" : [
                "*"
              ],
              "fetchSize": "100"
            },
            "name":"Reader",
            "category":"reader"
          },
          {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
          }
        ],
        "setting":{
          "errorLimit":{
            "record":"0"//錯誤記錄數。
          },
          "speed":{
            "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
            "concurrent":1,//作業並發數。
                        "mbps":"12"//限流,此處1mbps = 1MB/s。
          }
        },
        "order":{
          "hops":[
            {
              "from":"Reader",
              "to":"Writer"
            }
          ]
        }
      }
    • Hologres表的DDL語句,如下所示。

      begin;
      drop table if exists holo_reader_basic_part_src;
      create table holo_reader_basic_part_src(
        tag text not null, 
        id int not null, 
        title text not null, 
        body text, 
        primary key (tag, id))
        partition by list( tag );
        call set_table_property('holo_reader_basic_part_src', 'orientation', 'column');
        call set_table_property('holo_reader_basic_part_src', 'shard_count', '3');
      commit;
      
      create table holo_reader_basic_part_src_1583161774228 partition of holo_reader_basic_part_src for values in ('foo');
      
      # 確保分區表子表已經建立且匯入資料。
      postgres=# \d+ holo_reader_basic_part_src
                               Table "public.holo_reader_basic_part_src"
       Column |  Type   | Collation | Nullable | Default | Storage  | Stats target | Description 
      --------+---------+-----------+----------+---------+----------+--------------+-------------
       tag    | text    |           | not null |         | extended |              | 
       id     | integer |           | not null |         | plain    |              | 
       title  | text    |           | not null |         | extended |              | 
       body   | text    |           |          |         | extended |              | 
      Partition key: LIST (tag)
      Indexes:
          "holo_reader_basic_part_src_pkey" PRIMARY KEY, btree (tag, id)
      Partitions: holo_reader_basic_part_src_1583161774228 FOR VALUES IN ('foo')

Reader指令碼參數

參數

描述

是否必選

預設值

endpoint

目標互動式分析(Hologres)執行個體對應的endpoint,格式為instance-id-region-endpoint.hologres.aliyuncs.com:port。您可以從互動式分析執行個體的管理頁面擷取。

endpoint包括傳統網路、公網和VPC三種網路類型,請根據Data Integration資源群組和Hologres執行個體所在的網路環境選擇正確的endpoint類型,否則會出現網路不通或者效能受限的情況:

  • 傳統網路樣本:instance-id-region-endpoint-internal.hologres.aliyuncs.com:port

  • 公網樣本:instance-id-region-endpoint.hologres.aliyuncs.com:port

  • VPC樣本:instance-id-region-endpoint-vpc.hologres.aliyuncs.com:port

通常建議Data Integration資源群組和Hologres執行個體配在同一個地區的同一個可用性區域,以保證網路連接埠連通,實現最大效能。

accessId

訪問Hologres的accessId

accessKey

訪問Hologres的accessKey,請確保該金鑰組目標表有寫入許可權。

database

Hologres執行個體內部資料庫的名稱。

table

Hologres的表名稱,如果是分區表,請指定父表的名稱。

column

定義匯入目標表的資料列,必須包含目標表的主鍵集合。例如["*"]表示全部列。

partition

針對分區表,表示分區Column以及對應的Value,格式為column=value

重要
  • 目前Hologres僅支援LIST分區,分區Column僅支援單個Column分區,且僅支援INT4或TEXT類型。

  • 請確認該參數和表DDL的分區配置匹配。

  • 請確認對應的子表已經建立,且已經匯入資料。

,表示非分區表。

fetchSize

指定使用Select語句一次性讀取資料的條數。

1,000

Writer指令碼Demo

  • 配置非分區表

    • 配置從記憶體產生的資料匯入至Hologres普通表,樣本為通過JDBC模式匯入的配置。

      {
          "type": "job",
          "version": "2.0",
          "steps": [
              {
                  "stepType": "mysql",
                  "parameter": {
                      "envType": 0,
                          "datasource": "<mysql_source_name>",
                      "column": [
                          "<column1>",
                          "<column2>",
                          ......,
                          "<columnN>"
                      ],
                      "connection": [
                          {
                              "datasource": "<mysql_source_name>",//mysql資料來源名
                              "table": [
                                  "<mysql_table_name>"
                              ]
                          }
                      ],
                      "where": "",
                      "splitPk": "",
                      "encoding": "UTF-8"
                  },
                  "name": "Reader",
                  "category": "reader"
              },
              {
                  "stepType": "holo",
                  "parameter": {
                      "maxConnectionCount": 9,
                      "datasource": "<holo_sink_name>",//Hologres資料來源名稱
                      "truncate":true,//清理規則。
                      "conflictMode": "ignore",
                      "envType": 0,
                      "column": [
                          "<column1>",
                          "<column2>",
                          ......,
                          "<columnN>"
                      ],
                      "table": "<holo_table_name>"
                  },
                  "name": "Writer",
                  "category": "writer"
              }
          ],
          "setting": {
              "executeMode": null,
              "errorLimit": {
                  "record": ""
              },
              "speed": {
                  "concurrent": 2,//作業並發數
                  "throttle": false//限流
              }
          },
          "order": {
              "hops": [
                  {
                      "from": "Reader",
                      "to": "Writer"
                  }
              ]
          }
      }
    • Hologres表的DDL語句,如下所示。

      begin;
      drop table if exists mysql_to_holo_test;
      create table mysql_to_holo_test(
        tag text not null,
        id int not null,
        body text not null,
        brrth date,
        primary key (tag, id));
        call set_table_property('mysql_to_holo_test', 'orientation', 'column');
        call set_table_property('mysql_to_holo_test', 'distribution_key', 'id');
        call set_table_property('mysql_to_holo_test', 'clustering_key', 'birth');
      commit;
  • 配置分區表

    說明
    • 目前Hologres僅支援LIST分區,分區Column僅支援單個Column分區,且僅支援INT4或TEXT類型。

    • 請確認該參數和表DDL的分區配置匹配。

    • 配置從記憶體產生的資料同步至Hologres分區表的子表。

      {
        "type": "job",
        "version": "2.0",
        "steps": [
          {
            "stepType": "mysql",
            "parameter": {
              "envType": 0,
              "datasource": "<mysql_source_name>",
              "column": [
                "<column1>",
                "<column2>",
                  ......,
                "<columnN>"
              ],
              "connection": [
                {
                  "datasource": "<mysql_source_name>",
                  "table": [
                    "<mysql_table_name>"
                  ]
                }
              ],
              "where": "",
              "splitPk": "<mysql_pk>",//mysql的pk欄位
              "encoding": "UTF-8"
            },
            "name": "Reader",
            "category": "reader"
          },
          {
            "stepType": "holo",
            "parameter": {
              "maxConnectionCount": 9,
              "partition": "<partition_key>",//Hologres分區鍵
              "datasource": "<holo_sink_name>",//Hologres資料來源名
              "conflictMode": "ignore",
              "envType": 0,
              "column": [
                "<column1>",
                "<column2>",
                  ......,
                "<columnN>"
              ],
              "table": "<holo_table_name>"
            },
            "name": "Writer",
            "category": "writer"
          }
        ],
        "setting": {
          "executeMode": null,
          "errorLimit": {
            "record": ""
          },
          "speed": {
            "concurrent": 2,//作業並發數
            "throttle": false//限流
          }
        },
        "order": {
          "hops": [
            {
              "from": "Reader",
              "to": "Writer"
            }
          ]
        }
      }
    • Hologres表的DDL語句,如下所示。

      BEGIN;
      CREATE TABLE public.hologres_parent_table(
        a text ,
        b int,
        c timestamp,
        d text,
        ds text,
        primary key(ds,b)
        )
        PARTITION BY LIST(ds);
      CALL set_table_property('public.hologres_parent_table', 'orientation', 'column');
      CREATE TABLE public.holo_child_1 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201215');
      CREATE TABLE public.holo_child_2 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201216');
      CREATE TABLE public.holo_child_3 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201217');
      COMMIT;

Writer指令碼參數

參數

描述

是否必選

預設值

endpoint

目標互動式分析(Hologres)執行個體對應的endpoint,格式為instance-id-region-endpoint.hologres.aliyuncs.com:port。您可以從互動式分析執行個體的管理頁面擷取。

endpoint包括公網、傳統網路和VPC三種網路類型,請根據Data Integration資源群組和Hologres執行個體所在的網路環境選擇正確的endpoint類型,否則會出現網路不通或者效能受限的情況:

  • 公網樣本:instance-id-region-endpoint.hologres.aliyuncs.com:port

  • 傳統網路樣本:instance-id-region-endpoint-internal.hologres.aliyuncs.com:port

  • VPC樣本:instance-id-region-endpoint-vpc.hologres.aliyuncs.com:port

通常建議Data Integration資源群組和Hologres執行個體在同一個地區的同一個可用性區域,以確保網路連通,實現最大效能。

accessId

訪問Hologres的accessId

accessKey

訪問Hologres的accessKey,請確保該金鑰組目標表有寫入許可權。

database

Hologres執行個體內部資料庫的名稱。

table

Hologres的表名稱,目前支援表名稱中包含Schema,例如schema_name.table_name

conflictMode

conflictMode包括ReplaceUpdateIgnore,詳情請參見實現原理

column

定義匯入目標表的資料列,必須包含目標表的主鍵集合。例如["*"]表示全部列。

partition

針對分區表,表示分區Column以及對應的Value,格式為column=value

說明
  • 目前Hologres僅支援LIST分區,分區Column僅支援單個Column分區,且僅支援INT4或TEXT類型。

  • 請確認該參數和表DDL的分區配置匹配。

,表示非分區表

truncate

寫入Holo表之前是否需要清空目標表。

  • true:清空目標表。

    說明
    • 目前僅支援清空非分區表和靜態分區表,不支援清空動態分區表,如果您是動態分區表,並且設定了參數值為true,同步任務將會異常退出。

    • 如果您是靜態分區表,並設定了參數值為true,則會清空該分區子表資料,不會清空父表資料。

  • false:不清空目標表。

false