全部產品
Search
文件中心

DataWorks:MySQL資料來源

更新時間:Oct 25, 2024

MySQL資料來源為您提供讀取和寫入MySQL的雙向通道,本文為您介紹DataWorks的MySQL資料同步的能力支援情況。

支援的MySQL版本

  • 離線讀寫:

    支援MySQL 5.5.x、MySQL 5.6.x、MySQL 5.7.x、MySQL 8.0.x,相容Amazon RDS for MySQLAzure MySQLAmazon Aurora MySQL

    離線同步支援閱讀檢視表。

  • 即時讀取:

    Data Integration即時讀取MySQL資料是基於即時訂閱MySQL實現的,當前僅支援即時同步MySQL 5.5.x、MySQL 5.6.x、MySQL 5.7.x、MySQL 8.0.x(非8.0新特性,比如functional index,僅相容原有功能)版本的MySQL資料,相容Amazon RDS for MySQLAzure MySQLAmazon Aurora MySQL

    重要

    如果需要同步DRDS的MySQL,請不要將DRDS的MySQL配置為MySQL資料來源,您可以參考配置DRDS資料來源文檔直接將其配置為DRDS資料來源。

使用限制

即時讀

  • 不支援同步MySQL唯讀庫執行個體的資料。

  • 不支援同步含有Functional index的表。

  • 不支援XA ROLLBACK。

    針對已經XA PREPARE的交易資料,即時同步會將其同步到目標端,如果XA ROLLBACK,即時同步不會針對XA PREPARE的資料做復原寫入的操作。若要處理XA ROLLBACK情境,需要手動將XA ROLLBACK的表從即時同步任務中移除,再添加表後重新進行同步。

  • 僅支援同步MySQL伺服器Binlog配置格式為ROW。

  • 即時同步不會同步被串聯刪除的關聯表記錄。

  • 對於Amazon Aurora MySQL資料庫,需要串連到您的主/寫資料庫,因為AWS不允許在Aurora MySQL的唯讀副本上啟用binlog功能。即時同步任務需要binlog來執行累加式更新。

  • 即時同步線上DDL變更僅支援通過Data Management對MySQL表進行加列(Add Column)線上DDL變更。

離線讀

MySQL Reader外掛程式在進行分庫分表等多表同步時,若要對單表進行切分,則需要滿足任務並發數大於表個數這一條件,否則切分的Task數目等於表的個數。

支援的欄位類型

各版本MySQL的全量欄位類型請參見MySQL官方文檔。以下以MySQL 8.0.x為例,為您羅列當前主要欄位的支援情況。

欄位類型

離線讀(MySQL Reader)

離線寫(MySQL Writer)

即時讀

即時寫

TINYINT

支援

支援

支援

支援

SMALLINT

支援

支援

支援

支援

INTEGER

支援

支援

支援

支援

BIGINT

支援

支援

支援

支援

FLOAT

支援

支援

支援

支援

DOUBLE

支援

支援

支援

支援

DECIMAL/NUMBERIC

支援

支援

支援

支援

REAL

不支援

不支援

不支援

不支援

VARCHAR

支援

支援

支援

支援

JSON

支援

支援

支援

支援

TEXT

支援

支援

支援

支援

MEDIUMTEXT

支援

支援

支援

支援

LONGTEXT

支援

支援

支援

支援

VARBINARY

支援

支援

支援

支援

BINARY

支援

支援

支援

支援

TINYBLOB

支援

支援

支援

支援

MEDIUMBLOB

支援

支援

支援

支援

LONGBLOB

支援

支援

支援

支援

ENUM

支援

支援

支援

支援

SET

支援

支援

支援

支援

BOOLEAN

支援

支援

支援

支援

BIT

支援

支援

支援

支援

DATE

支援

支援

支援

支援

DATETIME

支援

支援

支援

支援

TIMESTAMP

支援

支援

支援

支援

TIME

支援

支援

支援

支援

YEAR

支援

支援

支援

支援

LINESTRING

不支援

不支援

不支援

不支援

POLYGON

不支援

不支援

不支援

不支援

MULTIPOINT

不支援

不支援

不支援

不支援

MULTILINESTRING

不支援

不支援

不支援

不支援

MULTIPOLYGON

不支援

不支援

不支援

不支援

GEOMETRYCOLLECTION

不支援

不支援

不支援

不支援

資料同步前準備:MySQL環境準備

在DataWorks上進行資料同步前,您需要參考本文提前在MySQL側進行資料同步環境準備,以便在DataWorks上進行MySQL資料同步任務配置與執行時服務正常。以下為您介紹MySQL同步前的相關環境準備。

