Data Transmission Service支援將RDS MySQL執行個體的全量或增量資料同步至Function ComputeFC的指定函數。您可以編寫函數代碼,結合約步至函數中的資料,對資料進行二次加工。
前提條件
已建立源RDS MySQL執行個體。建立方式,請參見快速建立RDS MySQL執行個體。
已建立請求處理常式類型為處理事件請求的目標服務和函數資源。建立方式,請參見快速建立函數。
注意事項
類型 | 說明 |
源庫限制 |
|
其他限制 |
|
特殊情況 |
|
費用說明
同步類型 | 鏈路配置費用 |
全量資料同步 | 不收費。 |
增量資料同步 | 收費,詳情請參見計費概述。 |
支援同步的SQL操作
操作類型 | SQL動作陳述式 |
DML | INSERT、UPDATE、DELETE |
DDL |
|
資料庫帳號的許可權要求
資料庫 | 要求的權限 | 建立及授權方式 |
源RDS MySQL | 待同步對象的讀許可權。 |
若您使用的源庫帳號不是通過RDS MySQL控制台建立且授權,您需確保該帳號具備REPLICATION CLIENT、REPLICATION SLAVE、SHOW VIEW和SELECT許可權。
操作步驟
進入同步任務的列表頁面。
在頂部功能表列中,單擊整合與開發。
在左側導覽列,選擇 。
說明實際操作可能會因DMS的模式和布局不同,而有所差異。更多資訊,請參見極簡模式控制台和自訂DMS介面布局與樣式。
您也可以登入新版DTS同步工作清單頁面。
在同步任務右側,選擇同步執行個體所屬地區。
說明新版DTS同步工作清單頁面,需要在頁面左上方選擇同步執行個體所屬地區。
單擊建立任務,配置源庫及目標庫資訊。
類別
配置
說明
無
任務名稱
DTS會自動產生一個任務名稱,建議配置具有業務意義的名稱(無唯一性要求),便於後續識別。
源庫資訊
選擇已有的DMS資料庫執行個體(可選,如未建立可忽略此處選擇,直接在下方設定資料庫資訊即可)
您可以按實際需求,選擇是否使用已有執行個體。
如使用已有執行個體,下方資料庫資訊將自動填入,您無需重複輸入。
如不使用已有執行個體,您需要輸入下方的資料庫資訊。
資料庫類型
選擇MySQL。
接入方式
選擇雲執行個體。
執行個體地區
選擇源RDS MySQL執行個體所屬地區。
是否跨阿里雲帳號
本樣本為同一阿里雲帳號間的同步,選擇不跨帳號。
RDS執行個體ID
選擇源RDS MySQL執行個體ID。
資料庫帳號
填入源RDS MySQL執行個體的資料庫帳號,許可權要求請參見資料庫帳號的許可權要求。
資料庫密碼
填入該資料庫帳號對應的密碼。
串連方式
根據需求選擇非加密串連或SSL安全連線。如果設定為SSL安全連線,您需要提前開啟RDS MySQL執行個體的SSL加密功能,詳情請參見使用雲端認證快速開啟SSL鏈路加密。
目標庫資訊
選擇已有的DMS資料庫執行個體(可選,如未建立可忽略此處選擇,直接在下方設定資料庫資訊即可)
您可以按實際需求,選擇是否使用已有執行個體。
如使用已有執行個體,下方資料庫資訊將自動填入,您無需重複輸入。
如不使用已有執行個體,您需要輸入下方的資料庫資訊。
資料庫類型
選擇Function Compute FC。
接入方式
選擇雲執行個體。
執行個體地區
預設與源庫執行個體地區一致,且不支援修改。
服務
選擇目標Function ComputeFC的服務名稱。
函數
選擇目標Function ComputeFC用於接收資料的函數。
服務版本和別名
請根據實際情況進行選擇。
預設版本:服務版本固定為LATEST。
指定版本:您還需選擇服務版本。
指定別名:您還需選擇服務別名。
說明目標Function ComputeFC的專有名詞,請參見基本概念。
配置完成後,單擊頁面下方的測試連接以進行下一步。
如果源或目標資料庫是阿里雲資料庫執行個體(例如RDS MySQL、ApsaraDB for MongoDB等),DTS會自動將對應地區DTS服務的IP地址添加到阿里雲資料庫執行個體的白名單中;如果源或目標資料庫是ECS上的自建資料庫,DTS會自動將對應地區DTS服務的IP地址添加到ECS的安全規則中,您還需確保自建資料庫沒有限制ECS的訪問(若資料庫是叢集部署在多個ECS執行個體,您需要手動將DTS服務對應地區的IP地址添到其餘每個ECS的安全規則中);如果源或目標資料庫是IDC自建資料庫或其他雲資料庫,則需要您手動添加對應地區DTS服務的IP地址,以允許來自DTS伺服器的訪問。DTS服務的IP地址,請參見DTS伺服器的IP位址區段。
警告DTS自動添加或您手動添加DTS服務的公網IP位址區段可能會存在安全風險,一旦使用本產品代表您已理解和確認其中可能存在的安全風險,並且需要您做好基本的安全防護,包括但不限於加強帳號密碼強度防範、限制各網段開放的連接埠號碼、內部各API使用鑒權方式通訊、定期檢查並限制不需要的網段,或者使用通過內網(專線/VPN網關/智能網關)的方式接入。
配置任務對象及進階配置。
配置
說明
同步類型
固定選中增量同步處理。若您有存量資料的同步需求,您還需要同時選中全量同步。
資料格式
同步到FC函數中的資料存放區格式,當前僅支援Canal Json。
說明Canal Json的參數說明和樣本,請參見Canal Json說明。
源庫對象
在源庫對象框中單擊待同步對象,然後單擊將其移動至已選擇對象框。
說明同步對象的選擇粒度為庫或表。
已選擇對象
請在已選擇對象框中確認待同步的資料。
說明若您需要移除已選擇的對象,可以在已選擇對象框中勾選目標對象,並單擊進行移除。
單擊下一步高級配置,進行進階配置。
配置
說明
選擇調度該任務的專屬叢集
DTS預設將任務調度到共用叢集上,您無需選擇。若您希望任務更加穩定,可以購買專屬叢集來運行DTS同步任務。更多資訊,請參見什麼是DTS專屬叢集。
設定警示
是否設定警示,當同步失敗或延遲超過閾值後,將通知警示連絡人。
不設定:不設定警示。
設定:設定警示,您還需要設定警示閾值和警示通知。更多資訊,請參見在配置任務過程中配置監控警示。
源庫、目標庫無法串連後的重試時間
在同步任務啟動後,若源庫或目標庫串連失敗則DTS會報錯,並會立即進行持續的重試串連,預設持續重試時間為720分鐘,您也可以在取值範圍(10~1440分鐘)內自訂重試時間,建議設定30分鐘以上。如果DTS在設定的重試時間內重新串連上源庫、目標庫,同步任務將自動回復。否則,同步任務將會失敗。
說明針對同源或者同目標的多個DTS執行個體,如DTS執行個體A和DTS執行個體B,設定網路重試時間時A設定30分鐘,B設定60分鐘,則重試時間以低的30分鐘為準。
由於串連重試期間,DTS將收取任務運行費用,建議您根據業務需要自訂重試時間,或者在源和目標庫執行個體釋放後儘快釋放DTS執行個體。
源庫、目標庫出現其他問題後的重試時間
在同步任務啟動後,若源庫或目標庫出現非串連性的其他問題(如DDL或DML執行異常),則DTS會報錯並會立即進行持續的重試操作,預設持續重試時間為10分鐘,您也可以在取值範圍(1~1440分鐘)內自訂重試時間,建議設定10分鐘以上。如果DTS在設定的重試時間內相關操作執行成功,同步任務將自動回復。否則,同步任務將會失敗。
重要源庫、目標庫出現其他問題後的重試時間的值需要小於源庫、目標庫無法串連後的重試時間的值。
是否限制全量遷移速率
在全量同步階段,DTS將佔用源庫和目標庫一定的讀寫資源,可能會導致資料庫的負載上升。您可以根據實際情況,選擇是否對全量同步任務進行限速設定(設定每秒查詢源庫的速率QPS、每秒全量遷移的行數RPS和每秒全量遷移的數據量(MB)BPS),以緩解目標庫的壓力。
說明僅當同步類型選擇了全量同步時才可以配置。
是否限制增量同步處理速率
您也可以根據實際情況,選擇是否對增量同步處理任務進行限速設定(設定每秒增量同步處理的行數RPS和每秒增量同步處理的數據量(MB)BPS),以緩解目標庫的壓力。
環境標籤
您可以根據實際情況,選擇用於標識執行個體的環境標籤。本樣本無需選擇。
配置ETL功能
選擇是否配置ETL功能。關於ETL的更多資訊,請參見什麼是ETL。
是:配置ETL功能,並在文字框中填寫資料處理語句,詳情請參見在DTS遷移或同步任務中配置ETL。
否:不配置ETL功能。
是否去除正反向任務的心跳錶sql
根據業務需求選擇是否在DTS執行個體運行時,在源庫中寫入心跳SQL資訊。
是:不在源庫中寫入心跳SQL資訊,DTS執行個體可能會顯示有延遲。
否:在源庫中寫入心跳SQL資訊,可能會影響源庫的物理備份和複製等功能。
儲存任務並進行預檢查。
若您需要查看調用API介面配置該執行個體時的參數資訊,請將滑鼠游標移動至下一步儲存任務並預檢查按鈕上,然後單擊氣泡中的預覽OpenAPI參數。
若您無需查看或已完成查看API參數,請單擊頁面下方的下一步儲存任務並預檢查。
說明在同步作業正式啟動之前,會先進行預檢查。只有預檢查通過後,才能成功啟動同步作業。
如果預檢查失敗,請單擊失敗檢查項後的查看詳情,並根據提示修複後重新進行預檢查。
如果預檢查產生警告:
對於不可以忽略的檢查項,請單擊失敗檢查項後的查看詳情,並根據提示修複後重新進行預檢查。
對於可以忽略無需修複的檢查項,您可以依次單擊點擊確認警示詳情、確認屏蔽、確定、重新進行預檢查,跳過警示檢查項重新進行預檢查。如果選擇屏蔽警示檢查項,可能會導致資料不一致等問題,給業務帶來風險。
預檢查通過率顯示為100%時,單擊下一步購買。
在購買頁面,選擇資料同步執行個體的計費方式、鏈路規格,詳細說明請參見下表。
類別
參數
說明
資訊配置
計費方式
預付費(訂用帳戶):在建立執行個體時支付費用。適合長期需求,價格比隨用隨付更實惠,且購買時間長度越長,折扣越多。
後付費(隨用隨付):按小時計費。適合短期需求,用完可立即釋放執行個體,節省費用。
資源群組配置
執行個體所屬的資源群組,預設為default resource group。更多資訊,請參見什麼是資源管理。
鏈路規格
DTS為您提供了不同效能的同步規格,同步鏈路規格的不同會影響同步速率,您可以根據業務情境進行選擇。更多資訊,請參見資料同步鏈路規格說明。
訂購時間長度
在預付費模式下,選擇訂用帳戶執行個體的時間長度和數量,包月可選擇1~9個月,包年可選擇1年、2年、3年和5年。
說明該選項僅在付費類型為預付費時出現。
配置完成後,閱讀並勾選《資料轉送(隨用隨付)服務條款》。
單擊購買並啟動,並在彈出的確認對話方塊,單擊確定。
您可在資料同步介面查看具體任務進度。
後續步驟
若待同步的單條資料大於16 MB,導致DTS任務報錯,您可以修改同步對象或者使用ETL功能,過濾大欄位的資料。操作方法,請參見在已有同步任務上修改ETL配置和修改同步對象。
根據實際需求,編寫函數代碼。更多資訊,請參見代碼開發概述。
目標服務接收的資料格式
FC接收到的資料類型為Object,源端增量資料以數組的方式儲存在Records欄位中,數組中的每一個元素為一條Object類型的資料記錄。其中Object欄位和含義如下表所示。
FC接收到的資料有DML和DDL兩種:
DDL:記錄更改資料庫的結構資訊。
DML:記錄管理資料庫中的資料資訊。
欄位 | 類型 | 說明 |
| Boolean | 是否為DDL操作。
|
| String | SQL操作的類型。
重要 全量任務階段固定為INIT。 |
| String | MySQL的資料庫名。 |
| String | MySQL的表名。 |
| String | MySQL表的主鍵名。 |
| Long | 操作在源庫的執行時間,13位Unix時間戳記,單位為毫秒。 說明 Unix時間戳記轉換工具可用搜尋引擎擷取。 |
| Long | 操作開始寫入到目標庫的時間,13位Unix時間戳記,單位為毫秒。 說明 Unix時間戳記轉換工具可用搜尋引擎擷取。 |
| Object Array | Array中僅包含一個元素,類型為Object,其中Key為列名,value為對應的值。 |
old | Object Array | 儲存更新前的資料,格式同 data 欄位。 重要 僅當 |
sql | String |
|
| Int | 操作的序號。 |
DDL操作資料格式樣本
ALTER TABLE操作
SQL語句
ALTER TABLE `demoTable`
ADD COLUMN `address` varchar(20) NULL AFTER `sex`
;
FC接收到的資料
{
'Records': [{
'type': 'DDL',
'serverId': '27142679',
'es': 1690000000000,
'sql': '/* Query from DMS-WEBSQL-0-Eid_15682857722282385K by user 14xxxxxxxx */ ALTER TABLE `demoDatabase`.`DDL` \n\tADD COLUMN `address` varchar(20) NULL AFTER `sex`',
'database': 'demoDatabase',
'id': 63151,
'isDdl': True,
'table': 'demoTable',
'ts': 1690000000000
}]
}
DML操作資料格式樣本
插入資料
SQL語句
INSERT INTO demoTable VALUES("xiaoming", 10, "man");
FC接收到的資料
{
'Records': [{
'data': [{
'sex': 'man',
'name': 'xiaoming',
'age': '10'
}],
'pkNames': ['name'],
'type': 'INSERT',
'serverId': '27142678',
'es': 1690000000000,
'sql': '',
'database': 'demoDatabase',
'sqlType': {
'sex': 253,
'name': 253,
'age': 3
},
'mysqlType': {
'sex': 'varchar',
'name': 'varchar',
'age': 'int'
},
'id': 62051,
'isDdl': False,
'table': 'demoTable',
'ts': 1690000000000
}]
}
更新資料
SQL語句
UPDATE `demoDatabase`.`demoTable` SET `age`=11 WHERE `name`='xiaoming';
FC接收到的資料
{
'Records': [{
'data': [{
'sex': 'man',
'name': 'xiaoming',
'age': '11'
}],
'pkNames': ['name'],
'old': [{
'sex': 'man',
'name': 'xiaoming',
'age': '10'
}],
'type': 'UPDATE',
'serverId': '27142679',
'es': 1690000000000,
'sql': '',
'database': 'demoDatabase',
'sqlType': {
'sex': 253,
'name': 253,
'age': 3
},
'mysqlType': {
'sex': 'varchar',
'name': 'varchar',
'age': 'int'
},
'id': 62373,
'isDdl': False,
'table': 'demoTable',
'ts': 1690000000000
}]
}
刪除資料
SQL語句
DELETE FROM `demoDatabase`.`demoTable` WHERE `name`='xiaoming';
FC接收到的資料
{
'Records': [{
'data': [{
'sex': 'man',
'name': 'xiaoming',
'age': '11'
}],
'pkNames': ['name'],
'type': 'DELETE',
'serverId': '27142679',
'es': 1690000000000,
'sql': '',
'database': 'demoDatabase',
'sqlType': {
'sex': 253,
'name': 253,
'age': 3
},
'mysqlType': {
'sex': 'varchar',
'name': 'varchar',
'age': 'int'
},
'id': 62635,
'isDdl': False,
'table': 'demoTable',
'ts': 1690000000000
}]
}