すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Flink CDC データインジェストジョブのクイックスタート

最終更新日:Mar 01, 2026

Realtime Compute for Apache Flink は Flink Change Data Capture (CDC) を使用して、ソースから宛先にデータをインジェストします。YAML ジョブを開発してデータを同期できます。このトピックでは、Flink CDC データインジェストジョブを作成して、MySQL データベースから StarRocks にすべてのデータを同期する方法について説明します。

前提条件

背景情報

MySQL インスタンスに order_dw_mysql という名前のデータベースがあると仮定します。このデータベースには、orders、orders_pay、product_catalog の 3 つのビジネス テーブルが含まれています。これらのテーブルとそのデータを StarRocks の order_dw_sr データベースに同期する Flink CDC データインジェストジョブを開発するには、次の手順に従います。

  1. ステップ 1:RDS MySQL テストデータの準備

  2. ステップ 2:Flink CDC データインジェストジョブの開発

  3. ステップ 3:Flink CDC データインジェストジョブの開始

  4. ステップ 4:StarRocks での同期結果の確認

ステップ 1:RDS MySQL テストデータの準備

  1. データベースとアカウントの作成

    RDS MySQL インスタンスで、order_dw_mysql という名前のデータベースと、そのデータベースに対する読み取りおよび書き込み権限を持つ標準アカウントを作成します。詳細については、「データベースとアカウントの作成」および「データベースの管理」をご参照ください。

  2. DMS を使用して RDS MySQL インスタンスにログイン

    詳細については、「DMS を使用した RDS MySQL インスタンスへのログイン」をご参照ください。

  3. SQL コンソールで、次のコマンドを入力し、[実行] をクリックして 3 つのビジネス テーブルを作成し、データを挿入します。

    CREATE TABLE `orders` (
      order_id bigint not null primary key,
      user_id varchar(50) not null,
      shop_id bigint not null,
      product_id bigint not null,
      buy_fee numeric(20,2) not null,   
      create_time timestamp not null,
      update_time timestamp not null default now(),
      state int not null 
    );
    
    
    CREATE TABLE `orders_pay` (
      pay_id bigint not null primary key,
      order_id bigint not null,
      pay_platform int not null, 
      create_time timestamp not null
    );
    
    
    CREATE TABLE `product_catalog` (
      product_id bigint not null primary key,
      catalog_name varchar(50) not null
    );
    
    -- データの準備
    INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
    
    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
    (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
    (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
    (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
    (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
    (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
    (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, '2023-02-15 17:40:56'),
    (2002, 100002, 1, '2023-02-15 17:40:56'),
    (2003, 100003, 0, '2023-02-15 17:40:56'),
    (2004, 100004, 0, '2023-02-15 17:40:56'),
    (2005, 100005, 0, '2023-02-15 18:40:56'),
    (2006, 100006, 0, '2023-02-15 18:40:56'),
    (2007, 100007, 0, '2023-02-15 18:40:56');

ステップ 2:Flink CDC データインジェストジョブの開発

  1. Realtime Compute for Apache Flink コンソールにログインします。

  2. [コンソール] をクリックして、目的のワークスペースに移動します。

  3. 左側のナビゲーションウィンドウで、[データ開発] > [データインジェスト] を選択します。

  4. image アイコンをクリックし、[テンプレートから新規作成] をクリックし、[MySQL to StarRocks Data Synchronization] を選択してから、[次へ] をクリックします。

    image

  5. [ジョブ名] を入力し、[格納先] を指定し、[エンジンバージョン] を選択してから、[OK] をクリックします。

  6. YAML ジョブコードを設定します。

    次のコードは、MySQL の order_dw_mysql データベースから StarRocks の order_dw_sr データベースにすべてのテーブルを同期する方法の例を示しています。

    source:
      type: mysql
      hostname: rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com
      port: 3306
      username: ${secret_values.mysqlusername}
      password: ${secret_values.mysqlpassword}
      tables: order_dw_mysql.\.*
      server-id: 8601-8604
      # (オプション) 増分フェーズ中に新しく作成されたテーブルからデータを同期します。
      scan.binlog.newly-added-table.enabled: true
      # (オプション) テーブルとフィールドのコメントを同期します。
      include-comments.enabled: true
      # (オプション) TaskManager の OutOfMemory エラーの可能性を防ぐために、無制限のチャンクの配布を優先します。
      scan.incremental.snapshot.unbounded-chunk-first.enabled: true
      # (オプション) 読み取りを高速化するために解析フィルターを有効にします。
      scan.only.deserialize.captured.tables.changelog.enabled: true 
    
    sink:
      type: starrocks
      name: StarRocks Sink
      jdbc-url: jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030
      load-url: fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030
      username: ${secret_values.starrocksusername}
      password: ${secret_values.starrockspassword}
      table.create.properties.replication_num: 1
      sink.buffer-flush.interval-ms: 5000 # 5 秒ごとにデータをフラッシュします。
      
    route:
      - source-table: order_dw_mysql.\.*
        sink-table: order_dw_sr.<>
        replace-symbol: <>
        description: route all tables in source_db to sink_db
    
    pipeline:
      name: Sync MySQL Database to StarRocks

    次の表に、この例で必要な構成情報を示します。データインジェストパラメーターの詳細については、「MySQL」および「StarRocks」をご参照ください。

    説明

    YAML ジョブはプロジェクト変数のみをサポートします。変数を使用すると、プレーンテキストパスワードやその他の機密情報が公開されるのを防ぐことができます。詳細については、「変数管理」をご参照ください。

    カテゴリ

    パラメーター

    説明

    値の例

    source

    hostname

    MySQL データベースの IP アドレスまたはホスト名。

    VPC アドレスの使用を推奨します。

    rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com

    port

    MySQL データベースサービスのポート番号。

    3306

    username

    MySQL データベースサービスのユーザー名とパスワード。「ステップ 1:RDS MySQL テストデータの準備」で作成したアカウント情報を使用します。

    ${secret_values.mysqlusername}

    password

    ${secret_values.mysqlpassword}

    tables

    MySQL テーブルの名前。正規表現をサポートしており、複数のテーブルからデータを読み取ることができます。

    このトピックでは、order_dw_mysql データベース内のすべてのテーブルとデータを同期します。

    order_dw_mysql.\.*

    server-id

    データベースクライアントの数値 ID。

    5405-5415

    sink

    jdbc-url

    Java Database Connectivity (JDBC) URL。

    jdbc:mysql://ip:port 形式でフロントエンド (FE) の IP アドレスとクエリポートを指定します。

    E-MapReduce コンソールの [インスタンス詳細] タブで、ターゲットインスタンスの FE の [内部ネットワークアドレス][クエリポート] を表示できます。

    jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030

    load-url

    FE ノードへの接続に使用される HTTP サービス URL。

    E-MapReduce コンソールの [インスタンス詳細] タブで、ターゲットインスタンスの FE の [内部エンドポイント][HTTP ポート] を表示できます。

    fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030

    username

    StarRocks 接続のユーザー名とパスワード。

    StarRocks インスタンスを作成したときに指定したユーザー名とパスワードを使用します。

    説明

    この例では、プレーンテキストパスワードやその他の機密情報が公開されるのを防ぐために変数を使用しています。詳細については、「変数管理」をご参照ください。

    ${secret_values.starrocksusername}

    password

    ${secret_values.starrockspassword}

    sink.buffer-flush.interval-ms

    内部バッファーの更新間隔。

    この例ではデータ量が少ないため、結果を早く確認できるように短い間隔 (5 秒) を設定しています。

    5000

    route

    source-table

    ルーティングする先祖テーブルを指定します。

    正規表現を使用して複数のテーブルを照合できます。たとえば、order_dw_mysql.\.*order_dw_mysql データベース内のすべてのテーブルをルーティングします。

    order_dw_mysql.\.*

    sink-table

    データルーティングの宛先を指定します。

    replace-symbol のシンボルを各先祖テーブル名のプレースホルダーとして使用して、多対多のルーティングを実装できます。

    ルーティングルールの詳細については、「ルートモジュール」をご参照ください。

    order_dw_sr.<>

    replace-symbol

    パターンマッチング機能を使用するときに先祖テーブル名を表す文字列。

    <>

  7. [デプロイ] をクリックします。

ステップ 3:Flink CDC データインジェストジョブの開始

  1. [データインジェスト] ページで、[デプロイ] をクリックします。表示されるダイアログボックスで、[OK] をクリックします。

  2. [オペレーションセンター] > [ジョブ O&M] ページで、ターゲットの YAML ジョブを見つけ、[操作] 列の [開始] をクリックします。

  3. [開始] をクリックします。

    この例では、[ステートレス開始] が選択されています。パラメーター設定の詳細については、「ジョブの開始」をご参照ください。ジョブが開始された後、[ジョブ O&M] ページでそのランタイム情報とステータスをモニターできます。

ステップ 4:StarRocks での同期結果の確認

YAML ジョブが [実行中] 状態になった後、StarRocks でデータ同期の結果を表示できます。

  1. EMR StarRocks Manager を使用して StarRocks インスタンスに接続します。

  2. 左側のナビゲーションウィンドウで、[SQL エディタ] をクリックします。[データベース] タブで、image アイコンをクリックします。

    default_catalog の下に order_dw_sr という名前のデータベースが表示されます。

  3. [クエリリスト] タブで、[+ファイル] をクリックして [クエリスクリプト] を作成します。次に、次の SQL 文を入力し、[実行] をクリックします。

    SELECT * FROM default_catalog.order_dw_sr.orders order by order_id;
    SELECT * FROM default_catalog.order_dw_sr.orders_pay order by pay_id;
    SELECT * FROM default_catalog.order_dw_sr.product_catalog order by product_id;
  4. コマンドの下に表示される同期結果を表示します。

    MySQL データベースのテーブルとデータが StarRocks に存在することがわかります。

    image

参考資料