準備工作1:確認MySQL版本

Data Integration對MySQL版本有要求,您可參考上文支援的MySQL版本章節,查看當前待同步的MySQL是否符合版本要求。您可以在MySQL資料庫通過如下語句查看當前MySQL資料庫版本。

SELECT version();

準備工作2:配置帳號許可權

建議您提前規劃並建立一個專用於DataWorks訪問資料來源的MySQL帳號,操作如下。

  1. 可選:建立帳號。

    操作詳情請參見建立MySQL帳號

  2. 配置許可權。

    • 離線

      在離線同步情境下:

      • 在離線讀MySQL資料時,此帳號需擁有同步表的讀(SELECT)許可權。

      • 在離線寫MySQL資料時,此帳號需擁有同步表的寫(INSERTDELETEUPDATE)許可權。

    • 即時

      在即時同步情境下,此帳號需要擁有資料庫的SELECTREPLICATION SLAVEREPLICATION CLIENT許可權。

    您可以參考以下命令為帳號添加許可權,或直接給帳號賦予SUPER許可權。如下執行語句在實際使用時,請替換'同步帳號'為上述建立的帳號。

    -- CREATE USER '同步帳號'@'%' IDENTIFIED BY '密碼'; //建立同步帳號並設定密碼,使其可以通過任意主機登入資料庫。%表示任意主機。
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '同步帳號'@'%'; //授權同步帳號資料庫的 SELECT, REPLICATION SLAVE, REPLICATION CLIENT許可權。

    *.*表示授權同步帳號對所有資料庫的所有表擁有上述許可權。您也可以指定授權同步帳號對目標資料庫的指定表擁有上述許可權。例如,授權同步帳號對test資料庫的user表擁有上述許可權,則可以使用GRANT SELECT, REPLICATION CLIENT ON test.user TO '同步帳號'@'%';語句。

    說明

    REPLICATION SLAVE語句為全域許可權,不能指定授權同步帳號對目標資料庫的指定表擁有相關許可權。

準備工作3:(僅即時同步需要)開啟MySQL Binlog

Data Integration通過即時訂閱MySQL Binlog實現增量資料即時同步,您需要在DataWorks配置同步前,先開啟MySQL Binlog服務。操作如下:

重要
  • 如果Binlog在消費中,則無法被資料庫刪除。如果即時同步任務運行延遲將可能導致源端Binlog長時間被消費,請合理配置任務的延遲警示,並及時關注資料庫的磁碟空間。

  • Binlog至少保留72小時以上,避免任務失敗後因Binlog已經消失,再啟動無法重設位點到故障發生前而導致的資料丟失(此時只能使用全量離線同步來補齊資料)。

  1. 檢查Binlog是否開啟。

    • 使用如下語句檢查Binlog是否開啟。

      SHOW variables like "log_bin";

      返回結果為ON時,表明已開啟Binlog。

    • 如果您使用備用庫同步資料,則還可以通過如下語句檢查Binlog是否開啟。

      SHOW variables LIKE "log_slave_updates";

      返回結果為ON時,表明備用庫已開啟Binlog。

    如果返回的結果與上述結果不符:

  2. 查詢Binlog的使用格式。

    使用如下語句查詢Binlog的使用格式。

    SHOW variables LIKE "binlog_format";

    返回結果說明:

    • 返回ROW,表示開啟的Binlog格式為ROW

    • 返回STATEMENT,表示開啟的Binlog格式為STATEMENT

    • 返回MIXED,表示開啟的Binlog格式為MIXED

    重要

    DataWorks即時同步僅支援同步MySQL伺服器Binlog配置格式為ROW。如果返回非ROW請修改Binlog Format。

  3. 查詢Binlog完整日誌是否開啟。

    使用如下語句查詢Binlog完整日誌是否開啟。

    show variables like "binlog_row_image";

    返回結果說明:

    • 返回FULL,表示Binlog開啟了完整日誌。

    • 返回MINIMAL,表示Binlog開啟了最小日誌,未開啟完整日誌。

    重要

    DataWorks即時同步,僅支援同步開啟了Binlog完整日誌的MySQL伺服器資料。若查詢結果返回非FULL,請修改binlog_row_image的配置。

OSS binlog讀取授權配置

在添加MySQL資料來源時,如果配置模式阿里雲執行個體模式,且RDS MySQL執行個體地區與DataWorks專案空間在同一地區,您可以開啟支援OSS binlog讀取,開啟後,在無法訪問RDS binlog時,將會嘗試從OSS擷取binlog,以避免即時同步任務中斷。

