通過DTS、Flink CDC、Catalog、阿里雲DataWorks,均可將MySQL(如自建MySQL、RDS MySQL和PolarDB MySQL等)的資料移轉至ApsaraDB for SelectDB。您可依據遷移的資料量和業務情境,選擇合適的方式完成資料移轉。本文為您介紹MySQL資料移轉到ApsaraDB for SelectDB的各個方式以及各方式的功能對比。
遷移方式功能對比
DTS、Flink CDC、Catalog、阿里雲DataWorks均可將MySQL的資料移轉至ApsaraDB for SelectDB,但不同的方式支援遷移的資料有所不同,您可根據不同的業務情境,選擇合適的遷移方式。
遷移方式 | 歷史資料移轉 | 增量資料同步 | 表結構遷移 | 整庫遷移 | 增量同步處理DDL | 資料校正 |
DTS | ✔️ | ✔️ | ✔️ | ✔️ | ✔️ | ✔️ |
Flink CDC | ✔️ | ✔️ | ✔️ | ✔️ | ✔️ | ❌ |
Catalog | ✔️ | ❌ | ❌ | ❌ | ❌ | ❌ |
DataWorks | ✔️ | ❌ | ❌ | ❌ | ❌ | ❌ |
本文將為您概述各個遷移方式的基本遷移步驟,如需擷取更多詳細資料,請根據指引查閱各個方式相應的詳細文檔。
前提條件
已建立ApsaraDB for SelectDB執行個體和MySQL執行個體。
ApsaraDB for SelectDB執行個體和MySQL執行個體需要網路互連,或者二者的網路都與遷移工具的網路互連。
通過DTS遷移資料
DTS支援遷移MySQL歷史資料、同步增量資料到ApsaraDB for SelectDB,且支援庫表遷移、DDL同步、資料校正等能力。本小節以RDS MySQL為例,示範如何遷移MySQL資料到ApsaraDB for SelectDB,更多詳情,請參見通過DTS匯入資料。
操作步驟
在頁面左上方,選擇執行個體所在地區。
在執行個體列表頁面,單擊目標執行個體ID,進入到執行個體詳情頁面。。
在左側導覽列,單擊資料移轉及同步,在頂部功能表列,選擇資料同步頁簽。
說明DTS資料同步通常包含存量資料的遷移和增量資料的即時同步,資料移轉通常用於存量歷史資料的遷移。
單擊建立同步任務,配置源庫及目標庫資訊。
配置完成後,單擊頁面下方的測試連接以進行下一步。
配置任務對象及進階配置。
選擇粒度為庫層級,不支援後續新增表的資料同步。若您在使用DTS的過程中可能會新增資料表,可以將同步對象的粒度選擇為表層級,後續可通過修改同步對象進行修改。
選擇粒度為表層級,且需進行編輯(如表列名映射),則單次同步任務僅支援同步至多1000張表。當超出數量限制,任務提交後會顯示請求報錯,此時建議您拆分待同步的表,分批配置多個任務,或者配置整庫的同步任務。
如需更改單個同步對象在目標執行個體中的名稱,請右擊已選擇對象中的同步對象,設定方式,請參見庫表列名單個映射。
如需批量更改同步對象在目標執行個體中的名稱,請單擊已選擇對象方框右上方的大量編輯,設定方式,請參見庫表列名批量映射。
如需按庫或表層級選擇同步的SQL操作,請在已選擇對象中右擊待同步對象,並在彈出的對話方塊中選擇所需同步的SQL操作。支援的操作,請參見支援增量同步處理的SQL。
如需設定WHERE條件過濾資料,請在已選擇對象中右擊待同步的表,在彈出的對話方塊中設定過濾條件。設定方法請參見設定過濾條件。
如果使用了對象名映射功能,可能會導致依賴這個對象的其他對象同步失敗。
可選:上述配置完成後,單擊下一步配置庫表欄位,設定待同步的表在目標中的主鍵列、分布鍵和引擎選擇資訊。
配置任務對象時同步類型勾選了庫表結構同步時才會有本步驟,您可以將定義狀態選擇為全部後進行修改。
主鍵列可以是選擇多個列組成聯合主鍵,且需要從主鍵列中選擇一個或者多個列作為分布鍵。引擎選擇僅支援選擇Unique。
儲存任務並進行預檢查。預檢查通過率顯示為100%時,單擊下一步購買。
在購買頁面,選擇資料同步執行個體的計費方式、鏈路規格,閱讀並勾選《資料轉送(隨用隨付)服務條款》,單擊購買並啟動,同步任務正式開始,您可在資料同步介面查看具體任務進度。
類別 | 配置 | 說明 |
任務資訊 | 任務名稱 | DTS會自動產生一個任務名稱,建議配置具有業務意義的名稱(無唯一性要求),便於後續識別。 |
源庫資訊 | 資料庫類型 | 選擇MySQL。 |
接入方式 | 選擇雲執行個體。 | |
執行個體地區 | 選擇源RDS MySQL執行個體所屬地區。 | |
RDS執行個體ID | 選擇源RDS MySQL執行個體ID。例如rm-2z3m****。 | |
資料庫帳號 | 填入源RDS MySQL執行個體的資料庫帳號,許可權要求請參見資料庫帳號的許可權要求。 | |
資料庫密碼 | 填入該資料庫帳號對應的密碼。 | |
串連方式 | 根據需求選擇非加密串連或SSL安全連線。如果設定為SSL安全連線,您需要提前開啟RDS MySQL執行個體的SSL加密功能,詳情請參見使用雲端認證快速開啟SSL鏈路加密。 | |
目標庫資訊 | 資料庫類型 | 選擇SelectDB。 |
接入方式 | 選擇雲執行個體。 | |
執行個體地區 | 選擇目標SelectDB執行個體所屬地區。 | |
執行個體ID | 選擇目標SelectDB執行個體的ID。 | |
資料庫帳號 | 填入目標SelectDB執行個體的資料庫帳號,許可權要求請參見資料庫帳號的許可權要求。 | |
資料庫密碼 | 填入該資料庫帳號對應的密碼。 |
配置 | 說明 |
同步類型 | 固定選中增量同步處理。預設情況下,您還需要同時選中庫表結構同步和全量同步。預檢查完成後,DTS會將源執行個體中待同步對象的全量資料在目的地組群中初始化,作為後續增量同步處理資料的基準資料。 |
源庫對象 | 在源庫對象框中單擊待同步對象,然後單擊將其移動至已選擇對象框。同步對象的選擇粒度為庫、表、列。 重要 |
已選擇對象 | 說明 |
通過Flink CDC遷移資料
Flink提供Flink SQL、Flink CDC和DataStream三種方式從MySQL遷移資料到SelectDB,其中Flink CDC支援歷史資料移轉、增量資料同步,且具備完備的庫表遷移、DDL同步等能力。以Flink CDC為例,示範如何將上遊MySQL資料同步到ApsaraDB for SelectDB。更多詳情,請參見通過Flink匯入資料。
操作步驟
準備環境
搭建Flink環境,本樣本以Flink 1.16單機環境為例。
下載flink-1.16.3-bin-scala_2.12.tgz,進行解壓,樣本如下。
wget https://dlcdn.apache.org/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz tar -zxvf flink-1.16.3-bin-scala_2.12.tgz
進入FLINK_HOME/lib目錄中下載flink-sql-connector-mysql-cdc-2.4.2和flink-doris-connector-1.16-1.5.2,樣本如下。
cd flink-1.16.3 cd lib/ wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar wget https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.16/1.5.2/flink-doris-connector-1.16-1.5.2.jar
啟動Flink Standalone叢集,樣本如下。
bin/start-cluster.sh
建立ApsaraDB for SelectDB執行個體。具體操作,請參見建立執行個體。
通過MySQL協議串連ApsaraDB for SelectDB執行個體。具體操作,請參見串連執行個體。
建立測試資料庫和測試表。
建立測試資料庫。
CREATE DATABASE test_db;
建立測試表。
USE test_db; CREATE TABLE employees ( emp_no int NOT NULL, birth_date date, first_name varchar(20), last_name varchar(20), gender char(2), hire_date date ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
提交Flink CDC任務
提交Flink CDC任務的文法如下:
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.5.2.jar \
mysql-sync-database \
--database test_db \
--including-tables "tbl1|test.*" \
--mysql-conf hostname=127.0.0.1 \
--mysql-conf username=root \
--mysql-conf password=123456 \
--mysql-conf database-name=mysql_db \
--sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
--sink-conf username=admin \
--sink-conf password=****
參數 | 說明 |
execution.checkpointing.interval | Flink checkpoint的時間間隔,影響資料同步的頻率,推薦10s。 |
parallelism.default | 設定Flink任務的並行度,適當增加並行度可提高資料同步速度。 |
database | 同步到SelectDB的資料庫名。 |
including-tables | 需要同步的MySQL表,可以使用"|"分隔多個表,並支援Regex。 例如 |
excluding-tables | 不需要同步的表,配置方法與including-tables相同。 |
mysql-conf | MySQL CDC Source配置。配置詳情請參見MySQL CDC Connector,其中 |
sink-conf | Doris Sink的所有配置。更多詳情,請參見通過Flink匯入資料。 |
table-conf | SelectDB表的配置項,即建立SelectDB表時properties中的內容。 |
同步時需要在$FLINK_HOME/lib目錄下添加對應的Flink CDC依賴,例如flink-sql-connector-mysql-cdc-${version}.jar,flink-sql-connector-oracle-cdc-${version}.jar。
整庫同步支援Flink 1.15以上的版本,各個版本Flink Doris Connector的下載請參見Flink Doris Connector。
通過Catalog遷移資料
SelectDB提供的Catalog能力,支援通過聯邦查詢方式訪問MySQL,可簡單快速完成MySQL歷史資料移轉到SelectDB。以下樣本示範如何通過Catalog,將上遊MySQL資料同步到ApsaraDB for SelectDB。更多詳情,請參見JDBC資料來源。
操作步驟
串連SelectDB執行個體。具體操作,請參見串連執行個體。
說明使用DMS登入時,
SWITCH
指令失效。推薦使用MySQL用戶端串連。建立MySQL JDBC Catalog。
CREATE CATALOG jdbc_mysql PROPERTIES (
"type"="jdbc",
"user"="root",
"password"="123456",
"jdbc_url" = "jdbc:mysql://127.0.0.1:3306/demo",
"driver_url" = "mysql-connector-java-8.0.25.jar",
"driver_class" = "com.mysql.cj.jdbc.Driver",
"checksum" = "fdf55dcef04b09f2eaf42b75e61ccc9a"
)
參數說明
參數 | 是否必選 | 預設值 | 說明 |
user | 是 | 無 | 對應資料庫的帳號。 |
password | 是 | 無 | 對應資料庫的密碼。 |
jdbc_url | 是 | 無 | JDBC串連串。 |
driver_url | 是 | 無 | JDBC Driver Jar包名稱。 |
driver_class | 是 | 無 | JDBC Driver Class名稱。 |
lower_case_table_names | 否 | "false" | 指定是否以小寫形式同步JDBC外部資料源的庫名和表名。 |
only_specified_database | 否 | "false" | 指定是否只同步指定的Database。 |
include_database_list | 否 | "" | 當 |
exclude_database_list | 否 | "" | 當 |
在SelectDB中進行建表後,然後通過庫內ETL文法
insert into select
完成資料同步。更多insert into
詳情,請參見Insert Into。
# 建表
CREATE TABLE selectdb_table ...
# 遷移資料
INSERT INTO selectdb_table SELECT * FROM mysql_catalog.mysql_database.mysql_table;
通過DataWorks遷移資料
ApsaraDB for SelectDB支援使用DataWorks的Data Integration功能匯入歷史資料。下文將介紹如何通過DataWorks同步MySQL資料至ApsaraDB for SelectDB。更多詳情,請參見通過DataWorks匯入資料。
不支援寫入BITMAP、HLL(HyperLogLog)和QUANTILE_STATE類型的欄位。
資料同步任務開發
新增資料來源
在進行資料同步任務開發時,您需要在DataWorks上分別建立MySQL和SelectDB資料來源。
建立MySQL資料來源,詳情請參見MySQL資料來源。
建立SelectDB資料來源,詳情請參見建立並管理資料來源。SelectDB資料來源的部分配置參數如下所示:
參數
說明
資料來源名稱
資料來源的名稱。
MySQL串連地址
請填寫JDBC串連串
jdbc:mysql://<ip>:<port>/<dbname>
。您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和MySQL協議連接埠。
樣本:
jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030/test_db
HTTP串連地址
請填寫HTTP協議訪問地址
<ip>:<port>
。您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和HTTP協議連接埠。
樣本:
selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080
使用者名稱
請填寫ApsaraDB for SelectDB執行個體的使用者名稱。
密碼
請填寫ApsaraDB for SelectDB執行個體對應使用者的密碼。
配置單表離線同步任務
您可以選擇嚮導模式或者指令碼模式配置離線同步任務,操作流程請參見: