全部產品
Search
文件中心

DataWorks:AnalyticDB for PostgreSQL資料來源

更新時間:Jun 19, 2024

AnalyticDB for PostgreSQL資料來源提供讀取和寫入AnalyticDB for PostgreSQL的雙向功能,本文為您介紹DataWorks的AnalyticDB for PostgreSQL資料同步的能力支援情況。

使用限制

離線同步支援閱讀檢視表。

支援的版本

支援版本最高至7.0(含)。

支援的欄位類型

離線讀

AnalyticDB for PostgreSQL Reader支援大部分AnalyticDB for PostgreSQL類型,但也存在部分類型沒有支援的情況,請注意檢查您的資料類型。

AnalyticDB for PostgreSQL Reader針對AnalyticDB for PostgreSQL的類型轉換列表,如下所示。

類型分類

AnalyticDB for PostgreSQL資料類型

整數類

BIGINT、BIGSERIAL、INTEGER、SMALLINT、SERIAL和GEOMETRY

浮點類

DOUBLE、PRECISION、MONEY、NUMERIC和REAL

字串類

VARCHAR、CHAR、TEXT、BIT和INET

日期時間類

DATE、TIME和TIMESTAMP

布爾型

BOOL

二進位類

BYTEA

離線寫

AnalyticDB for PostgreSQL Writer支援大部分AnalyticDB for PostgreSQL類型,但也存在部分類型沒有支援的情況,請注意檢查您的類型。

AnalyticDB for PostgreSQL Writer針對AnalyticDB for PostgreSQL的類型轉換列表,如下所示。

類型分類

AnalyticDB for PostgreSQL資料類型

LONG

BIGINT、BIGSERIAL、INTEGER、SMALLINT和SERIAL

DOUBLE

DOUBLE、PRECISION、MONEY、NUMERIC和REAL

STRING

VARCHAR、CHAR、TEXT、BIT、INET和GEOMETRY

DATE

DATE、TIME和TIMESTAMP

BOOLEAN

BOOL

BYTES

BYTEA

說明

MONEY、INET和BIT需要您使用a_inet::varchar類似的文法進行轉換。

資料同步任務開發

AnalyticDB for PostgreSQL資料同步任務的配置入口和通用配置流程指導可參見下文的配置指導,詳細的配置參數解釋可在配置介面查看對應參數的文案提示。

建立資料來源

在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見建立並管理資料來源

單表離線同步任務配置指導

整庫離線讀同步配置指導

操作流程請參見Data Integration側同步任務配置

附錄:指令碼Demo與參數說明

附錄:離線任務指令碼配置方式

如果您配置離線任務時使用指令碼模式的方式進行配置,您需要在任務指令碼中按照指令碼的統一格式要求編寫指令碼中的reader參數和writer參數,指令碼模式的統一要求請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下的資料來源的Reader參數和Writer參數的指導詳情。

Reader指令碼Demo