如果選擇的OSS binlog訪問身份阿里雲RAM子帳號阿里雲RAM角色,您還需參考如下方式配置帳號授權。

阿里雲RAM子帳號
  1. 登入RAM 存取控制-使用者控制台,找到需要授權的子帳號。具體操作:

  2. 單擊操作列的添加許可權

  3. 配置如下關鍵參數後,單擊確認新增授權

    • 資源範圍:帳號層級。

    • 權限原則:系統策略。

    • 策略名稱稱AliyunDataWorksAccessingRdsOSSBinlogPolicy

    image

阿里雲RAM角色
  1. 登入RAM 存取控制-角色控制台,建立一個RAM角色。具體操作,請參見建立可信實體為阿里雲帳號的RAM角色

    關鍵參數:

    • 選擇可信實體類型:阿里雲帳號。

    • 角色名稱:自訂。

    • 選擇信任的雲帳號:其他帳號,填寫DataWorks工作空間所屬的雲帳號。

  2. 為建立好的RAM角色精確授權。具體操作,請參見為RAM角色授權

    關鍵參數:

    • 權限原則:系統策略。

    • 策略名稱稱AliyunDataWorksAccessingRdsOSSBinlogPolicy

  3. 為建立好的RAM角色修改信任策略。具體操作,請參見修改RAM角色的信任策略

    {
        "Statement": [
            {
                "Action": "sts:AssumeRole",
                "Effect": "Allow",
                "Principal": {
                    "Service": [
                        "<DataWorks使用者主帳號的雲帳號ID>@cdp.aliyuncs.com"
                    ]
                }
            }
        ],
        "Version": "1"
    }

建立資料來源

在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見建立並管理資料來源詳細的配置參數解釋可在配置介面查看對應參數的文案提示

資料同步任務開發:MySQL同步流程引導

資料同步任務的配置入口和通用配置流程可參見下文的配置指導。

單表離線同步任務配置指導

單表即時同步任務配置指導

操作流程請參見DataStudio側即時同步任務配置

整庫離線、整庫(即時)全增量、整庫(即時)分庫分表等整庫層級同步配置指導

操作流程請參見Data Integration側同步任務配置

常見問題

更多其他Data Integration常見問題請參見Data Integration常見問題

附錄:MySQL指令碼Demo與參數說明

離線任務指令碼配置方式

如果您配置離線任務時使用指令碼模式的方式進行配置,您需要按照統一的指令碼格式要求,在任務指令碼中編寫相應的參數,詳情請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下資料來源的參數配置詳情。

Reader指令碼Demo

本文為您提供單庫單表和分庫分表的配置樣本:

說明

本文JSON樣本中的注釋僅用於展示部分重要參數含義,實際配置時,請移除注釋內容。

  • 配置單庫單表

    {
      "type": "job",
      "version": "2.0",//版本號碼。
      "steps": [
        {
          "stepType": "mysql",//外掛程式名。
          "parameter": {
            "column": [//列名。
              "id"
            ],
            "connection": [
              {
                "querySql": [
                  "select a,b from join1 c join join2 d on c.id = d.id;"
                ],
                "datasource": ""//資料來源名稱。
              }
            ],
            "where": "",//過濾條件。
            "splitPk": "",//切分鍵。
            "encoding": "UTF-8"//編碼格式。
          },
          "name": "Reader",
          "category": "reader"
        },
        {
          "stepType": "stream",
          "parameter": {},
          "name": "Writer",
          "category": "writer"
        }
      ],
      "setting": {
        "errorLimit": {
          "record": "0"//錯誤記錄數。
        },
        "speed": {
          "throttle": true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
          "concurrent": 1,//作業並發數。
          "mbps": "12"//限流,此處1mbps = 1MB/s。
        }
      },
      "order": {
        "hops": [
          {
            "from": "Reader",
            "to": "Writer"
          }
        ]
      }
    }
  • 配置分庫分表

    說明

    分庫分表是指在MySQL Reader端可以選擇多個MySQL資料表,且表結構保持一致。此處的‘分庫分表’是指多個MySQL寫入同一個目標表,如想要支援整庫層級配置分庫分表,還請在Data Integration網站建立任務並選擇整庫分庫分表能力

    {
      "type": "job",
      "version": "2.0",
      "steps": [
        {
          "stepType": "mysql",
          "parameter": {
            "indexes": [
              {
                "type": "unique",
                "column": [
                  "id"
                ]
              }
            ],
            "envType": 0,
            "useSpecialSecret": false,
            "column": [
              "id",
              "buyer_name",
              "seller_name",
              "item_id",
              "city",
              "zone"
            ],
            "tableComment": "測試訂單表",
            "connection": [
              {
                "datasource": "rds_dataservice",
                "table": [
                  "rds_table"
                ]
              },
              {
                "datasource": "rds_workshop_log",
                "table": [
                  "rds_table"
                ]
              }
            ],
            "where": "",
            "splitPk": "id",
            "encoding": "UTF-8"
          },
          "name": "Reader",
          "category": "reader"
        },
        {
          "stepType": "odps",
          "parameter": {},
          "name": "Writer",
          "category": "writer"
        },
        {
          "name": "Processor",
          "stepType": null,
          "category": "processor",
          "copies": 1,
          "parameter": {
            "nodes": [],
            "edges": [],
            "groups": [],
            "version": "2.0"
          }
        }
      ],
      "setting": {
        "executeMode": null,
        "errorLimit": {
          "record": ""
        },
        "speed": {
          "concurrent": 2,
          "throttle": false
        }
      },
      "order": {
        "hops": [
          {
            "from": "Reader",
            "to": "Writer"
          }
        ]
      }
    }

