全部產品
Search
文件中心

Realtime Compute for Apache Flink:SelectDB(公測中)

更新時間:Nov 23, 2024

本文介紹如何使用自訂SelectDB連接器寫入資料至雲資料庫SelectDB版。

背景資訊

雲資料庫 SelectDB 版是新一代即時資料倉庫SelectDB在阿里雲上的全託管服務,100%相容Apache Doris。您可以在阿里雲上便捷地購買SelectDB數倉服務,滿足海量資料分析需求,具體的產品優勢和應用情境請參見什麼是雲資料庫SelectDB版

自訂SelectDB連接器支援的資訊如下:

類別

詳情

支援類型

結果表

運行模式

流模式和批模式

資料格式

JSON和CSV

特有監控指標

API種類

DataStream和SQL

是否支援更新/刪除

特色功能

  • 支援整庫資料同步。

  • SelectDB連接器提供Exactly-Once語義,保證資料不重複也不丟失。

  • 相容1.0及以上Apache Doris,可以使用Flink SelectDB自訂連接器同步資料至Apache Doris。

注意事項

  • 僅Realtime ComputeFlink版的引擎VVR 8.0.10及以上版本支援使用SelectDB自訂連接器。

  • SelectDB自訂連接器使用過程如有問題,請先提交工單給雲資料庫SelectDB版。

  • 同步資料至雲資料庫SelectDB版時,需要滿足以下條件:

    • 已建立ApsaraDB for SelectDB執行個體,如何購買執行個體請參見建立執行個體

    • 已配置IP白名單,配置白名單詳情請參見設定白名單

使用方法

  1. 單擊JAR包擷取SelectDB自訂連接器(需要為1.15~1.17)。

  2. Realtime Compute開發控制台上,上傳SelectDB自訂連接器,詳情請參見管理自訂連接器

  3. 在SQL作業中使用SelectDB自訂連接器,作業開發詳情請參見SQL作業開發

    具體的文法結構如下。

    CREATE TABLE selectdb_sink (
      emp_no       INT ,
      birth_date   DATE,
      first_name   STRING,
      last_name    STRING,
      gender       STRING,
      hire_date    DATE
    ) WITH (
      'connector' = 'doris',
      'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
      'table.identifier' = 'test.employees',
      'username' = 'admin',
      'password' = '****',
      'sink.enable-delete' = 'true'
    );

    connector為表類型,固定值為doris。SelectDB自訂連接器結果表參數配置詳情請參見Sink配置項

類型映射

詳情請參見Doris & Flink Column Type Mapping

使用樣本

本文以MySQL資料寫入SelectDB為例為您詳細介紹如何使用SelectDB自訂連接器。

  1. 準備工作。

    1. 建立Flink工作空間、MySQL和SelectDB執行個體,詳情請參見開通Realtime ComputeFlink版第一步:快捷建立RDS MySQL執行個體與設定資料庫建立執行個體

    2. 在MySQL中建立名稱為order_dw_mysql的資料庫和名稱為orders的表並匯入測試資料。

      CREATE TABLE `orders` (
        order_id bigint not null primary key,
        user_id varchar(50) not null,
        shop_id bigint not null,
        product_id bigint not null,
        buy_fee decimal(20,2) not null,   
        create_time timestamp not null,
        update_time timestamp not null default now(),
        state int not null 
      );
      
      INSERT INTO orders VALUES
      (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
      (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
      (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
      (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
      (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
      (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
      (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    3. 通過DMS串連雲資料庫SelectDB版執行個體後建立名稱為selectdb的資料庫和名稱為selecttable的表。

      CREATE DATABASE selectdb;
      
      CREATE TABLE `selecttable` (
        order_id bigint,
        user_id varchar(50),
        shop_id bigint,
        product_id bigint,
        buy_fee DECIMAL,   
        create_time DATETIME,
        update_time DATETIME,
        state int
       )DISTRIBUTED BY HASH(order_id) BUCKETS 10;
    4. 將Realtime ComputeFlink版的虛擬交換器的網段資訊添加到SelectDB的白名單中,詳情請參見空間管理與操作設定白名單

  2. Realtime Compute開發控制台上建立Flink SQL作業並啟動。

    1. 建立名稱為mysqlcatalog的MySQL Catalog,詳情請參見管理MySQL Catalog

    2. 單擊JAR包擷取SelectDB自訂連接器(需要為1.15~1.17),註冊SelectDB自訂連接器,詳情請參見管理自訂連接器

    3. 資料開發 > ETL新增作業草稿,程式碼範例如下。

      CREATE TEMPORARY TABLE  selectdb_sink (
        order_id BIGINT,
        user_id STRING,
        shop_id BIGINT,
        product_id BIGINT,
        buy_fee DECIMAL,   
        create_time TIMESTAMP(6),
        update_time TIMESTAMP(6),
        state int
      ) 
        WITH (
        'connector' = 'doris',
        'fenodes' = 'selectdb-cn-jfj3z******.selectdbfe.rds.aliyuncs.com:8080',
        'table.identifier' = 'selectdb.selecttable',
        'username' = 'admin',
        'password' = '${secret_values.selectdb}',
        'sink.enable-delete' = 'true'
      );
      
      INSERT INTO selectdb_sink SELECT * FROM `mysqlcatalog`.`order_dw_mysql`.`orders`;
    4. 單擊部署後無狀態啟動作業,詳情請參見部署作業作業啟動

  3. 通過DMS串連雲資料庫SelectDB版執行個體後,查詢名稱為selecttable的表資料。

    SELECT * FROM `selecttable` ;