{
    "type": "job",
    "steps": [
        {
            "parameter": {
                "datasource": "test_004",//資料來源名稱。
                "column": [//源端表的列名。
                    "id",
                    "name",
                    "sex",
                    "salary",
                    "age"
                ],
                "where": "id=1001",//過濾條件。
                "splitPk": "id",//切分鍵。
                "table": "public.person"//源端表名。
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "parameter": {},
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",//版本號碼
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {//錯誤記錄數。
            "record": ""
        },
        "speed": {
            "concurrent": 6,//並發數。
            "throttle": true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
           "mbps":"12"//限流,此處1mbps = 1MB/s。
        }
    }
}

Reader指令碼參數

參數

描述

是否必選

預設值

datasource

資料來源名稱,指令碼模式支援添加資料來源,該配置項輸入的內容必須和添加的資料來源名稱保持一致。

table

選取的需要同步的表名稱。

column

所配置的表中需要同步的列名集合,使用JSON的數組描述欄位資訊 。預設使用所有列配置,例如[*]

  • 支援列裁剪,即列可以挑選部分列進行匯出。

  • 支援列換序,即列可以不按照表Schema資訊順序進行匯出。

  • 支援常量配置,您需要按照SQL文法格式。例如,["id", "table","1","'mingya.wmy'","'null'", "to_char(a+1)","2.3","true"]

    • id為普通列名。

    • table為包含保留字的列名。

    • 1為整型數字常量。

    • ‘mingya.wmy’為字串常量(注意需要加上一對單引號)。

    • 'null'為字串常量。

    • to_char(a+1)為計算字串長度函數。

    • 2.3為浮點數。

    • true為布爾值。

  • column必須顯示指定同步的列集合,不允許為空白。

splitPk

AnalyticDB for PostgreSQL Reader進行資料幫浦時,如果指定splitPk,表示您希望使用splitPk代表的欄位進行資料分區。資料同步會啟動並發任務進行資料同步,以提高資料同步的效能。

  • 通常表主鍵較為均勻,切分出的分區不易出現資料熱點,所以推薦splitPk使用者使用表主鍵。

  • 目前splitPk僅支援整型資料切分,不支援字串、浮點、日期等其他類型 。如果您指定其他非支援類型,忽略splitPk功能,使用單通道進行同步。

  • 如果不填寫splitPk,包括不提供splitPk或者splitPk值為空白,資料同步視作使用單通道同步該表資料 。

where

篩選條件,AnalyticDB for PostgreSQLReader根據指定的columntablewhere條件拼接SQL,並根據該SQL進行資料幫浦。例如測試時,可以將where條件指定實際業務情境,往往會選擇當天的資料進行同步,將where條件指定為id>2 and sex=1

  • where條件可以有效地進行業務增量同步處理。

  • where條件不配置或者為空白,視作全表同步資料。

querySql(進階模式,嚮導模式不提供)

在部分業務情境中,where配置項不足以描述所篩選的條件,您可以通過該配置型來自訂篩選SQL。當配置此項後,資料同步系統就會忽略columntable等配置項,直接使用該項配置的內容對資料進行篩選。例如需要進行多表join後同步資料,使用select a,b from table_a join table_b on table_a.id = table_b.id

當您配置querySql時,AnalyticDB for PostgreSQL Reader直接忽略columntablewhere條件的配置。

fetchSize

該配置項定義了外掛程式和資料庫伺服器端每次批量資料擷取條數,該值決定了Data Integration和伺服器端的網路互動次數,能夠提升資料幫浦效能。

說明

fetchSize值過大(>2048)可能造成資料同步進程OOM。

512

Writer指令碼Demo

{
    "type": "job",
    "steps": [
        {
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "parameter": {
                "postSql": [],//匯入後的完成語句。
                "datasource": "test_004",//資料來源名。
                "column": [//目標表的列名。
                    "id",
                    "name",
                    "sex",
                    "salary",
                    "age"
                ],
                "table": "public.person",//目標表的表名。
                "preSql": []//匯入前的準備語句。
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",//版本號碼。
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {//錯誤記錄數。
            "record": ""
        },
        "speed": {
            "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
            "concurrent":6, //作業並發數。
            "mbps":"12"//限流
        }
    }
}

Writer指令碼參數

參數

描述

是否必選

預設值

datasource

資料來源名稱,指令碼模式支援添加資料來源,此配置項填寫的內容必須要與添加的資料來源名稱保持一致。

table

選取的需要同步的表名稱。

writeMode

選擇匯入模式,支援insert、copy和upsert方式。

  • insert:執行PostgreSQL的insert into...values... 語句,將資料寫入至PostgreSQL中。建議優先選擇該模式。

  • copy:PostgreSQL提供copy命令,用於表與檔案(標準輸出,標準輸入)之間的相互複製。Data Integration支援使用copy from,將資料載入到表中。建議您在遇到效能問題時再嘗試使用該模式。

  • upsert:寫入資料發生衝突後,按照conflictMode參數的設定進行新舊資料的處理。

insert

conflictMode

writeModeupsert,寫入資料至PostgreSQL出現主鍵或唯一性索引衝突時,可選擇如下衝突處理策略:

  • replace:寫入資料發生衝突後,使用待同步的新資料覆蓋原有的舊資料。

  • ignore:寫入資料發生衝突後,忽略新資料,並保留原有舊資料。

說明

當前僅支援通過指令碼模式配置衝突處理策略。

replace

column

目標表需要寫入資料的欄位,欄位之間用英文逗號分隔。例如"column":["id","name","age"]。如果要依次寫入全部列,使用*表示,例如"column":["*"]。

preSql

執行資料同步任務之前率先執行的SQL語句。目前嚮導模式僅允許執行一條SQL語句,指令碼模式可以支援多條SQL語句,例如清除舊資料。

postSql

執行資料同步任務之後執行的SQL語句。目前嚮導模式僅允許執行一條SQL語句,指令碼模式可以支援多條SQL語句,例如加上某一個時間戳記。

batchSize

一次性批量提交的記錄數大小,該值可以極大減少Data Integration與AnalyticDB for PostgreSQL的網路互動次數,並提升整體輸送量。但是該值設定過大可能會造成Data Integration運行進程OOM情況。

1,024