CTAS可以實現單表的結構和資料同步,CDAS可以實現整庫同步或者同一庫中的多表結構和資料同步。本文為您介紹如何使用Realtime ComputeFlink平台和E-MapReduce StarRocks通過CTAS&CDAS功能實現即時數倉中TP(Transaction Processing)和AP(Analytical Processing)資料同步的情境。
背景資訊
通過CTAS(CREATE TABLE AS)語句可以在StarRocks中自動建立和MySQL中表結構一致的表,並進行資料同步,還能即時同步上遊表結構(Schema)的變更到下遊表,提高您在目標儲存中建立表和維護源表結構變更的效率。
當執行CTAS語句時,Flink會按照以下流程執行:
檢查目標儲存中是否存在該目標表。
如果不存在,則通過目標端Catalog在目標儲存中建立相應的目標表,該目標表具有和資料來源相同的Schema。
如果存在,則跳過建表。如果已存在的目標表與源表Schema不一致,則會報錯提示。
提交和啟動相應的資料同步作業。同步資料來源的資料以及Schema的變更到目標表中。
表結構變更同步策略通過CTAS語句,在即時同步資料的同時,還能同步源表Schema的變更到目標表中。
Schema變更包括初始表的建立以及未來表的變更。
當前支援同步的Schema變更:
添加可空列:會自動在目標表Schema末尾添加對應的列,並自動同步新增列的資料。
刪除可空列:不會直接在目標表中刪除該列,而是將該列的資料自動填滿為NULL值。
重新命名列:被視為添加列和刪除列。直接在目標表中末尾添加重新命名後的列,並將重新命名前的列資料自動填滿為NULL值。
例如,如果col_a重新命名為col_b,則會在目標表末尾添加col_b,並自動將col_a的資料填充為NULL值。
暫不支援同步的Schema變更:
資料類型的變更。
例如,由VARCHAR變為BIGINT,由NOT NULL變為NULLABLE屬性。
主鍵或索引等約束的變更。
非空列的增加或刪除的變更。
DDL中欄位長度的調整。
如果遇到不支援的Schema變更,則需要您手動刪除下遊目標表,重新啟動CTAS作業,即重新建立目標表並重新同步歷史資料。
CTAS不會識別具體的DDL類型,而是對比前後兩條資料的Schema差異。因此,如果您先刪除了某列後,又加回了該列,且這兩個DDL之間無資料變化,則CTAS會認為沒有發生結構變更。同理,如果您添加了一列,直到該表有資料變化,CTAS才會感知到結構變更,才會同步結構變更到目標表。
通過CTAS建表支援的欄位類型資訊,請參見Flink與StarRocks的資料類型映射關係。
在使用CTAS語句合并MySQL多張表時,預設情況下,系統會自動在產生的新表結構最前面添加
_db_name
和_table_name
兩列,用來追蹤來源資料表資訊。由於這一自動添加行為不可更改,您在定義新表的列順序時,請直接從第三列開始定義您期望的列順序,以確保新表結構符合預期。
前提條件
已開通阿里雲Realtime ComputeFlink全託管並建立了Flink叢集,詳情請參見開通Flink全託管和Flink SQL作業快速入門。
已建立StarRocks叢集,詳情請參見建立StarRocks叢集。
已建立RDS MySQL,詳情請參見建立RDS MySQL執行個體。
本文以5.7版本的MySQL、EMR-3.39.1版本的StarRocks叢集和1.15-6.0.3版本的Flink為例介紹。
使用限制
建立的Flink叢集、StarRocks叢集以及RDS MySQL執行個體需要在同一個VPC下。
RDS MySQL須為5.7及以上版本。
StarRocks須開啟公網訪問。
Flink叢集中的Flink須為1.15-vvr-6.0.3及以上版本。
步驟一:準備測試資料
建立測試的資料庫和帳號,詳情請參見建立資料庫和帳號。
建立完資料庫和帳號後,需要授權測試帳號的讀寫權限。
說明本文建立的資料庫名稱為test_cdc,帳號為test。
使用建立的測試帳號串連MySQL執行個體,詳情請參見通過DMS登入RDS MySQL。
在MySQL中執行以下命令,建立資料表。
use test_cdc; CREATE TABLE IF NOT EXISTS `runoob_tbl`( `runoob_id` INT UNSIGNED AUTO_INCREMENT, `runoob_title` VARCHAR(100) NOT NULL, `runoob_author` VARCHAR(40) NOT NULL, `submission_date` DATE, `add_col` int DEFAULT NULL, PRIMARY KEY ( `runoob_id` ) )ENGINE=InnoDB DEFAULT CHARSET=utf8; INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values (18,'first','tom','2022-06-22 17:13:44',3)
使用SSH方式登入StarRocks叢集,詳情請參見登入叢集。
執行以下,串連StarRocks叢集。
mysql -h127.0.0.1 -P 9030 -uroot
執行以下命令,建立使用者和授權。
CREATE DATABASE test_cdc; CREATE USER 'test' IDENTIFIED by '123456'; GRANT CREATE TABLE ON DATABASE test_cdc TO test;
步驟二:在Realtime ComputeFlink控制台通過SQL用戶端建立Catalog
在阿里雲Realtime ComputeFlink控制台的作業開發頁面中,建立MySQL和StarRocks的Catalog。詳情請參見Flink SQL作業快速入門。
參數僅供參考格式,具體內容請根據實際情況配置。
MySQL Catalog
程式碼範例
CREATE CATALOG mysql WITH ( 'type' = 'mysql', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr-test', 'password' = '123456', 'default-database' = 'test_cdc' );
參數配置
參數
說明
type
類型,固定值為mysql。
hostname
RDS的內網地址。您可以在RDS的資料庫連接頁面,單擊內網地址進行複製。例如,rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com。
port
MySQL資料庫服務的連接埠號碼,預設值為3306。
username
MySQL資料庫服務的使用者名稱。
填寫步驟一:準備測試資料中帳號的使用者名稱。本樣本為test。
password
MySQL資料庫服務的密碼。
填寫步驟一:準備測試資料中帳號的密碼。
default-database
預設的MySQL資料庫名稱。
填寫步驟一:準備測試資料中建立的資料庫名。本樣本為test_cdc。
StarRocks Catalog
程式碼範例
CREATE CATALOG sr WITH ( 'type' = 'starrocks', 'endpoint' = '172.16.**.**:9030', 'username' = 'test', 'password' = '123456', 'dbname' = 'test_cdc' );
參數配置
參數
說明
type
類型,固定值為starrocks。
endpoint
StarRocks FE的IP地址和連接埠。
username
StarRocks的使用者名稱。
填寫步驟一:準備測試資料中帳號的使用者名稱。本樣本為test。
password
StarRocks資料庫服務的密碼。
填寫步驟一:準備測試資料中帳號的密碼。
dbname
StarRocks資料庫名稱。
填寫步驟一:準備測試資料中建立的資料庫名。本樣本為test_cdc。
步驟三:建立並上線作業
在阿里雲Realtime ComputeFlink控制台的作業開發頁面,編寫CTAS語句。
您可以使用以下三種樣本發送CTAS語句。
AtLeast once語義:通過sink.buffer-flush.interval-ms配置項,配置每次寫入StarRocks的時間間隔,優點是寫入間隔時間短,佔用記憶體較少。
/* AtLeast once 語義 */ use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl_sr with ( 'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://172.16.**.**:9030', 'load-url'='172.16.**.**:18030', 'table-name'='runoob_tbl_sr', 'username'='test', 'password' = '123456', 'sink.buffer-flush.interval-ms' = '5000', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ) as table mysql.test_cdc.runoob_tbl /*+ OPTIONS ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'test', 'password' = '123456', 'database-name' = 'test_cdc', 'table-name' = 'runoob_tbl' )*/;
Exactly once語義:需要定義checkpoint間隔,優點是在各種異常情況下保障資料不丟失不重複,缺點是資料可見時間取決於checkpoint間隔。更多資訊,請參見Checkpointing。
/* Exactly once 語義。 */ set 'execution.checkpointing.interval' = '1 min'; set 'execution.checkpointing.mode' = 'EXACTLY_ONCE'; set 'execution.checkpointing.timeout' = '10 min'; use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl with ( 'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://172.16.**.**:9030', 'load-url'='172.16.**.**:18030', 'table-name'='runoob_tbl', 'username'='test', 'password' = '123456', 'sink.semantic' = 'exactly-once', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01 ) as table mysql.test_cdc.runoob_tbl /*+ OPTIONS ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'test', 'password' = '123456', 'database-name' = 'test_cdc', 'table-name' = 'runoob_tbl' )*/;
Simple模式:優點是建立表時不需要關注原表有哪些欄位,會按照MySQL的表格式照搬過來,開發人員使用比較方便。缺點是不能建立分區,對於需要分區的表,仍需要通過normal模式建立。
/* 上面兩個為normal模式,本樣本示範simple模式 */ use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl1 with ( 'starrocks.create.table.properties'='buckets 8', 'starrocks.create.table.mode'='simple', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://172.16.**.**:9030', 'load-url'='172.16.**.**:18030', 'table-name'='runoob_tbl_sr', 'username'='test', 'password' = '123456', 'sink.buffer-flush.interval-ms' = '5000', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ) as table mysql.test_cdc.runoob_tbl /*+ OPTIONS ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr-test', 'password' = '123456', 'database-name' = 'test_cdc', 'table-name' = 'runoob_tbl' )*/;
表 1. WITH參數
參數
是否必選
描述
starrocks.create.table.properties
是
StarRocks建表語句中除了欄位定義以外的其他尾碼定義,例如樣本中的engine、key和buckets等。
database-name
是
StarRocks資料庫名稱。
本樣本為test_cdc。
jdbc-url
是
用於在StarRocks中執行查詢操作。
例如,jdbc:mysql://172.16.**.**:9030。其中,
172.16.**.**
為StarRocks叢集的內網IP地址。load-url
是
指定FE的IP地址和HTTP連接埠,格式為
StarRocks叢集的內網IP地址:連接埠
。本文以8030連接埠為例,實際請根據您的叢集版本選擇訪問的連接埠:18030:EMR-5.9.0及以上版本、EMR-3.43.0及以上版本。
8030:EMR-5.8.0及以下版本、EMR-3.42.0及以下版本。
說明訪問連接埠詳情,請參見UI和連接埠。
sink.semantic
否
填寫exactly-once可以保障資料一致性語義,預設為at-least-once。
starrocks.create.table.mode
否
支援以下參數值:
normal模式(預設值):必須像樣本一樣在starrocks.create.table.properties配置中填寫engine、key和buckets等完整的配置。
simple模式:預設選擇engine為olap,選擇key類型為primary key,且主鍵與MySQL的主鍵保持完全一致,預設distributed by hash(所有的主鍵),預設無分區。需要在starrocks.create.table.properties配置中填寫的必填內容為buckets ,選填內容為properties等配置。
說明因為vvr-6.0.5-flink-1.15及以上版本移除了
sink.use.new-api
,所以使用vvr-6.0.5-flink-1.15之前的版本時,請在with參數中添加'sink.use.new-api'='false',
。
表 2. OPTIONS參數
參數
描述
connector
類型,固定值為mysql-cdc。
hostname
RDS的內網地址。
您可以在RDS的資料庫連接頁面,單擊內網地址進行複製。例如,rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com。
port
MySQL資料庫服務的連接埠號碼,預設值為3306。
username
MySQL資料庫服務的使用者名稱。
填寫步驟一:準備測試資料中帳號的使用者名稱。本樣本為test。
password
MySQL資料庫服務的密碼。
填寫步驟一:準備測試資料中帳號的密碼。
table-name
StarRocks中的表名稱。
填寫步驟一:準備測試資料中建立的表名。本樣本為runoob_tbl。
database-name
預設的MySQL資料庫名稱。
填寫步驟一:準備測試資料中建立的資料庫名。本樣本為test_cdc。
在作業開發頁面的進階配置中,選擇vvr-6.0.3及以上的版本。
單擊上線。
在作業營運頁面,單擊目標作業操作列的啟動。
步驟四:情境示範
查詢資料
使用SSH方式登入StarRocks叢集,詳情請參見登入叢集。
執行以下,串連StarRocks叢集。
mysql -h127.0.0.1 -P 9030 -uroot
在StarRocks串連視窗執行以下命令,查看錶資料。
use test_cdc; select * from runoob_tbl1;
返回資訊如下,表示MySQL上的資料已同步至StarRocks。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 18 | first | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
查詢插入後的資料
在RDS資料庫視窗執行以下命令,插入資料。
INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values(1,'second','tom2','2022-06-23',1)
在StarRocks串連視窗執行以下命令,查看錶資料。
select * from runoob_tbl1;
返回資訊如下,表示資料已成功插入。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 1 | second | tom2 | 2022-06-23 | 1 | | 18 | first | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
同步資料更新
在RDS資料庫視窗執行以下命令,更新指定資料。
update runoob_tbl set runoob_title= 'new' where runoob_id = 18
在StarRocks串連視窗執行以下命令,查看錶資料。
select * from runoob_tbl1;
返回資訊如下,表示資料已同步更新。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 1 | second | tom2 | 2022-06-23 | 1 | | 18 | new | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
同步資料刪除
在RDS資料庫視窗執行以下命令,刪除指定資料。
DELETE FROM runoob_tbl WHERE runoob_id = 1
在StarRocks串連視窗執行以下命令,查看錶資料。
select * from runoob_tbl1;
返回資訊如下,表示資料已同步刪除。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 18 | new | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
增加可空列
在RDS資料庫視窗執行以下命令,增加可空列。
alter table `runoob_tbl` add COLUMN `add_col2` INT;
執行以下命令 ,插入資料。
INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`,`add_col2`) values(1,'second','tom2','2022-06-23',1,2)
在StarRocks串連視窗執行以下命令,查看錶資料。
select * from runoob_tbl1;
返回資訊如下,表示Schema已經成功變更。
+-----------+--------------+---------------+-----------------+---------+----------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | add_col2 | +-----------+--------------+---------------+-----------------+---------+----------+ | 18 | new | tom | 2022-06-22 | 3 | NULL | +-----------+--------------+---------------+-----------------+---------+----------+ | 1 | second | tom2 | 2022-06-23 | 1 | 2 | | 18 | first | tom | 2022-06-22 | 3 | NULL | +-----------+--------------+---------------+-----------------+---------+----------+
CDAS介紹
CDAS是CTAS的一個文法糖。通過CDAS語句,可以實現MySQL中的整庫同步,即產生一個Flink Job。Source是MySQL中的database,目標表是StarRocks中對應的多張表,同時可以使用including table文法,只選擇一個database中的部分表進行CDAS操作。
與CTAS的執行相同,需要在建立MySQL和StarRocks相應的Catalog後,執行CDAS語句。建立文法樣本如下。
CREATE DATABASE IF NOT EXISTS sr_db with (
'starrocks.create.table.properties'=' buckets 8',
'starrocks.create.table.mode'='simple',
'jdbc-url'='jdbc:mysql://172.16.**.**:9030',
'load-url'='172.16.**.**:18030',
'username'='test',
'password' = '123456',
'sink.buffer-flush.interval-ms' = '5000' ,
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
as DATABASE mysql.test_cdc including table 'tabl1','tbl2','tbl3'
/*+ OPTIONS ( 'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test_cdc' )*/;