Reader指令碼參數

指令碼參數名

描述

是否必選

預設值

datasource

資料來源名稱,指令碼模式支援添加資料來源,此配置項填寫的內容必須與添加的資料來源名稱保持一致。

table

選取的需要同步的表名稱。一個Data Integration任務只能從一張表中讀取資料。

table用於配置範圍的進階用法樣本如下:

  • 您可以通過配置區間讀取分庫分表,例如'table_[0-99]'表示讀取'table_0''table_1''table_2'直到'table_99'

  • 如果您的表數字尾碼的長度一致,例如'table_000''table_001''table_002'直到'table_999',您可以配置為'"table": ["table_00[0-9]", "table_0[10-99]", "table_[100-999]"]'

說明

任務會讀取匹配到的所有表,具體讀取這些表中column配置項指定的列。如果表不存在,或者讀取的列不存在,會導致任務失敗。

column

所配置的表中需要同步的列名集合,使用JSON的數組描述欄位資訊 。預設使用所有列配置,例如[ * ]。

  • 支援列裁剪:列可以挑選部分列進行匯出。

  • 支援列換序:列可以不按照表schema資訊順序進行匯出。

  • 支援常量配置:您需要按照MySQL SQL文法格式,例如["id","table","1","'mingya.wmy'","'null'","to_char(a+1)","2.3","true"]

    • id為普通列名。

    • table為包含保留字的列名。

    • 1為整型數字常量。

    • 'mingya.wmy'為字串常量(注意需要加上一對單引號)。

    • 關於null

      • " "表示空。

      • null表示null。

      • 'null'表示null這個字串。

    • to_char(a+1)為計算字串長度函數。

    • 2.3為浮點數。

    • true為布爾值。

  • column必須顯示指定同步的列集合,不允許為空白。

splitPk

MySQL Reader進行資料幫浦時,如果指定splitPk,表示您希望使用splitPk代表的欄位進行資料分區,資料同步因此會啟動並發任務進行資料同步,提高資料同步的效能。

  • 推薦splitPk使用者使用表主鍵,因為表主鍵通常情況下比較均勻,因此切分出來的分區也不容易出現資料熱點。

  • 目前splitPk僅支援整型資料切分,不支援字串、浮點和日期等其他類型 。如果您指定其他非支援類型,忽略splitPk功能,使用單通道進行同步。

  • 如果不填寫splitPk,包括不提供splitPk或者splitPk值為空白,資料同步視作使用單通道同步該表資料 。

where

篩選條件,在實際業務情境中,往往會選擇當天的資料進行同步,將where條件指定為gmt_create>$bizdate

  • where條件可以有效地進行業務增量同步處理。如果不填寫where語句,包括不提供where的key或value,資料同步均視作同步全量資料。

  • 不可以將where條件指定為limit 10,這不符合MySQL SQL WHERE子句約束。

querySql(進階模式,嚮導模式不支援此參數的配置)

在部分業務情境中,where配置項不足以描述所篩選的條件,您可以通過該配置型來自訂篩選SQL。配置該項後,資料同步系統會忽略tables、columns和splitPk配置項,直接使用該項配置的內容對資料進行篩選。例如,需要進行多表join後同步資料,使用select a,b from table_a join table_b on table_a.id = table_b.id。當您配置querySql時,MySQL Reader直接忽略table、column、where和splitPk條件的配置,querySql優先順序大於tablecolumnwheresplitPk選項。datasource通過它解析出使用者名稱和密碼等資訊。

說明

