全部產品
Search
文件中心

DataWorks:PolarDB資料來源

更新時間:Oct 24, 2024

PolarDB資料來源為您提供讀取和寫入PolarDB雙向通道的功能,您可以通過嚮導模式和指令碼模式配置同步任務。

使用限制

離線讀寫

支援閱讀檢視表。

即時讀

來來源資料源為阿里雲PolarDB MySQL時,您需要開啟Binlog。阿里雲PolarDB MySQL是一款完全相容MySQL的雲原生資料庫,預設使用了更進階別的物理日誌代替Binlog,但為了更好地與MySQL生態融合,PolarDB支援開啟Binlog的功能。

支援的欄位類型

離線讀

PolarDB Reader針對PolarDB類型的轉換列表,如下所示。

類型分類

PolarDB資料類型

整數類

INT、TINYINT、SMALLINT、MEDIUMINT和BIGINT

浮點類

FLOAT、DOUBLE和DECIMAL

字串類

VARCHAR、CHAR、TINYTEXT、TEXT、MEDIUMTEXT和LONGTEXT

日期時間類

DATE、DATETIME、TIMESTAMP、TIME和YEAR

布爾型

BIT和BOOL

二進位類

TINYBLOB、MEDIUMBLOB、BLOB、LONGBLOB和VARBINARY

說明
  • 除上述羅欄欄位類型外,其它類型均不支援。

  • PolarDB Reader外掛程式將tinyint(1)視作整型。

離線寫

類似於PolarDB Reader ,目前PolarDB Writer支援大部分PolarDB類型,但也存在部分類型沒有支援的情況,請注意檢查您的資料類型。

PolarDB Writer針對PolarDB類型的轉換列表,如下所示。

類型分類

PolarDB資料類型

整數類

INT、TINYINT、SMALLINT、MEDIUMINT、BIGINT和YEAR

浮點類

FLOAT、DOUBLE和DECIMAL

字串類

VARCHAR、CHAR、TINYTEXT、TEXT、MEDIUMTEXT和LONGTEXT

日期時間類

DATE、DATETIME、TIMESTAMP和TIME

布爾型

BOOL

二進位類

TINYBLOB、MEDIUMBLOB、BLOB、LONGBLOB和VARBINARY

資料同步前準備

準備工作1:配置白名單

Serverless資源群組或獨享Data Integration資源群組所在的VPC網段添加至OceanBase的白名單中,詳情請參見添加白名單。

準備工作2:建立帳號並配置帳號許可權

建立帳號並配置帳號許可權。

您需要規劃一個資料庫的登入賬戶用於後續執行操作,此賬戶需擁有資料庫的 SELECT, REPLICATION SLAVE, REPLICATION CLIENT許可權。

  1. 建立帳號。

    操作詳情可參見建立和管理資料庫帳號

  2. 配置許可權。

    您可參考以下命令為帳號添加此許可權,或直接給帳號賦予SUPER許可權。

    -- CREATE USER '同步帳號'@'%' IDENTIFIED BY '同步帳號';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '同步帳號'@'%';

準備工作3:開啟PolarDB的開啟Binlog

操作詳情可參見開啟Binlog

建立資料來源

在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見建立並管理資料來源詳細的配置參數解釋可在配置介面查看對應參數的文案提示

資料同步任務開發:PolarDB同步流程引導

資料同步任務的配置入口和通用配置流程可參見下文的配置指導。

單表離線同步任務配置指導

單表、整庫即時同步任務配置指導

操作流程請參見DataStudio側即時同步任務配置

整庫離線讀、單表/整庫全增量即時讀同步任務配置指導

操作流程請參見Data Integration側同步任務配置

常見問題

即時同步Oracle、PolarDB、MySQL任務重複報錯

附錄:指令碼Demo與參數說明

離線任務指令碼配置方式

如果您配置離線任務時使用指令碼模式的方式進行配置,您需要按照統一的指令碼格式要求,在任務指令碼中編寫相應的參數,詳情請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下資料來源的參數配置詳情。

Reader指令碼Demo

單庫單表的指令碼樣本如下,詳情請參見上述參數說明。

{
    "type": "job",
    "steps": [
        {
            "parameter": {
                "datasource": "test_005",//資料來源名。
                "column": [//源端列名。
                    "id",
                    "name",
                    "age",
                    "sex",
                    "salary",
                    "interest"
                ],
                "where": "id=1001",//過濾條件。
                "splitPk": "id",//切分鍵。
                "table": "PolarDB_person",//源端表名。
              	"useReadonly": "false"//是否從備庫讀取資料。
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "parameter": {}
    ],
    "version": "2.0",//版本號碼。
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {//錯誤記錄數。
            "record": ""
        },
        "speed": {
            "concurrent": 6,//並發數。
            "throttle": true//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
      "mbps":"12",//限流,此處1mbps = 1MB/s。
        }
    }
}

Reader指令碼參數

參數

描述

是否必選

預設值

datasource

資料來源名稱,指令碼模式支援添加資料來源,此配置項填寫的內容必須要與添加的資料來源名稱保持一致。

table

選取的需要同步的表名稱。

useReadonly

如果您希望讀寫分離,從PolarDB叢集的備庫讀取資料,則此參數配置為true。不配置時,預設為false,表示從主庫讀取資料。

false

column

所配置的表中需要同步的列名集合,使用JSON的數組描述欄位資訊 。預設使用所有列配置,例如[*]。

  • 支援列裁剪,即列可以挑選部分列進行匯出。

  • 支援列換序,即列可以不按照表Schema資訊順序進行匯出。

  • 支援常量配置,您需要按照SQL文法格式,例如["id", "table","1","'mingya.wmy'","'null'", "to_char(a+1)","2.3","true"]

    • id為普通列名。

    • table為包含保留字的列名。

    • 1為整型數字常量。

    • ‘mingya.wmy’為字串常量(注意需要加上一對單引號)。

    • 'null'為字串常量。

    • to_char(a+1)為計算字串長度函數。

    • 2.3為浮點數。

    • true為布爾值。

  • column必須顯示指定同步的列集合,不允許為空白。

splitPk

PolarDB Reader進行資料幫浦時,如果指定splitPk,表示您希望使用splitPk代表的欄位進行資料分區,資料同步因此會啟動並發任務進行資料同步,從而提高資料同步的效能。

  • 推薦splitPk使用者使用表主鍵,因為表主鍵通常情況下比較均勻,因此切分出來的分區不容易出現資料熱點。

  • 目前splitPk僅支援整型資料切分,不支援字串、浮點、日期等其他類型 。如果您指定其他非支援類型,忽略plitPk功能,使用單通道進行同步。

  • 如果splitPk不填寫,包括不提供splitPk或者splitPk值為空白,資料同步視作使用單通道同步該表資料 。

splitFactor

切分因子,可以配置同步資料的切分份數,如果配置了多並發,會按照並發數 * splitFactor份來切分。例如,並發數=5,splitFactor=5,則會按照5*5=25份來切分,在5個並發線程上執行。

說明

建議取值範圍:1~100,過大會導致記憶體溢出。

5

where

篩選條件,在實際業務情境中,往往會選擇當天的資料進行同步,將where條件指定為gmt_create>$bizdate

  • where條件可以有效地進行業務增量同步處理。如果不填寫where語句,包括不提供where的key或value,資料同步均視作同步全量資料。

  • 將where條件指定為limit 10不符合WHERE子句約束,不建議使用。

querySql(進階模式,嚮導模式不提供)

在部分業務情境中,where配置項不足以描述所篩選的條件,您可以通過該配置型來自訂篩選SQL。當配置該項後,資料同步系統就會忽略columntablewhere配置項,直接使用該項配置的內容對資料進行篩選。例如需要進行多表 join 後同步資料,使用select a,b from table_a join table_b on table_a.id = table_b.id。當您配置querySql時,PolarDB Reader直接忽略columntablewhere條件的配置,querySql優先順序大於table、column、where、splitPk選項。datasource會使用它解析出使用者名稱和密碼等資訊。

Writer指令碼Demo

指令碼配置範例如下,詳情請參見上述參數說明。

{
    "type": "job",
    "steps": [
        {
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "parameter": {
                "postSql": [],//匯入後完成語句。
                "datasource": "test_005",//資料來源名稱。
                "column": [//目標列名。
                    "id",
                    "name",
                    "age",
                    "sex",
                    "salary",
                    "interest"
                ],
                "writeMode": "insert",//寫入模式。
                "batchSize": 256,//一次性批量提交的記錄數大小。
                "table": "PolarDB_person_copy",//目標表名。
                "preSql": []//匯入前準備語句。
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",//版本號碼。
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {//錯誤記錄數。
            "record": ""
        },
        "speed": {
            "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
            "concurrent":6, //作業並發數。
            "mbps":"12"//限流,此處1mbps = 1MB/s。
        }
    }
}

Writer指令碼參數

  • 全量參數說明

    參數

    描述

    必選

    預設值

    datasource

    資料來源名稱,指令碼模式支援添加資料來源,此配置項填寫的內容必須要與添加的資料來源名稱保持一致。

    table

    選取的需要同步的表名稱。

    writeMode

    選擇匯入模式,可以支援:

    • insert(即嚮導模式的insert into)

    • update(即嚮導模式的on duplicate key update)

    • replace(即嚮導模式的replace into)

    不同方式的詳細介紹與情境樣本請參見下文的writeMode(主鍵衝突)參數詳解

    insert

    column

    目標表需要寫入資料的欄位,欄位之間用英文所逗號分隔。例如"column": ["id", "name", "age"]。如果要依次寫入全部列,使用(*)表示。 例如"column": [" *"]

    preSql

    執行資料同步任務之前率先執行的SQL語句。目前嚮導模式僅允許執行一條SQL語句,指令碼模式可以支援多條SQL語句,例如清除舊資料。

    postSql

    執行資料同步任務之後執行的SQL語句,目前嚮導模式僅允許執行一條SQL語句,指令碼模式可以支援多條SQL語句,例如加上某一個時間戳記。

    batchSize

    一次性批量提交的記錄數大小,該值可以極大減少資料同步系統與PolarDB的網路互動次數,並提升整體輸送量。但是該值設定過大可能會造成資料同步運行進程OOM情況。

    1,024

    updateColumn

    writeMode配置成update時,發生遇到主鍵/唯一性索引衝突時所更新的欄位。欄位之間用英文逗號所分隔,例如"updateColumn": ["name", "age"]

    說明

    目前僅支援PolarDB for MySQL。

  • writeMode(主鍵衝突)參數詳解

    對比介紹

    insert(即嚮導模式的insert into)

    update(即嚮導模式的on duplicate key update)

    replace(即嚮導模式的replace into)

    處理策略

    當主鍵或唯一性索引衝突時,衝突行不寫入目標表,以髒資料的形式體現。

    沒有遇到主鍵或唯一性索引衝突時,與insert into行為一致。衝突時會用新行替換已經指定的欄位的語句,寫入資料至目標表。

    沒有遇到主鍵或唯一性索引衝突時,與insert into行為一致。衝突時會先刪除原有行,再插入新行。即新行會替換原有行的所有欄位。

    資料樣本

    • 源表資料

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 1  | zhangsan| 1   |
      | 2  | lisi    |     |
      +----+---------+-----+
    • 目標表未經處理資料

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 2  | wangwu  |     |
      +----+---------+-----+
    • 運行任務後,成功寫入目標表1條資料,髒資料1條。

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 1  | zhangsan| 1   |
      | 2  | wangwu  |     |
      +----+---------+-----+
    • 情境1:任務配置部分欄位:"column": ["id","name"]

      • 源表資料

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 1  | zhangsan| 1   |
        | 2  | lisi    |     |
        +----+---------+-----+
      • 目標表未經處理資料

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 2  | wangwu  |  3  |
        +----+---------+-----+
      • 運行任務後,成功寫入目標表2條資料,髒資料0條。

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 1  | zhangsan| 1   |
        | 2  | lisi    | 3   |
        +----+---------+-----+
    • 情境2:任務配置所有欄位,"column": ["id","name","age"]

      • 源表資料

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 1  | zhangsan| 1   |
        | 2  | lisi    |     |
        +----+---------+-----+
      • 目標表未經處理資料

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 2  | wangwu  |  3  |
        +----+---------+-----+
      • 運行任務後,成功寫入目標表2條資料,髒資料0條。

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 1  | zhangsan| 1   |
        | 2  | lisi    |     |
        +----+---------+-----+
    • 源表資料

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 1  | zhangsan| 1   |
      | 2  | lisi    |     |
      +----+---------+-----+
    • 目標表未經處理資料

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 2  | wangwu  |  3  |
      +----+---------+-----+
    • 運行任務後,成功寫入目標表2條資料,髒資料0條。

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 1  | zhangsan| 1   |
      | 2  | lisi    |     |
      +----+---------+-----+