Realtime ComputeFlink版基於Flink CDC,通過開發YAML作業的方式有效地實現了將資料從源端同步到目標端的資料攝入工作。本文介紹如何快速構建一個YAML作業將MySQL庫中的所有資料同步到StarRocks中。
前提條件
已建立Flink工作空間,詳情請參見開通Realtime ComputeFlink版。
上下遊儲存
已建立RDS MySQL執行個體,詳情請參見快速建立RDS MySQL執行個體。
已建立StarRocks執行個體,詳情請參見步驟一:建立存算一體版StarRocks執行個體。
背景資訊
假設MySQL執行個體中有一個order_dw_mysql庫,裡面有名稱為orders、orders_pay和product_catalog的3張業務表。此時,如果您希望開發一個資料攝入YAML作業,將這些表和資料都同步到StarRocks的order_dw_sr資料庫中,則可以按照以下步驟進行:
步驟一:準備RDS MySQL測試資料
建立資料庫和帳號。
為目標執行個體建立名稱為order_dw_mysql資料庫和具有對應資料庫讀寫權限的普通帳號。具體操作請參見建立資料庫和帳號和管理資料庫。
通過DMS登入RDS MySQL。
詳情請參見通過DMS登入RDS MySQL。
在已登入的SQL Console視窗,輸入如下命令後單擊執行,建立資料庫和三張業務表,並插入資料。
CREATE TABLE `orders` ( order_id bigint not null primary key, user_id varchar(50) not null, shop_id bigint not null, product_id bigint not null, buy_fee numeric(20,2) not null, create_time timestamp not null, update_time timestamp not null default now(), state int not null ); CREATE TABLE `orders_pay` ( pay_id bigint not null primary key, order_id bigint not null, pay_platform int not null, create_time timestamp not null ); CREATE TABLE `product_catalog` ( product_id bigint not null primary key, catalog_name varchar(50) not null ); -- 準備資料 INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee'); INSERT INTO orders VALUES (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1), (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1), (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1), (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1), (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1), (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1), (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1); INSERT INTO orders_pay VALUES (2001, 100001, 1, '2023-02-15 17:40:56'), (2002, 100002, 1, '2023-02-15 17:40:56'), (2003, 100003, 0, '2023-02-15 17:40:56'), (2004, 100004, 0, '2023-02-15 17:40:56'), (2005, 100005, 0, '2023-02-15 18:40:56'), (2006, 100006, 0, '2023-02-15 18:40:56'), (2007, 100007, 0, '2023-02-15 18:40:56');
步驟二:開發資料攝入YAML作業
在左側導覽列選擇
。單擊建立,選擇MySQL到Starrocks資料同步,單擊下一步。
填寫作業名稱,儲存位置和選擇引擎版本後,單擊確定。
配置YAML作業代碼資訊。
將MySQL中order_dw_mysql資料庫下的所有表同步到starrocks的order_dw_sr資料庫中,程式碼範例如下。
source: type: mysql hostname: rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com port: 3306 username: ${secret_values.mysqlusername} password: ${secret_values.mysqlpassword} tables: order_dw_mysql.\.* server-id: 5405-5415 sink: type: starrocks name: StarRocks Sink jdbc-url: jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030 load-url: fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030 username: ${secret_values.starrocksusername} password: ${secret_values.starrockspassword} table.create.properties.replication_num: 1 route: - source-table: order_dw_mysql.\.* sink-table: order_dw_sr.<> replace-symbol: <> description: route all tables in source_db to sink_db pipeline: name: Sync MySQL Database to StarRocks
關於MySQL和Starrocks的本樣本需要的配置資訊說明如下表所示,資料攝入更多參數詳情請參見MySQL和StarRocks。
類別
參數
說明
樣本值
source
hostname
MySQL資料庫的IP地址或者Hostname。
建議填寫Virtual Private Cloud地址。
rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com
port
MySQL資料庫服務的連接埠號碼。
3306
username
MySQL資料庫服務的使用者名稱和密碼。填寫您步驟一:準備RDS MySQL測試資料中建立的帳號和密碼資訊。
說明本樣本使用變數,可以避免明文展示密碼等資訊,詳情請參見變數管理。
${secret_values.mysqlusername}
password
${secret_values.mysqlpassword}
tables
MySQL表名。支援Regex以讀取多個表的資料。
本文將同步order_dw_mysql資料庫所有表及資料。
order_dw_mysql.\.*
server-id
資料庫用戶端的一個數字ID。
5405-5415
sink
jdbc-url
JDBC串連的URL。
指定FE(Front End)的IP和查詢連接埠,格式為
jdbc:mysql://ip:port
。您可以在E-MapReduce控制台執行個體詳情頁簽,查看目標執行個體的FE內網地址和查詢連接埠。
jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030
load-url
串連到FE節點的HTTP服務URL。
您可以在E-MapReduce控制台執行個體詳情頁簽,查看目標執行個體的FE內網地址和HTTP連接埠。
fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030
username
StarRocks串連使用者名稱和密碼。
此處需要填寫為您開通StarRocks時填寫的使用者名稱和密碼資訊。
說明本樣本使用變數,可以避免明文展示密碼等資訊,詳情請參見變數管理。
${secret_values.starrocksusername}
password
${secret_values.starrockspassword}
route
source-table
指定生效上遊表。
order_dw_mysql.\.*
sink-table
指定資料路由的目標位置。
order_dw_sr.<>
replace-symbol
在使用模式比對功能時,用於指代上遊表名的字串。
<>
單擊部署。
步驟三:啟動資料攝入YAML作業
在資料攝入頁面,單擊部署後,在彈出的對話方塊中,單擊確定。
在
頁面,單擊目標YAML作業操作中的啟動。單擊啟動。
本樣本選擇為無狀態啟動,參數配置詳情請參見作業啟動。作業啟動後,您可以在作業營運頁面觀察作業的運行資訊和狀態。
步驟四:在StarRocks上查看同步結果
當YAML作業處於運行中後,您就可以在StarRocks查看資料同步情況。
在左側導覽列,單擊SQL Editor,在資料庫頁簽,單擊按鈕。
您會看到default_catalog下出現名稱為order_dw_sr的資料庫。
在查詢列表頁簽,單擊+檔案,建立查詢指令碼後,輸入以下SQL語句,單擊運行。
SELECT * FROM default_catalog.order_dw_sr.orders order by order_id; SELECT * FROM default_catalog.order_dw_sr.orders_pay order by pay_id; SELECT * FROM default_catalog.order_dw_sr.product_catalog order by product_id;
在命令下方查看同步結果。
您會看到StarRocks中已存在和MySQL資料庫中相同名稱的表及資料。
相關文檔
資料攝入YAML作業開發步驟詳情,請參見資料攝入YAML作業開發(公測中)。
YAML作業中source、sink、transform和route模組的開發參考詳情,情參見資料攝入開發參考。