全部產品
Search
文件中心

Realtime Compute for Apache Flink:資料攝入YAML作業快速入門

更新時間:Sep 28, 2024

Realtime ComputeFlink版基於Flink CDC,通過開發YAML作業的方式有效地實現了將資料從源端同步到目標端的資料攝入工作。本文介紹如何快速構建一個YAML作業將MySQL庫中的所有資料同步到StarRocks中。

前提條件

背景資訊

假設MySQL執行個體中有一個order_dw_mysql庫,裡面有名稱為orders、orders_pay和product_catalog的3張業務表。此時,如果您希望開發一個資料攝入YAML作業,將這些表和資料都同步到StarRocks的order_dw_sr資料庫中,則可以按照以下步驟進行:

  1. 步驟一:準備RDS MySQL測試資料

  2. 步驟二:開發資料攝入YAML作業

  3. 步驟三:啟動資料攝入YAML作業

  4. 步驟四:在StarRocks上查看同步結果

步驟一:準備RDS MySQL測試資料

  1. 建立資料庫和帳號。

    為目標執行個體建立名稱為order_dw_mysql資料庫和具有對應資料庫讀寫權限的普通帳號。具體操作請參見建立資料庫和帳號管理資料庫

  2. 通過DMS登入RDS MySQL。

    詳情請參見通過DMS登入RDS MySQL

  3. 在已登入的SQL Console視窗,輸入如下命令後單擊執行,建立資料庫和三張業務表,並插入資料。

    CREATE TABLE `orders` (
      order_id bigint not null primary key,
      user_id varchar(50) not null,
      shop_id bigint not null,
      product_id bigint not null,
      buy_fee numeric(20,2) not null,   
      create_time timestamp not null,
      update_time timestamp not null default now(),
      state int not null 
    );
    
    
    CREATE TABLE `orders_pay` (
      pay_id bigint not null primary key,
      order_id bigint not null,
      pay_platform int not null, 
      create_time timestamp not null
    );
    
    
    CREATE TABLE `product_catalog` (
      product_id bigint not null primary key,
      catalog_name varchar(50) not null
    );
    
    -- 準備資料
    INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
    
    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
    (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
    (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
    (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
    (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
    (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
    (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, '2023-02-15 17:40:56'),
    (2002, 100002, 1, '2023-02-15 17:40:56'),
    (2003, 100003, 0, '2023-02-15 17:40:56'),
    (2004, 100004, 0, '2023-02-15 17:40:56'),
    (2005, 100005, 0, '2023-02-15 18:40:56'),
    (2006, 100006, 0, '2023-02-15 18:40:56'),
    (2007, 100007, 0, '2023-02-15 18:40:56');

步驟二:開發資料攝入YAML作業

  1. 登入Realtime Compute管理主控台

  2. 在左側導覽列選擇資料開發 > 資料攝入

  3. 單擊建立,選擇MySQL到Starrocks資料同步,單擊下一步

  4. 填寫作業名稱儲存位置和選擇引擎版本後,單擊確定

  5. 配置YAML作業代碼資訊。

    將MySQL中order_dw_mysql資料庫下的所有表同步到starrocks的order_dw_sr資料庫中,程式碼範例如下。

    source:
      type: mysql
      hostname: rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com
      port: 3306
      username: ${secret_values.mysqlusername}
      password: ${secret_values.mysqlpassword}
      tables: order_dw_mysql.\.*
      server-id: 5405-5415
    
    sink:
      type: starrocks
      name: StarRocks Sink
      jdbc-url: jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030
      load-url: fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030
      username: ${secret_values.starrocksusername}
      password: ${secret_values.starrockspassword}
      table.create.properties.replication_num: 1
      
    route:
      - source-table: order_dw_mysql.\.*
        sink-table: order_dw_sr.<>
        replace-symbol: <>
        description: route all tables in source_db to sink_db
    
    pipeline:
      name: Sync MySQL Database to StarRocks

    關於MySQL和Starrocks的本樣本需要的配置資訊說明如下表所示,資料攝入更多參數詳情請參見MySQLStarRocks

    類別

    參數

    說明

    樣本值

    source

    hostname

    MySQL資料庫的IP地址或者Hostname。

    建議填寫Virtual Private Cloud地址。

    rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com

    port

    MySQL資料庫服務的連接埠號碼。

    3306

    username

    MySQL資料庫服務的使用者名稱和密碼。填寫您步驟一:準備RDS MySQL測試資料中建立的帳號和密碼資訊。

    說明

    本樣本使用變數,可以避免明文展示密碼等資訊,詳情請參見變數管理

    ${secret_values.mysqlusername}

    password

    ${secret_values.mysqlpassword}

    tables

    MySQL表名。支援Regex以讀取多個表的資料。

    本文將同步order_dw_mysql資料庫所有表及資料。

    order_dw_mysql.\.*

    server-id

    資料庫用戶端的一個數字ID。

    5405-5415

    sink

    jdbc-url

    JDBC串連的URL。

    指定FE(Front End)的IP和查詢連接埠,格式為jdbc:mysql://ip:port

    您可以在E-MapReduce控制台執行個體詳情頁簽,查看目標執行個體的FE內網地址查詢連接埠

    jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030

    load-url

    串連到FE節點的HTTP服務URL。

    您可以在E-MapReduce控制台執行個體詳情頁簽,查看目標執行個體的FE內網地址HTTP連接埠

    fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030

    username

    StarRocks串連使用者名稱和密碼。

    此處需要填寫為您開通StarRocks時填寫的使用者名稱和密碼資訊。

    說明

    本樣本使用變數,可以避免明文展示密碼等資訊,詳情請參見變數管理

    ${secret_values.starrocksusername}

    password

    ${secret_values.starrockspassword}

    route

    source-table

    指定生效上遊表。

    order_dw_mysql.\.*

    sink-table

    指定資料路由的目標位置。

    order_dw_sr.<>

    replace-symbol

    在使用模式比對功能時,用於指代上遊表名的字串。

    <>

  6. 單擊部署

步驟三:啟動資料攝入YAML作業

  1. 資料攝入頁面,單擊部署後,在彈出的對話方塊中,單擊確定

  2. 營運中心 > 作業營運頁面,單擊目標YAML作業操作中的啟動

  3. 單擊啟動

    本樣本選擇為無狀態啟動,參數配置詳情請參見作業啟動。作業啟動後,您可以在作業營運頁面觀察作業的運行資訊和狀態。

步驟四:在StarRocks上查看同步結果

當YAML作業處於運行中後,您就可以在StarRocks查看資料同步情況。

  1. 通過EMR StarRocks Manager串連StarRocks執行個體

  2. 在左側導覽列,單擊SQL Editor,在資料庫頁簽,單擊image按鈕。

    您會看到default_catalog下出現名稱為order_dw_sr的資料庫。

  3. 查詢列表頁簽,單擊+檔案,建立查詢指令碼後,輸入以下SQL語句,單擊運行

    SELECT * FROM default_catalog.order_dw_sr.orders order by order_id;
    SELECT * FROM default_catalog.order_dw_sr.orders_pay order by pay_id;
    SELECT * FROM default_catalog.order_dw_sr.product_catalog order by product_id;
  4. 在命令下方查看同步結果。

    您會看到StarRocks中已存在和MySQL資料庫中相同名稱的表及資料。

    image

相關文檔