全部产品
Search
文档中心

实时计算Flink版:SelectDB(公测中)

更新时间:Nov 22, 2024

本文介绍如何使用自定义SelectDB连接器写入数据至云数据库SelectDB版。

背景信息

云数据库 SelectDB 版是新一代实时数据仓库SelectDB在阿里云上的全托管服务,100%兼容Apache Doris。您可以在阿里云上便捷地购买SelectDB数仓服务,满足海量数据分析需求,具体的产品优势和应用场景请参见什么是云数据库SelectDB版

自定义SelectDB连接器支持的信息如下:

类别

详情

支持类型

结果表

运行模式

流模式和批模式

数据格式

JSON和CSV

特有监控指标

API种类

DataStream和SQL

是否支持更新/删除

特色功能

  • 支持整库数据同步。

  • SelectDB连接器提供Exactly-Once语义,保证数据不重复也不丢失。

  • 兼容1.0及以上Apache Doris,可以使用Flink SelectDB自定义连接器同步数据至Apache Doris。

注意事项

  • 仅实时计算Flink版的引擎VVR 8.0.10及以上版本支持使用SelectDB自定义连接器。

  • SelectDB自定义连接器使用过程如有问题,请先提交工单给云数据库SelectDB版。

  • 同步数据至云数据库SelectDB版时,需要满足以下条件:

    • 已创建云数据库 SelectDB 版实例,如何购买实例请参见创建实例

    • 已配置IP白名单,配置白名单详情请参见设置白名单

使用方法

  1. 单击JAR包获取SelectDB自定义连接器(需要为1.15~1.17)。

  2. 实时计算开发控制台上,上传SelectDB自定义连接器,详情请参见管理自定义连接器

  3. 在SQL作业中使用SelectDB自定义连接器,作业开发详情请参见SQL作业开发

    具体的语法结构如下。

    CREATE TABLE selectdb_sink (
      emp_no       INT ,
      birth_date   DATE,
      first_name   STRING,
      last_name    STRING,
      gender       STRING,
      hire_date    DATE
    ) WITH (
      'connector' = 'doris',
      'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
      'table.identifier' = 'test.employees',
      'username' = 'admin',
      'password' = '****',
      'sink.enable-delete' = 'true'
    );

    connector为表类型,固定值为doris。SelectDB自定义连接器结果表参数配置详情请参见Sink配置项

类型映射

详情请参见Doris & Flink Column Type Mapping

使用示例

本文以MySQL数据写入SelectDB为例为您详细介绍如何使用SelectDB自定义连接器。

  1. 准备工作。

    1. 创建Flink工作空间、MySQL和SelectDB实例,详情请参见开通实时计算Flink版第一步:创建RDS MySQL实例与配置数据库创建实例

    2. 在MySQL中创建名称为order_dw_mysql的数据库和名称为orders的表并导入测试数据。

      CREATE TABLE `orders` (
        order_id bigint not null primary key,
        user_id varchar(50) not null,
        shop_id bigint not null,
        product_id bigint not null,
        buy_fee decimal(20,2) not null,   
        create_time timestamp not null,
        update_time timestamp not null default now(),
        state int not null 
      );
      
      INSERT INTO orders VALUES
      (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
      (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
      (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
      (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
      (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
      (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
      (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    3. 通过DMS连接云数据库SelectDB版实例后创建名称为selectdb的数据库和名称为selecttable的表。

      CREATE DATABASE selectdb;
      
      CREATE TABLE `selecttable` (
        order_id bigint,
        user_id varchar(50),
        shop_id bigint,
        product_id bigint,
        buy_fee DECIMAL,   
        create_time DATETIME,
        update_time DATETIME,
        state int
       )DISTRIBUTED BY HASH(order_id) BUCKETS 10;
    4. 将实时计算Flink版的虚拟交换机的网段信息添加到SelectDB的白名单中,详情请参见操作指导设置白名单

  2. 实时计算开发控制台上创建Flink SQL作业并启动。

    1. 创建名称为mysqlcatalog的MySQL Catalog,详情请参见管理MySQL Catalog

    2. 单击JAR包获取SelectDB自定义连接器(需要为1.15~1.17),注册SelectDB自定义连接器,详情请参见管理自定义连接器

    3. 数据开发 > ETL新建作业草稿,代码示例如下。

      CREATE TEMPORARY TABLE  selectdb_sink (
        order_id BIGINT,
        user_id STRING,
        shop_id BIGINT,
        product_id BIGINT,
        buy_fee DECIMAL,   
        create_time TIMESTAMP(6),
        update_time TIMESTAMP(6),
        state int
      ) 
        WITH (
        'connector' = 'doris',
        'fenodes' = 'selectdb-cn-jfj3z******.selectdbfe.rds.aliyuncs.com:8080',
        'table.identifier' = 'selectdb.selecttable',
        'username' = 'admin',
        'password' = '${secret_values.selectdb}',
        'sink.enable-delete' = 'true'
      );
      
      INSERT INTO selectdb_sink SELECT * FROM `mysqlcatalog`.`order_dw_mysql`.`orders`;
    4. 单击部署后无状态启动作业,详情请参见部署作业作业启动

  3. 通过DMS连接云数据库SelectDB版实例后,查询名称为selecttable的表数据。

    SELECT * FROM `selecttable` ;