本文為您介紹如何使用Flink CDC將MySQL資料同步至EMR Serverless StarRocks中。
前提條件
- 已在新版控制台建立DataFlow叢集,詳情請參見建立叢集。
- 已建立EMR Serverless StarRocks執行個體,詳情請參見建立執行個體。
- 已建立RDS MySQL,詳情請參見快速建立RDS MySQL執行個體。
說明 本文以5.7版本的MySQL、EMR-3.39.1版本的DataFlow叢集為例介紹。
使用限制
- DataFlow叢集、EMR Serverless StarRocks執行個體和RDS MySQL執行個體需要在同一個VPC下。
- DataFlow叢集和EMR Serverless StarRocks執行個體均須開啟公網訪問。
- RDS MySQL須為5.7及以上版本。
操作流程
步驟一:準備測試資料
- 建立測試的資料庫和帳號,詳情請參見建立資料庫和帳號。建立完資料庫和帳號後,需要授權測試帳號的讀寫權限。說明 本文建立的資料庫名稱為test_cdc。
- 使用建立的測試帳號串連MySQL執行個體,詳情請參見通過DMS登入RDS MySQL。
- 執行以下命令,建立資料表。
CREATE TABLE test_cdc.`t_user` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, `age` tinyint(4) DEFAULT NULL, `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
步驟二:配置同步工具和啟動Flink任務
- 使用SSH方式登入DataFlow叢集,詳情請參見登入叢集。
- 下載Flink CDC connector和Flink StarRocks Connector。說明 下載過程需要一定時間,請耐心等待。
- 執行以下命令,將下載的Flink CDC Connector和Flink StarRocks Connector檔案複製到DataFlow叢集的/opt/apps/FLINK/flink-current/lib目錄下。
cp flink-* /opt/apps/FLINK/flink-current/lib/ - 執行以下命令,啟動叢集。重要 本文樣本僅供測試,如果是生產層級的Flink作業請使用YARN或Kubernetes方式提交,詳情請參見Apache Hadoop YARN和Native Kubernetes。
/opt/apps/FLINK/flink-current/bin/start-cluster.sh - 下載並修改設定檔。
- 下載StarRocks Migrate Tool,並上傳到DataFlow叢集的root目錄下。
- 執行以下命令,解壓縮smt.tar.gz檔案。
tar -zxvf smt.tar.gz && cd smt - 執行以下命令,編輯config_prod.conf檔案。
請根據實際資訊修改各參數值,各參數描述如下表所示。vim conf/config_prod.conf參數 描述 host RDS的內網地址。 您可以在RDS的資料庫連接頁面,單擊內網地址進行複製。例如,rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com。
port 固定值3306。 user RDS上建立的帳號。 填寫步驟一:準備測試資料中帳號的使用者名稱。
password RDS上建立帳號的密碼。 填寫步驟一:準備測試資料中帳號的密碼。
be_num EMR Serverless StarRocks執行個體的BE節點(Backend)個數,如果是最小叢集,則直接設定為1。 database Regex用於匹配RDS資料庫的名稱,表示需要同步到StarRocks的資料庫。例如, ^test.*$。table Regex用於匹配RDS表的名稱,表示需要同步到StarRocks的表。例如, ^.*$。flink.starrocks.jdbc-url 用於在StarRocks中執行查詢操作。 例如,jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030。其中,fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com為EMR Serverless StarRocks執行個體FE節點的內網地址。說明 關於如何擷取EMR Serverless StarRocks執行個體FE節點的內網地址,請參見查看執行個體列表與詳情。flink.starrocks.load-url 指定FE節點的內網地址和HTTP連接埠,格式為 EMR Serverless StarRocks執行個體FE節點的內網地址:8030。例如,fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030。說明 關於如何擷取EMR Serverless StarRocks執行個體FE節點的內網地址,請參見查看執行個體列表與詳情。flink.starrocks.username StarRocks串連使用者名稱。 flink.starrocks.password StarRocks串連密碼。 說明 預設值為空白,可以不填寫密碼。設定檔中的StarRocks相關配置樣本如下,其他參數的配置樣本請參見更多資訊。flink.starrocks.jdbc-url=jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030 flink.starrocks.load-url=fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030 flink.starrocks.username=admin flink.starrocks.password=1qaz!QAZ
- 執行以下命令,將所有建表語句都產生在result目錄下。
./starrocks-migrate-tool您可以通過
ls result命令,查看result下的目錄。返回資訊如下所示。flink-create.1.sql flink-create.all.sql starrocks-create.1.sql starrocks-create.all.sql starrocks-external-create.1.sql starrocks-external-create.all.sql說明- 本文樣本的設定檔僅定義了table-rule.1等規則,.1.sql格式檔案對應的就是table-rule.1的建表語句。如果您的設定檔有table-rule.2等規則,則.all.sql的檔案為所有規則的集合。
external-create尾碼的檔案,為對應資料來源的外表。如果對於部分情境小的維表您不想同步,則可以直接通過外表查詢,使用該檔案可以產生對應的外表。本文樣本未使用。
- 執行以下命令,建立StarRocks表。
mysql -h<EMR Serverless StarRocks執行個體FE節點的內網地址> -P9030 -uroot -p < result/starrocks-create.1.sql說明 如果修改config_prod.conf檔案時,沒有設定StarRocks串連密碼,則直接按斷行符號鍵。 - 執行以下命令,啟動Flink任務。
/opt/apps/FLINK/flink-current/bin/sql-client.sh -f result/flink-create.1.sql
步驟三:驗證資料同步結果
查詢資料
- 登入並串連EMR Serverless StarRocks執行個體,詳情請參見通過用戶端方式串連StarRocks執行個體。
- 執行以下命令,查看資料庫資訊。
show databases;返回資訊如下所示。+--------------------+ | Database | +--------------------+ | _statistics_ | | information_schema | | test_cdc | +--------------------+ 3 rows in set (0.00 sec) - 在StarRocks串連視窗執行以下命令,查看錶資料。
use test_cdc; select * from t_user;由於本文樣本中t_user表中還沒有資料,因此預期返回資料為空白。
查詢插入後的資料
- 在RDS資料庫視窗執行以下命令,插入資料。
INSERT INTO test_cdc.t_user(`name`,age,create_time,update_time) VALUES("aliyun.com.0",30,NOW(),NOW()); INSERT INTO test_cdc.t_user(`name`,age,create_time,update_time) VALUES("aliyun.com.1",31,NOW(),NOW()); INSERT INTO test_cdc.t_user(`name`,age,create_time,update_time) VALUES("aliyun.com.2",32,NOW(),NOW()); - 在StarRocks串連視窗執行以下命令,查看錶資料。
select * from t_user;返回資訊如下,表示資料已成功插入。+------+--------------+------+---------------------+---------------------+ | id | name | age | create_time | update_time | +------+--------------+------+---------------------+---------------------+ | 4 | aliyun.com.0 | 30 | 2022-03-10 13:22:41 | 2022-03-10 13:22:41 | | 5 | aliyun.com.1 | 31 | 2022-03-10 13:22:41 | 2022-03-10 13:22:41 | | 6 | aliyun.com.2 | 32 | 2022-03-10 13:22:42 | 2022-03-10 13:22:42 | +------+--------------+------+---------------------+---------------------+ 3 rows in set (0.00 sec)
同步資料更新
- 在RDS資料庫視窗執行以下命令,更新指定資料。
UPDATE test_cdc.t_user SET age=35 where name="aliyun.com.0"; - 在StarRocks串連視窗執行以下命令,查看錶資料。
select * from t_user where name = "aliyun.com.0";返回資訊如下,表示資料已同步更新。+------+--------------+------+---------------------+---------------------+ | id | name | age | create_time | update_time | +------+--------------+------+---------------------+---------------------+ | 4 | aliyun.com.0 | 35 | 2022-03-10 13:22:41 | 2022-03-10 13:22:41 | +------+--------------+------+---------------------+---------------------+ 1 row in set (0.01 sec)
同步資料刪除
- 在RDS資料庫視窗執行以下命令,刪除指定資料。
DELETE FROM test_cdc.t_user where 1=1; - 在StarRocks串連視窗執行以下命令,查看錶資料。
select * from t_user;返回資訊如下,表示資料已同步刪除。Empty set (0.01 sec)
更多資訊
- 本文檔僅供測試使用,生產層級的Flink作業請使用阿里雲VVP產品進行配置,或者使用YARN或者Kubernetes提交作業。
- 如果RDS的表有修改(
ALTER TABLE),則MySQL中alter table之後的Schema變更需要在StarRocks中手動同步。如果RDS的表有建立,則MySQL建立的表需要重新運行StarRocks Migrate Tool以進行資料同步。 - StarRocks Migrate Tool的設定檔樣本。
[db] host = rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com port = 3306 user = *** password = *** # currently available types: `mysql`, `pgsql`, `oracle`, `hive`, `clickhouse` type = mysql # # only takes effect on `type == hive`. # # Available values: kerberos, none, nosasl, kerberos_http, none_http, zk, ldap # authentication = kerberos [other] # number of backends in StarRocks be_num = 1 # `decimal_v3` is supported since StarRocks-1.8.1 use_decimal_v3 = false # directory to save the converted DDL SQL output_dir = ./result # !!!`database` `table` `schema` are case sensitive in `oracle`!!! [table-rule.1] # pattern to match databases for setting properties # !!! database should be a `whole instance(or pdb) name` but not a regex when it comes with an `oracle db` !!! database = ^test.*$ # pattern to match tables for setting properties table = ^.*$ # `schema` only takes effect on `postgresql` and `oracle` and `sqlserver` schema = ^.*$ ############################################ ### starrocks table configurations ############################################ # # set a column as the partition_key # partition_key = p_key # # override the auto-generated partitions # partitions = START ("2021-01-02") END ("2021-01-04") EVERY (INTERVAL 1 day) # # only take effect on tables without primary keys or unique indexes # duplicate_keys=k1,k2 # # override the auto-generated distributed keys # distributed_by=k1,k2 # # override the auto-generated distributed buckets # bucket_num=32 # # properties.xxxxx: properties used to create tables # properties.in_memory = false ############################################ ### flink sink configurations ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated ############################################ flink.starrocks.jdbc-url=jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030 flink.starrocks.load-url=fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030 flink.starrocks.username=admin flink.starrocks.password=1qaz!QAZ flink.starrocks.sink.max-retries=10 flink.starrocks.sink.buffer-flush.interval-ms=15000 flink.starrocks.sink.properties.format=json flink.starrocks.sink.properties.strip_outer_array=true # # used to set the server-id for mysql-cdc jobs instead of using a random server-id # flink.cdc.server-id = 5000 ############################################ ### flink-cdc plugin configuration for `postgresql` ############################################ # # for `9.*` decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming # # refer to https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html # # and https://debezium.io/documentation/reference/postgres-plugins.html # flink.cdc.decoding.plugin.name = decoderbufs