querySql需要區分大小寫,例如,寫為querysql會不生效。

useSpecialSecret

多來來源資料源時,是否使用各自資料來源的密碼。取值包括:

  • true

  • false

如果您配置了多個來來源資料源,且各個資料來源使用的使用者名稱密碼不一致,您可以設定使用各自資料來源的密碼,即此參數設定為true

false

Writer指令碼Demo

{
  "type": "job",
  "version": "2.0",//版本號碼。
  "steps": [
    {
      "stepType": "stream",
      "parameter": {},
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "mysql",//外掛程式名。
      "parameter": {
        "postSql": [],//匯入後的準備語句。
        "datasource": "",//資料來源。
        "column": [//列名。
          "id",
          "value"
        ],
        "writeMode": "insert",//寫入模式,您可以設定為insert、replace或update。
        "batchSize": 1024,//一次性批量提交的記錄數大小。
        "table": "",//表名。
        "nullMode": "skipNull",//NULL值處理策略。
        "skipNullColumn": [//需要跳過NULL值的列。
          "id",
          "value"
        ],
        "preSql": [
          "delete from XXX;"//匯入前的準備語句。
        ]
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "errorLimit": {//錯誤記錄數。
      "record": "0"
    },
    "speed": {
      "throttle": true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
      "concurrent": 1,//作業並發數。
      "mbps": "12"//限流,控制同步的最高速率,防止對上遊/下遊資料庫讀取/寫入壓力過大,此處1mbps = 1MB/s。
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}

Writer指令碼參數

指令碼參數名

描述

是否必選

預設值

datasource

資料來源名稱,指令碼模式支援添加資料來源,此配置項填寫的內容必須與添加的資料來源名稱保持一致。

table

選取的需要同步的表名稱。

writeMode

選擇匯入模式,可以支援insert intoon duplicate key updatereplace into三種方式:

  • insert into:當主鍵/唯一性索引衝突時會寫不進去衝突的行,以髒資料的形式體現。

    如果您通過指令碼模式配置任務,請設定writeModeinsert

  • on duplicate key update:沒有遇到主鍵/唯一性索引衝突時,與insert into行為一致。衝突時會用新行替換已經指定的欄位的語句,寫入資料至MySQL。

    如果您通過指令碼模式配置任務,請設定writeModeupdate

  • replace into:沒有遇到主鍵/唯一性索引衝突時,與insert into行為一致。衝突時會先刪除原有行,再插入新行。即新行會替換原有行的所有欄位。

    如果您通過指令碼模式配置任務,請設定writeModereplace

insert

nullMode

NULL值處理策略,取值範圍:

  • writeNull:當源端欄位資料是NULL值時,給目標端欄位寫入NULL值。

  • skipNull:當源端欄位資料是NULL值時,目標端不寫入本欄位,若目標端有預設值定義,該列值會使用目標端預設值,若目標端無預設值定義,該列值會是NULL。配置此參數時,需要同時配置skipNullColumn參數。

重要

配置為skipNull時,任務會動態拼接寫資料的SQL語句以支援目標端預設值,會增多FLUSH次數,降低同步速度,最差情況下會每條資料FLUSH一次。

writeNull

skipNullColumn

nullMode配置為skipNull時,此參數配置的列不會被強制寫為NULL,會優先使用對應列本身的預設值。

配置格式:["c1", "c2", ...],其中,c1c2需要配置為column參數的子集。

預設為本任務配置的所有列。

column

目標表需要寫入資料的欄位,欄位之間用英文所逗號分隔,例如"column": ["id", "name", "age"]。如果要依次寫入全部列,使用星號(*)表示, 例如"column": ["*"]

preSql

執行資料同步任務之前率先執行的SQL語句。目前嚮導模式僅允許執行一條SQL語句,指令碼模式可以支援多條SQL語句。例如,執行前清空表中的舊資料(truncate table tablename)。

說明

當有多條SQL語句時,不支援事務。

postSql

執行資料同步任務之後執行的SQL語句,目前嚮導模式僅允許執行一條SQL語句,指令碼模式可以支援多條SQL語句。例如,加上某一個時間戳記alter table tablename add colname timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP

說明

當有多條SQL語句時,不支援事務。

batchSize

一次性批量提交的記錄數大小,該值可以極大減少資料同步系統與MySQL的網路互動次數,並提升整體輸送量。如果該值設定過大,會導致資料同步運行進程OOM異常。

256

updateColumn

writeMode配置成update時,發生遇到主鍵/唯一性索引衝突時所更新的欄位。欄位之間用英文逗號所分隔,例如 "updateColumn":["name", "age"]