Realtime Compute for Apache Flink は Flink Change Data Capture (CDC) を使用して、ソースから宛先にデータをインジェストします。YAML ジョブを開発してデータを同期できます。このトピックでは、Flink CDC データインジェストジョブを作成して、MySQL データベースから StarRocks にすべてのデータを同期する方法について説明します。
前提条件
Flink ワークスペースが作成されていること。詳細については、「Realtime Compute for Apache Flink の有効化」をご参照ください。
アップストリームおよびダウンストリームストレージ
RDS MySQL インスタンスが作成されていること。詳細については、「RDS MySQL インスタンスの作成」をご参照ください。
StarRocks インスタンスが作成されていること。詳細については、「手順」をご参照ください。
説明RDS MySQL インスタンスと StarRocks インスタンスは、Flink ワークスペースと同じ VPC にある必要があります。そうでない場合は、ネットワーク接続を確立し、RDS MySQL インスタンスの IP アドレスホワイトリストを設定する必要があります。詳細については、「VPC をまたいで他のサービスにアクセスする方法」、「インターネットにアクセスする方法」、および「ホワイトリストの設定方法」をご参照ください。
背景情報
MySQL インスタンスに order_dw_mysql という名前のデータベースがあると仮定します。このデータベースには、orders、orders_pay、product_catalog の 3 つのビジネス テーブルが含まれています。これらのテーブルとそのデータを StarRocks の order_dw_sr データベースに同期する Flink CDC データインジェストジョブを開発するには、次の手順に従います。
ステップ 1:RDS MySQL テストデータの準備
データベースとアカウントの作成
RDS MySQL インスタンスで、order_dw_mysql という名前のデータベースと、そのデータベースに対する読み取りおよび書き込み権限を持つ標準アカウントを作成します。詳細については、「データベースとアカウントの作成」および「データベースの管理」をご参照ください。
DMS を使用して RDS MySQL インスタンスにログイン
詳細については、「DMS を使用した RDS MySQL インスタンスへのログイン」をご参照ください。
SQL コンソールで、次のコマンドを入力し、[実行] をクリックして 3 つのビジネス テーブルを作成し、データを挿入します。
CREATE TABLE `orders` ( order_id bigint not null primary key, user_id varchar(50) not null, shop_id bigint not null, product_id bigint not null, buy_fee numeric(20,2) not null, create_time timestamp not null, update_time timestamp not null default now(), state int not null ); CREATE TABLE `orders_pay` ( pay_id bigint not null primary key, order_id bigint not null, pay_platform int not null, create_time timestamp not null ); CREATE TABLE `product_catalog` ( product_id bigint not null primary key, catalog_name varchar(50) not null ); -- データの準備 INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee'); INSERT INTO orders VALUES (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1), (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1), (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1), (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1), (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1), (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1), (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1); INSERT INTO orders_pay VALUES (2001, 100001, 1, '2023-02-15 17:40:56'), (2002, 100002, 1, '2023-02-15 17:40:56'), (2003, 100003, 0, '2023-02-15 17:40:56'), (2004, 100004, 0, '2023-02-15 17:40:56'), (2005, 100005, 0, '2023-02-15 18:40:56'), (2006, 100006, 0, '2023-02-15 18:40:56'), (2007, 100007, 0, '2023-02-15 18:40:56');
ステップ 2:Flink CDC データインジェストジョブの開発
[コンソール] をクリックして、目的のワークスペースに移動します。
左側のナビゲーションウィンドウで、 を選択します。
アイコンをクリックし、[テンプレートから新規作成] をクリックし、[MySQL to StarRocks Data Synchronization] を選択してから、[次へ] をクリックします。
[ジョブ名] を入力し、[格納先] を指定し、[エンジンバージョン] を選択してから、[OK] をクリックします。
YAML ジョブコードを設定します。
次のコードは、MySQL の order_dw_mysql データベースから StarRocks の order_dw_sr データベースにすべてのテーブルを同期する方法の例を示しています。
source: type: mysql hostname: rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com port: 3306 username: ${secret_values.mysqlusername} password: ${secret_values.mysqlpassword} tables: order_dw_mysql.\.* server-id: 8601-8604 # (オプション) 増分フェーズ中に新しく作成されたテーブルからデータを同期します。 scan.binlog.newly-added-table.enabled: true # (オプション) テーブルとフィールドのコメントを同期します。 include-comments.enabled: true # (オプション) TaskManager の OutOfMemory エラーの可能性を防ぐために、無制限のチャンクの配布を優先します。 scan.incremental.snapshot.unbounded-chunk-first.enabled: true # (オプション) 読み取りを高速化するために解析フィルターを有効にします。 scan.only.deserialize.captured.tables.changelog.enabled: true sink: type: starrocks name: StarRocks Sink jdbc-url: jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030 load-url: fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030 username: ${secret_values.starrocksusername} password: ${secret_values.starrockspassword} table.create.properties.replication_num: 1 sink.buffer-flush.interval-ms: 5000 # 5 秒ごとにデータをフラッシュします。 route: - source-table: order_dw_mysql.\.* sink-table: order_dw_sr.<> replace-symbol: <> description: route all tables in source_db to sink_db pipeline: name: Sync MySQL Database to StarRocks次の表に、この例で必要な構成情報を示します。データインジェストパラメーターの詳細については、「MySQL」および「StarRocks」をご参照ください。
説明YAML ジョブはプロジェクト変数のみをサポートします。変数を使用すると、プレーンテキストパスワードやその他の機密情報が公開されるのを防ぐことができます。詳細については、「変数管理」をご参照ください。
カテゴリ
パラメーター
説明
値の例
source
hostname
MySQL データベースの IP アドレスまたはホスト名。
VPC アドレスの使用を推奨します。
rm-bp1rk934iidc3****.mysql.rds.aliyuncs.comport
MySQL データベースサービスのポート番号。
3306
username
MySQL データベースサービスのユーザー名とパスワード。「ステップ 1:RDS MySQL テストデータの準備」で作成したアカウント情報を使用します。
${secret_values.mysqlusername}password
${secret_values.mysqlpassword}tables
MySQL テーブルの名前。正規表現をサポートしており、複数のテーブルからデータを読み取ることができます。
このトピックでは、order_dw_mysql データベース内のすべてのテーブルとデータを同期します。
order_dw_mysql.\.*
server-id
データベースクライアントの数値 ID。
5405-5415
sink
jdbc-url
Java Database Connectivity (JDBC) URL。
jdbc:mysql://ip:port形式でフロントエンド (FE) の IP アドレスとクエリポートを指定します。E-MapReduce コンソールの [インスタンス詳細] タブで、ターゲットインスタンスの FE の [内部ネットワークアドレス] と [クエリポート] を表示できます。
jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030load-url
FE ノードへの接続に使用される HTTP サービス URL。
E-MapReduce コンソールの [インスタンス詳細] タブで、ターゲットインスタンスの FE の [内部エンドポイント] と [HTTP ポート] を表示できます。
fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030username
StarRocks 接続のユーザー名とパスワード。
StarRocks インスタンスを作成したときに指定したユーザー名とパスワードを使用します。
説明この例では、プレーンテキストパスワードやその他の機密情報が公開されるのを防ぐために変数を使用しています。詳細については、「変数管理」をご参照ください。
${secret_values.starrocksusername}password
${secret_values.starrockspassword}sink.buffer-flush.interval-ms
内部バッファーの更新間隔。
この例ではデータ量が少ないため、結果を早く確認できるように短い間隔 (5 秒) を設定しています。
5000
route
source-table
ルーティングする先祖テーブルを指定します。
正規表現を使用して複数のテーブルを照合できます。たとえば、
order_dw_mysql.\.*はorder_dw_mysqlデータベース内のすべてのテーブルをルーティングします。order_dw_mysql.\.*
sink-table
データルーティングの宛先を指定します。
replace-symbolのシンボルを各先祖テーブル名のプレースホルダーとして使用して、多対多のルーティングを実装できます。ルーティングルールの詳細については、「ルートモジュール」をご参照ください。
order_dw_sr.<>
replace-symbol
パターンマッチング機能を使用するときに先祖テーブル名を表す文字列。
<>
[デプロイ] をクリックします。
ステップ 3:Flink CDC データインジェストジョブの開始
[データインジェスト] ページで、[デプロイ] をクリックします。表示されるダイアログボックスで、[OK] をクリックします。
ページで、ターゲットの YAML ジョブを見つけ、[操作] 列の [開始] をクリックします。
[開始] をクリックします。
この例では、[ステートレス開始] が選択されています。パラメーター設定の詳細については、「ジョブの開始」をご参照ください。ジョブが開始された後、[ジョブ O&M] ページでそのランタイム情報とステータスをモニターできます。
ステップ 4:StarRocks での同期結果の確認
YAML ジョブが [実行中] 状態になった後、StarRocks でデータ同期の結果を表示できます。
左側のナビゲーションウィンドウで、[SQL エディタ] をクリックします。[データベース] タブで、
アイコンをクリックします。default_catalog の下に order_dw_sr という名前のデータベースが表示されます。
[クエリリスト] タブで、[+ファイル] をクリックして [クエリスクリプト] を作成します。次に、次の SQL 文を入力し、[実行] をクリックします。
SELECT * FROM default_catalog.order_dw_sr.orders order by order_id; SELECT * FROM default_catalog.order_dw_sr.orders_pay order by pay_id; SELECT * FROM default_catalog.order_dw_sr.product_catalog order by product_id;コマンドの下に表示される同期結果を表示します。
MySQL データベースのテーブルとデータが StarRocks に存在することがわかります。

参考資料
Flink CDC データインジェストジョブの開発方法の詳細については、「Flink CDC データインジェストジョブの開発 (パブリックプレビュー)」をご参照ください。
Flink CDC データインジェストジョブのソース、シンク、変換、およびルートモジュールの詳細については、「Flink CDC データインジェストジョブの開発リファレンス」をご参照ください。